diff --git a/rpc/chain/client.go b/rpc/chain/client.go index 8e10347ae..7023e7906 100644 --- a/rpc/chain/client.go +++ b/rpc/chain/client.go @@ -1006,7 +1006,7 @@ func (c *ClientWithFallback) SetWalletNotifier(notifier func(chainId uint64, mes func (c *ClientWithFallback) toggleConnectionState(err error) { connected := true if err != nil { - if !isVMError(err) && !errors.Is(ErrRequestsOverLimit, err) { + if !isVMError(err) && !errors.Is(err, ErrRequestsOverLimit) { connected = false } } diff --git a/rpc/chain/rpc_limiter.go b/rpc/chain/rpc_limiter.go index 369e45a72..5e9469c68 100644 --- a/rpc/chain/rpc_limiter.go +++ b/rpc/chain/rpc_limiter.go @@ -2,6 +2,7 @@ package chain import ( "database/sql" + "errors" "fmt" "sync" "time" @@ -21,7 +22,7 @@ const ( ) var ( - ErrRequestsOverLimit = fmt.Errorf("number of requests over limit") + ErrRequestsOverLimit = errors.New("number of requests over limit") ) type callerOnWait struct { @@ -32,6 +33,7 @@ type callerOnWait struct { type LimitsStorage interface { Get(tag string) (*LimitData, error) Set(data *LimitData) error + Delete(tag string) error } type InMemRequestsMapStorage struct { @@ -60,6 +62,11 @@ func (s *InMemRequestsMapStorage) Set(data *LimitData) error { return nil } +func (s *InMemRequestsMapStorage) Delete(tag string) error { + s.data.Delete(tag) + return nil +} + type LimitsDBStorage struct { db *RPCLimiterDB } @@ -91,6 +98,10 @@ func (s *LimitsDBStorage) Set(data *LimitData) error { return s.db.UpdateRPCLimit(*data) } +func (s *LimitsDBStorage) Delete(tag string) error { + return s.db.DeleteRPCLimit(tag) +} + type LimitData struct { Tag string CreatedAt time.Time @@ -102,6 +113,7 @@ type LimitData struct { type RequestLimiter interface { SetLimit(tag string, maxRequests int, interval time.Duration) error GetLimit(tag string) (*LimitData, error) + DeleteLimit(tag string) error Allow(tag string) (bool, error) } @@ -135,6 +147,16 @@ func (rl *RPCRequestLimiter) GetLimit(tag string) (*LimitData, error) { return data, nil } +func (rl *RPCRequestLimiter) DeleteLimit(tag string) error { + err := rl.storage.Delete(tag) + if err != nil { + log.Error("Failed to delete request data from storage", "error", err) + return err + } + + return nil +} + func (rl *RPCRequestLimiter) saveToStorage(tag string, maxRequests int, interval time.Duration, numReqs int, timestamp time.Time) error { data := &LimitData{ Tag: tag, @@ -166,13 +188,9 @@ func (rl *RPCRequestLimiter) Allow(tag string) (bool, error) { return true, nil } - // Check if a number of requests is over the limit within the interval - if time.Since(data.CreatedAt) < data.Period || data.Period.Milliseconds() == LimitInfinitely { - if data.NumReqs >= data.MaxReqs { - return false, nil - } - - err := rl.saveToStorage(tag, data.MaxReqs, data.Period, data.NumReqs+1, data.CreatedAt) + // Check if the interval has passed and reset the number of requests + if time.Since(data.CreatedAt) >= data.Period && data.Period.Milliseconds() != LimitInfinitely { + err = rl.saveToStorage(tag, data.MaxReqs, data.Period, 0, time.Now()) if err != nil { return true, err } @@ -180,13 +198,18 @@ func (rl *RPCRequestLimiter) Allow(tag string) (bool, error) { return true, nil } - // Reset the number of requests if the interval has passed - err = rl.saveToStorage(tag, data.MaxReqs, data.Period, 0, time.Now()) - if err != nil { - return true, err // still allow if failed to save + // Check if a number of requests is over the limit within the interval + if time.Since(data.CreatedAt) < data.Period || data.Period.Milliseconds() == LimitInfinitely { + if data.NumReqs >= data.MaxReqs { + log.Info("Number of requests over limit", "tag", tag, "numReqs", data.NumReqs, "maxReqs", data.MaxReqs, "period", data.Period, "createdAt", data.CreatedAt) + return false, ErrRequestsOverLimit + } + + return true, rl.saveToStorage(tag, data.MaxReqs, data.Period, data.NumReqs+1, data.CreatedAt) } - return true, nil + // Reset the number of requests if the interval has passed + return true, rl.saveToStorage(tag, data.MaxReqs, data.Period, 0, time.Now()) // still allow the request if failed to save as not critical } type RPCRpsLimiter struct { diff --git a/rpc/chain/rpc_limiter_test.go b/rpc/chain/rpc_limiter_test.go index 142ca9a29..57c759d57 100644 --- a/rpc/chain/rpc_limiter_test.go +++ b/rpc/chain/rpc_limiter_test.go @@ -55,6 +55,33 @@ func TestGetLimit(t *testing.T) { require.Equal(t, data, ret) } +func TestDeleteLimit(t *testing.T) { + storage, rl := setupTest() + + // Define test inputs + tag := "testTag" + data := &LimitData{ + Tag: tag, + Period: time.Second, + MaxReqs: 10, + NumReqs: 1, + } + err := storage.Set(data) + require.NoError(t, err) + + // Call the DeleteLimit method + err = rl.DeleteLimit(tag) + require.NoError(t, err) + + // Verify that the data was deleted from storage + limit, _ := storage.Get(tag) + require.Nil(t, limit) + + // Test double delete + err = rl.DeleteLimit(tag) + require.NoError(t, err) +} + func TestAllowWithinPeriod(t *testing.T) { storage, rl := setupTest() @@ -84,7 +111,7 @@ func TestAllowWithinPeriod(t *testing.T) { // Call the Allow method again allow, err := rl.Allow(tag) - require.NoError(t, err) + require.ErrorIs(t, err, ErrRequestsOverLimit) require.False(t, allow) } @@ -135,7 +162,7 @@ func TestAllowRestrictInfinitelyWhenLimitReached(t *testing.T) { // Call the Allow method allow, err := rl.Allow(tag) - require.NoError(t, err) + require.ErrorIs(t, err, ErrRequestsOverLimit) // Verify the result require.False(t, allow) diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index abbae60a4..96e7ddc21 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -1119,47 +1119,34 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn ranges = append(ranges, []*big.Int{fromNum, toNum}) } - for _, rangeItem := range ranges { - log.Debug("range item", "r", rangeItem, "n", c.chainClient.NetworkID(), "a", account) - - // Each account has its own limit and a global limit for all accounts - accountTag := transferHistoryTag + "_" + account.String() - chainClient := chain.ClientWithTag(c.chainClient, accountTag, transferHistoryTag) + if len(ranges) > 0 { storage := chain.NewLimitsDBStorage(c.db.client) limiter := chain.NewRequestLimiter(storage) - - // Check if limit is already reached, then skip the comamnd - if allow, _ := limiter.Allow(accountTag); !allow { - log.Debug("fetchHistoryBlocksForAccount limit reached", "account", account, "chain", c.chainClient.NetworkID()) - continue + chainClient, _ := createChainClientWithLimiter(c.chainClient, account, limiter) + if chainClient == nil { + chainClient = c.chainClient } - err := limiter.SetLimit(accountTag, transferHistoryLimitPerAccount, chain.LimitInfinitely) - if err != nil { - log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "accountTag", accountTag) - } - err = limiter.SetLimit(transferHistoryTag, transferHistoryLimit, transferHistoryLimitPeriod) - if err != nil { - log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "groupTag", transferHistoryTag) - } - chainClient.SetLimiter(limiter) + for _, rangeItem := range ranges { + log.Debug("range item", "r", rangeItem, "n", c.chainClient.NetworkID(), "a", account) - fbc := &findBlocksCommand{ - accounts: []common.Address{account}, - db: c.db, - accountsDB: c.accountsDB, - blockRangeDAO: c.blockRangeDAO, - chainClient: chainClient, - balanceCacher: c.balanceCacher, - feed: c.feed, - noLimit: false, - fromBlockNumber: rangeItem[0], - toBlockNumber: rangeItem[1], - tokenManager: c.tokenManager, - blocksLoadedCh: blocksLoadedCh, - defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, + fbc := &findBlocksCommand{ + accounts: []common.Address{account}, + db: c.db, + accountsDB: c.accountsDB, + blockRangeDAO: c.blockRangeDAO, + chainClient: chainClient, + balanceCacher: c.balanceCacher, + feed: c.feed, + noLimit: false, + fromBlockNumber: rangeItem[0], + toBlockNumber: rangeItem[1], + tokenManager: c.tokenManager, + blocksLoadedCh: blocksLoadedCh, + defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, + } + group.Add(fbc.Command()) } - group.Add(fbc.Command()) } return nil @@ -1318,3 +1305,42 @@ func nextRange(maxRangeSize int, prevFrom, zeroBlockNumber *big.Int) (*big.Int, return from, to } + +func accountLimiterTag(account common.Address) string { + return transferHistoryTag + "_" + account.String() +} + +func createChainClientWithLimiter(client chain.ClientInterface, account common.Address, limiter chain.RequestLimiter) (chain.ClientInterface, error) { + // Each account has its own limit and a global limit for all accounts + accountTag := accountLimiterTag(account) + chainClient := chain.ClientWithTag(client, accountTag, transferHistoryTag) + + // Check if limit is already reached, then skip the comamnd + if allow, err := limiter.Allow(accountTag); !allow { + log.Info("fetchHistoryBlocksForAccount limit reached", "account", account, "chain", chainClient.NetworkID(), "error", err) + return nil, err + } + + if allow, err := limiter.Allow(transferHistoryTag); !allow { + log.Info("fetchHistoryBlocksForAccount common limit reached", "chain", chainClient.NetworkID(), "error", err) + return nil, err + } + + limit, _ := limiter.GetLimit(accountTag) + if limit == nil { + err := limiter.SetLimit(accountTag, transferHistoryLimitPerAccount, chain.LimitInfinitely) + if err != nil { + log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "accountTag", accountTag) + } + } + + // Here total limit per day is overwriten on each app start, that still saves us RPC calls, but allows to proceed + // after app restart if the limit was reached. Currently there is no way to reset the limit from UI + err := limiter.SetLimit(transferHistoryTag, transferHistoryLimit, transferHistoryLimitPeriod) + if err != nil { + log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "groupTag", transferHistoryTag) + } + chainClient.SetLimiter(limiter) + + return chainClient, nil +} diff --git a/services/wallet/transfer/controller.go b/services/wallet/transfer/controller.go index 5c68a45d7..3fed14ac5 100644 --- a/services/wallet/transfer/controller.go +++ b/services/wallet/transfer/controller.go @@ -15,6 +15,7 @@ import ( statusaccounts "github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/multiaccounts/settings" "github.com/status-im/status-go/rpc" + "github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/services/accounts/accountsevent" "github.com/status-im/status-go/services/wallet/balance" "github.com/status-im/status-go/services/wallet/blockchainstate" @@ -273,6 +274,12 @@ func (c *Controller) cleanUpRemovedAccount(address common.Address) { if err != nil { log.Error("Failed to delete multitransactions", "error", err) } + + rpcLimitsStorage := chain.NewLimitsDBStorage(c.db.client) + err = rpcLimitsStorage.Delete(accountLimiterTag(address)) + if err != nil { + log.Error("Failed to delete limits", "error", err) + } } func (c *Controller) cleanupAccountsLeftovers() error {