fix(wallet)_: fix rpc limiter to reset counters on timeout

fix rpc limiter to delete limits on account removal
fix rpc limiter to not overwrite existing account limit on startup
fix providers down banner on limit reached error
This commit is contained in:
Ivan Belyakov 2024-06-19 07:20:32 +02:00 committed by IvanBelyakoff
parent 0995802428
commit 43c9860491
5 changed files with 134 additions and 51 deletions

View File

@ -1006,7 +1006,7 @@ func (c *ClientWithFallback) SetWalletNotifier(notifier func(chainId uint64, mes
func (c *ClientWithFallback) toggleConnectionState(err error) { func (c *ClientWithFallback) toggleConnectionState(err error) {
connected := true connected := true
if err != nil { if err != nil {
if !isVMError(err) && !errors.Is(ErrRequestsOverLimit, err) { if !isVMError(err) && !errors.Is(err, ErrRequestsOverLimit) {
connected = false connected = false
} }
} }

View File

@ -2,6 +2,7 @@ package chain
import ( import (
"database/sql" "database/sql"
"errors"
"fmt" "fmt"
"sync" "sync"
"time" "time"
@ -21,7 +22,7 @@ const (
) )
var ( var (
ErrRequestsOverLimit = fmt.Errorf("number of requests over limit") ErrRequestsOverLimit = errors.New("number of requests over limit")
) )
type callerOnWait struct { type callerOnWait struct {
@ -32,6 +33,7 @@ type callerOnWait struct {
type LimitsStorage interface { type LimitsStorage interface {
Get(tag string) (*LimitData, error) Get(tag string) (*LimitData, error)
Set(data *LimitData) error Set(data *LimitData) error
Delete(tag string) error
} }
type InMemRequestsMapStorage struct { type InMemRequestsMapStorage struct {
@ -60,6 +62,11 @@ func (s *InMemRequestsMapStorage) Set(data *LimitData) error {
return nil return nil
} }
func (s *InMemRequestsMapStorage) Delete(tag string) error {
s.data.Delete(tag)
return nil
}
type LimitsDBStorage struct { type LimitsDBStorage struct {
db *RPCLimiterDB db *RPCLimiterDB
} }
@ -91,6 +98,10 @@ func (s *LimitsDBStorage) Set(data *LimitData) error {
return s.db.UpdateRPCLimit(*data) return s.db.UpdateRPCLimit(*data)
} }
func (s *LimitsDBStorage) Delete(tag string) error {
return s.db.DeleteRPCLimit(tag)
}
type LimitData struct { type LimitData struct {
Tag string Tag string
CreatedAt time.Time CreatedAt time.Time
@ -102,6 +113,7 @@ type LimitData struct {
type RequestLimiter interface { type RequestLimiter interface {
SetLimit(tag string, maxRequests int, interval time.Duration) error SetLimit(tag string, maxRequests int, interval time.Duration) error
GetLimit(tag string) (*LimitData, error) GetLimit(tag string) (*LimitData, error)
DeleteLimit(tag string) error
Allow(tag string) (bool, error) Allow(tag string) (bool, error)
} }
@ -135,6 +147,16 @@ func (rl *RPCRequestLimiter) GetLimit(tag string) (*LimitData, error) {
return data, nil return data, nil
} }
func (rl *RPCRequestLimiter) DeleteLimit(tag string) error {
err := rl.storage.Delete(tag)
if err != nil {
log.Error("Failed to delete request data from storage", "error", err)
return err
}
return nil
}
func (rl *RPCRequestLimiter) saveToStorage(tag string, maxRequests int, interval time.Duration, numReqs int, timestamp time.Time) error { func (rl *RPCRequestLimiter) saveToStorage(tag string, maxRequests int, interval time.Duration, numReqs int, timestamp time.Time) error {
data := &LimitData{ data := &LimitData{
Tag: tag, Tag: tag,
@ -166,13 +188,9 @@ func (rl *RPCRequestLimiter) Allow(tag string) (bool, error) {
return true, nil return true, nil
} }
// Check if a number of requests is over the limit within the interval // Check if the interval has passed and reset the number of requests
if time.Since(data.CreatedAt) < data.Period || data.Period.Milliseconds() == LimitInfinitely { if time.Since(data.CreatedAt) >= data.Period && data.Period.Milliseconds() != LimitInfinitely {
if data.NumReqs >= data.MaxReqs { err = rl.saveToStorage(tag, data.MaxReqs, data.Period, 0, time.Now())
return false, nil
}
err := rl.saveToStorage(tag, data.MaxReqs, data.Period, data.NumReqs+1, data.CreatedAt)
if err != nil { if err != nil {
return true, err return true, err
} }
@ -180,13 +198,18 @@ func (rl *RPCRequestLimiter) Allow(tag string) (bool, error) {
return true, nil return true, nil
} }
// Reset the number of requests if the interval has passed // Check if a number of requests is over the limit within the interval
err = rl.saveToStorage(tag, data.MaxReqs, data.Period, 0, time.Now()) if time.Since(data.CreatedAt) < data.Period || data.Period.Milliseconds() == LimitInfinitely {
if err != nil { if data.NumReqs >= data.MaxReqs {
return true, err // still allow if failed to save log.Info("Number of requests over limit", "tag", tag, "numReqs", data.NumReqs, "maxReqs", data.MaxReqs, "period", data.Period, "createdAt", data.CreatedAt)
return false, ErrRequestsOverLimit
}
return true, rl.saveToStorage(tag, data.MaxReqs, data.Period, data.NumReqs+1, data.CreatedAt)
} }
return true, nil // Reset the number of requests if the interval has passed
return true, rl.saveToStorage(tag, data.MaxReqs, data.Period, 0, time.Now()) // still allow the request if failed to save as not critical
} }
type RPCRpsLimiter struct { type RPCRpsLimiter struct {

View File

@ -55,6 +55,33 @@ func TestGetLimit(t *testing.T) {
require.Equal(t, data, ret) require.Equal(t, data, ret)
} }
func TestDeleteLimit(t *testing.T) {
storage, rl := setupTest()
// Define test inputs
tag := "testTag"
data := &LimitData{
Tag: tag,
Period: time.Second,
MaxReqs: 10,
NumReqs: 1,
}
err := storage.Set(data)
require.NoError(t, err)
// Call the DeleteLimit method
err = rl.DeleteLimit(tag)
require.NoError(t, err)
// Verify that the data was deleted from storage
limit, _ := storage.Get(tag)
require.Nil(t, limit)
// Test double delete
err = rl.DeleteLimit(tag)
require.NoError(t, err)
}
func TestAllowWithinPeriod(t *testing.T) { func TestAllowWithinPeriod(t *testing.T) {
storage, rl := setupTest() storage, rl := setupTest()
@ -84,7 +111,7 @@ func TestAllowWithinPeriod(t *testing.T) {
// Call the Allow method again // Call the Allow method again
allow, err := rl.Allow(tag) allow, err := rl.Allow(tag)
require.NoError(t, err) require.ErrorIs(t, err, ErrRequestsOverLimit)
require.False(t, allow) require.False(t, allow)
} }
@ -135,7 +162,7 @@ func TestAllowRestrictInfinitelyWhenLimitReached(t *testing.T) {
// Call the Allow method // Call the Allow method
allow, err := rl.Allow(tag) allow, err := rl.Allow(tag)
require.NoError(t, err) require.ErrorIs(t, err, ErrRequestsOverLimit)
// Verify the result // Verify the result
require.False(t, allow) require.False(t, allow)

View File

@ -1119,47 +1119,34 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn
ranges = append(ranges, []*big.Int{fromNum, toNum}) ranges = append(ranges, []*big.Int{fromNum, toNum})
} }
for _, rangeItem := range ranges { if len(ranges) > 0 {
log.Debug("range item", "r", rangeItem, "n", c.chainClient.NetworkID(), "a", account)
// Each account has its own limit and a global limit for all accounts
accountTag := transferHistoryTag + "_" + account.String()
chainClient := chain.ClientWithTag(c.chainClient, accountTag, transferHistoryTag)
storage := chain.NewLimitsDBStorage(c.db.client) storage := chain.NewLimitsDBStorage(c.db.client)
limiter := chain.NewRequestLimiter(storage) limiter := chain.NewRequestLimiter(storage)
chainClient, _ := createChainClientWithLimiter(c.chainClient, account, limiter)
// Check if limit is already reached, then skip the comamnd if chainClient == nil {
if allow, _ := limiter.Allow(accountTag); !allow { chainClient = c.chainClient
log.Debug("fetchHistoryBlocksForAccount limit reached", "account", account, "chain", c.chainClient.NetworkID())
continue
} }
err := limiter.SetLimit(accountTag, transferHistoryLimitPerAccount, chain.LimitInfinitely) for _, rangeItem := range ranges {
if err != nil { log.Debug("range item", "r", rangeItem, "n", c.chainClient.NetworkID(), "a", account)
log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "accountTag", accountTag)
}
err = limiter.SetLimit(transferHistoryTag, transferHistoryLimit, transferHistoryLimitPeriod)
if err != nil {
log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "groupTag", transferHistoryTag)
}
chainClient.SetLimiter(limiter)
fbc := &findBlocksCommand{ fbc := &findBlocksCommand{
accounts: []common.Address{account}, accounts: []common.Address{account},
db: c.db, db: c.db,
accountsDB: c.accountsDB, accountsDB: c.accountsDB,
blockRangeDAO: c.blockRangeDAO, blockRangeDAO: c.blockRangeDAO,
chainClient: chainClient, chainClient: chainClient,
balanceCacher: c.balanceCacher, balanceCacher: c.balanceCacher,
feed: c.feed, feed: c.feed,
noLimit: false, noLimit: false,
fromBlockNumber: rangeItem[0], fromBlockNumber: rangeItem[0],
toBlockNumber: rangeItem[1], toBlockNumber: rangeItem[1],
tokenManager: c.tokenManager, tokenManager: c.tokenManager,
blocksLoadedCh: blocksLoadedCh, blocksLoadedCh: blocksLoadedCh,
defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize,
}
group.Add(fbc.Command())
} }
group.Add(fbc.Command())
} }
return nil return nil
@ -1318,3 +1305,42 @@ func nextRange(maxRangeSize int, prevFrom, zeroBlockNumber *big.Int) (*big.Int,
return from, to return from, to
} }
func accountLimiterTag(account common.Address) string {
return transferHistoryTag + "_" + account.String()
}
func createChainClientWithLimiter(client chain.ClientInterface, account common.Address, limiter chain.RequestLimiter) (chain.ClientInterface, error) {
// Each account has its own limit and a global limit for all accounts
accountTag := accountLimiterTag(account)
chainClient := chain.ClientWithTag(client, accountTag, transferHistoryTag)
// Check if limit is already reached, then skip the comamnd
if allow, err := limiter.Allow(accountTag); !allow {
log.Info("fetchHistoryBlocksForAccount limit reached", "account", account, "chain", chainClient.NetworkID(), "error", err)
return nil, err
}
if allow, err := limiter.Allow(transferHistoryTag); !allow {
log.Info("fetchHistoryBlocksForAccount common limit reached", "chain", chainClient.NetworkID(), "error", err)
return nil, err
}
limit, _ := limiter.GetLimit(accountTag)
if limit == nil {
err := limiter.SetLimit(accountTag, transferHistoryLimitPerAccount, chain.LimitInfinitely)
if err != nil {
log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "accountTag", accountTag)
}
}
// Here total limit per day is overwriten on each app start, that still saves us RPC calls, but allows to proceed
// after app restart if the limit was reached. Currently there is no way to reset the limit from UI
err := limiter.SetLimit(transferHistoryTag, transferHistoryLimit, transferHistoryLimitPeriod)
if err != nil {
log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "groupTag", transferHistoryTag)
}
chainClient.SetLimiter(limiter)
return chainClient, nil
}

View File

@ -15,6 +15,7 @@ import (
statusaccounts "github.com/status-im/status-go/multiaccounts/accounts" statusaccounts "github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/multiaccounts/settings" "github.com/status-im/status-go/multiaccounts/settings"
"github.com/status-im/status-go/rpc" "github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/accounts/accountsevent" "github.com/status-im/status-go/services/accounts/accountsevent"
"github.com/status-im/status-go/services/wallet/balance" "github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/blockchainstate" "github.com/status-im/status-go/services/wallet/blockchainstate"
@ -273,6 +274,12 @@ func (c *Controller) cleanUpRemovedAccount(address common.Address) {
if err != nil { if err != nil {
log.Error("Failed to delete multitransactions", "error", err) log.Error("Failed to delete multitransactions", "error", err)
} }
rpcLimitsStorage := chain.NewLimitsDBStorage(c.db.client)
err = rpcLimitsStorage.Delete(accountLimiterTag(address))
if err != nil {
log.Error("Failed to delete limits", "error", err)
}
} }
func (c *Controller) cleanupAccountsLeftovers() error { func (c *Controller) cleanupAccountsLeftovers() error {