From 43c986049171cecc83c03ee2aa11c3e332da519f Mon Sep 17 00:00:00 2001 From: Ivan Belyakov Date: Wed, 19 Jun 2024 07:20:32 +0200 Subject: [PATCH] fix(wallet)_: fix rpc limiter to reset counters on timeout fix rpc limiter to delete limits on account removal fix rpc limiter to not overwrite existing account limit on startup fix providers down banner on limit reached error --- rpc/chain/client.go | 2 +- rpc/chain/rpc_limiter.go | 49 +++++++--- rpc/chain/rpc_limiter_test.go | 31 +++++- .../wallet/transfer/commands_sequential.go | 96 ++++++++++++------- services/wallet/transfer/controller.go | 7 ++ 5 files changed, 134 insertions(+), 51 deletions(-) diff --git a/rpc/chain/client.go b/rpc/chain/client.go index 8e10347ae..7023e7906 100644 --- a/rpc/chain/client.go +++ b/rpc/chain/client.go @@ -1006,7 +1006,7 @@ func (c *ClientWithFallback) SetWalletNotifier(notifier func(chainId uint64, mes func (c *ClientWithFallback) toggleConnectionState(err error) { connected := true if err != nil { - if !isVMError(err) && !errors.Is(ErrRequestsOverLimit, err) { + if !isVMError(err) && !errors.Is(err, ErrRequestsOverLimit) { connected = false } } diff --git a/rpc/chain/rpc_limiter.go b/rpc/chain/rpc_limiter.go index 369e45a72..5e9469c68 100644 --- a/rpc/chain/rpc_limiter.go +++ b/rpc/chain/rpc_limiter.go @@ -2,6 +2,7 @@ package chain import ( "database/sql" + "errors" "fmt" "sync" "time" @@ -21,7 +22,7 @@ const ( ) var ( - ErrRequestsOverLimit = fmt.Errorf("number of requests over limit") + ErrRequestsOverLimit = errors.New("number of requests over limit") ) type callerOnWait struct { @@ -32,6 +33,7 @@ type callerOnWait struct { type LimitsStorage interface { Get(tag string) (*LimitData, error) Set(data *LimitData) error + Delete(tag string) error } type InMemRequestsMapStorage struct { @@ -60,6 +62,11 @@ func (s *InMemRequestsMapStorage) Set(data *LimitData) error { return nil } +func (s *InMemRequestsMapStorage) Delete(tag string) error { + s.data.Delete(tag) + return nil +} + type LimitsDBStorage struct { db *RPCLimiterDB } @@ -91,6 +98,10 @@ func (s *LimitsDBStorage) Set(data *LimitData) error { return s.db.UpdateRPCLimit(*data) } +func (s *LimitsDBStorage) Delete(tag string) error { + return s.db.DeleteRPCLimit(tag) +} + type LimitData struct { Tag string CreatedAt time.Time @@ -102,6 +113,7 @@ type LimitData struct { type RequestLimiter interface { SetLimit(tag string, maxRequests int, interval time.Duration) error GetLimit(tag string) (*LimitData, error) + DeleteLimit(tag string) error Allow(tag string) (bool, error) } @@ -135,6 +147,16 @@ func (rl *RPCRequestLimiter) GetLimit(tag string) (*LimitData, error) { return data, nil } +func (rl *RPCRequestLimiter) DeleteLimit(tag string) error { + err := rl.storage.Delete(tag) + if err != nil { + log.Error("Failed to delete request data from storage", "error", err) + return err + } + + return nil +} + func (rl *RPCRequestLimiter) saveToStorage(tag string, maxRequests int, interval time.Duration, numReqs int, timestamp time.Time) error { data := &LimitData{ Tag: tag, @@ -166,13 +188,9 @@ func (rl *RPCRequestLimiter) Allow(tag string) (bool, error) { return true, nil } - // Check if a number of requests is over the limit within the interval - if time.Since(data.CreatedAt) < data.Period || data.Period.Milliseconds() == LimitInfinitely { - if data.NumReqs >= data.MaxReqs { - return false, nil - } - - err := rl.saveToStorage(tag, data.MaxReqs, data.Period, data.NumReqs+1, data.CreatedAt) + // Check if the interval has passed and reset the number of requests + if time.Since(data.CreatedAt) >= data.Period && data.Period.Milliseconds() != LimitInfinitely { + err = rl.saveToStorage(tag, data.MaxReqs, data.Period, 0, time.Now()) if err != nil { return true, err } @@ -180,13 +198,18 @@ func (rl *RPCRequestLimiter) Allow(tag string) (bool, error) { return true, nil } - // Reset the number of requests if the interval has passed - err = rl.saveToStorage(tag, data.MaxReqs, data.Period, 0, time.Now()) - if err != nil { - return true, err // still allow if failed to save + // Check if a number of requests is over the limit within the interval + if time.Since(data.CreatedAt) < data.Period || data.Period.Milliseconds() == LimitInfinitely { + if data.NumReqs >= data.MaxReqs { + log.Info("Number of requests over limit", "tag", tag, "numReqs", data.NumReqs, "maxReqs", data.MaxReqs, "period", data.Period, "createdAt", data.CreatedAt) + return false, ErrRequestsOverLimit + } + + return true, rl.saveToStorage(tag, data.MaxReqs, data.Period, data.NumReqs+1, data.CreatedAt) } - return true, nil + // Reset the number of requests if the interval has passed + return true, rl.saveToStorage(tag, data.MaxReqs, data.Period, 0, time.Now()) // still allow the request if failed to save as not critical } type RPCRpsLimiter struct { diff --git a/rpc/chain/rpc_limiter_test.go b/rpc/chain/rpc_limiter_test.go index 142ca9a29..57c759d57 100644 --- a/rpc/chain/rpc_limiter_test.go +++ b/rpc/chain/rpc_limiter_test.go @@ -55,6 +55,33 @@ func TestGetLimit(t *testing.T) { require.Equal(t, data, ret) } +func TestDeleteLimit(t *testing.T) { + storage, rl := setupTest() + + // Define test inputs + tag := "testTag" + data := &LimitData{ + Tag: tag, + Period: time.Second, + MaxReqs: 10, + NumReqs: 1, + } + err := storage.Set(data) + require.NoError(t, err) + + // Call the DeleteLimit method + err = rl.DeleteLimit(tag) + require.NoError(t, err) + + // Verify that the data was deleted from storage + limit, _ := storage.Get(tag) + require.Nil(t, limit) + + // Test double delete + err = rl.DeleteLimit(tag) + require.NoError(t, err) +} + func TestAllowWithinPeriod(t *testing.T) { storage, rl := setupTest() @@ -84,7 +111,7 @@ func TestAllowWithinPeriod(t *testing.T) { // Call the Allow method again allow, err := rl.Allow(tag) - require.NoError(t, err) + require.ErrorIs(t, err, ErrRequestsOverLimit) require.False(t, allow) } @@ -135,7 +162,7 @@ func TestAllowRestrictInfinitelyWhenLimitReached(t *testing.T) { // Call the Allow method allow, err := rl.Allow(tag) - require.NoError(t, err) + require.ErrorIs(t, err, ErrRequestsOverLimit) // Verify the result require.False(t, allow) diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index abbae60a4..96e7ddc21 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -1119,47 +1119,34 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn ranges = append(ranges, []*big.Int{fromNum, toNum}) } - for _, rangeItem := range ranges { - log.Debug("range item", "r", rangeItem, "n", c.chainClient.NetworkID(), "a", account) - - // Each account has its own limit and a global limit for all accounts - accountTag := transferHistoryTag + "_" + account.String() - chainClient := chain.ClientWithTag(c.chainClient, accountTag, transferHistoryTag) + if len(ranges) > 0 { storage := chain.NewLimitsDBStorage(c.db.client) limiter := chain.NewRequestLimiter(storage) - - // Check if limit is already reached, then skip the comamnd - if allow, _ := limiter.Allow(accountTag); !allow { - log.Debug("fetchHistoryBlocksForAccount limit reached", "account", account, "chain", c.chainClient.NetworkID()) - continue + chainClient, _ := createChainClientWithLimiter(c.chainClient, account, limiter) + if chainClient == nil { + chainClient = c.chainClient } - err := limiter.SetLimit(accountTag, transferHistoryLimitPerAccount, chain.LimitInfinitely) - if err != nil { - log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "accountTag", accountTag) - } - err = limiter.SetLimit(transferHistoryTag, transferHistoryLimit, transferHistoryLimitPeriod) - if err != nil { - log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "groupTag", transferHistoryTag) - } - chainClient.SetLimiter(limiter) + for _, rangeItem := range ranges { + log.Debug("range item", "r", rangeItem, "n", c.chainClient.NetworkID(), "a", account) - fbc := &findBlocksCommand{ - accounts: []common.Address{account}, - db: c.db, - accountsDB: c.accountsDB, - blockRangeDAO: c.blockRangeDAO, - chainClient: chainClient, - balanceCacher: c.balanceCacher, - feed: c.feed, - noLimit: false, - fromBlockNumber: rangeItem[0], - toBlockNumber: rangeItem[1], - tokenManager: c.tokenManager, - blocksLoadedCh: blocksLoadedCh, - defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, + fbc := &findBlocksCommand{ + accounts: []common.Address{account}, + db: c.db, + accountsDB: c.accountsDB, + blockRangeDAO: c.blockRangeDAO, + chainClient: chainClient, + balanceCacher: c.balanceCacher, + feed: c.feed, + noLimit: false, + fromBlockNumber: rangeItem[0], + toBlockNumber: rangeItem[1], + tokenManager: c.tokenManager, + blocksLoadedCh: blocksLoadedCh, + defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, + } + group.Add(fbc.Command()) } - group.Add(fbc.Command()) } return nil @@ -1318,3 +1305,42 @@ func nextRange(maxRangeSize int, prevFrom, zeroBlockNumber *big.Int) (*big.Int, return from, to } + +func accountLimiterTag(account common.Address) string { + return transferHistoryTag + "_" + account.String() +} + +func createChainClientWithLimiter(client chain.ClientInterface, account common.Address, limiter chain.RequestLimiter) (chain.ClientInterface, error) { + // Each account has its own limit and a global limit for all accounts + accountTag := accountLimiterTag(account) + chainClient := chain.ClientWithTag(client, accountTag, transferHistoryTag) + + // Check if limit is already reached, then skip the comamnd + if allow, err := limiter.Allow(accountTag); !allow { + log.Info("fetchHistoryBlocksForAccount limit reached", "account", account, "chain", chainClient.NetworkID(), "error", err) + return nil, err + } + + if allow, err := limiter.Allow(transferHistoryTag); !allow { + log.Info("fetchHistoryBlocksForAccount common limit reached", "chain", chainClient.NetworkID(), "error", err) + return nil, err + } + + limit, _ := limiter.GetLimit(accountTag) + if limit == nil { + err := limiter.SetLimit(accountTag, transferHistoryLimitPerAccount, chain.LimitInfinitely) + if err != nil { + log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "accountTag", accountTag) + } + } + + // Here total limit per day is overwriten on each app start, that still saves us RPC calls, but allows to proceed + // after app restart if the limit was reached. Currently there is no way to reset the limit from UI + err := limiter.SetLimit(transferHistoryTag, transferHistoryLimit, transferHistoryLimitPeriod) + if err != nil { + log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "groupTag", transferHistoryTag) + } + chainClient.SetLimiter(limiter) + + return chainClient, nil +} diff --git a/services/wallet/transfer/controller.go b/services/wallet/transfer/controller.go index 5c68a45d7..3fed14ac5 100644 --- a/services/wallet/transfer/controller.go +++ b/services/wallet/transfer/controller.go @@ -15,6 +15,7 @@ import ( statusaccounts "github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/multiaccounts/settings" "github.com/status-im/status-go/rpc" + "github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/services/accounts/accountsevent" "github.com/status-im/status-go/services/wallet/balance" "github.com/status-im/status-go/services/wallet/blockchainstate" @@ -273,6 +274,12 @@ func (c *Controller) cleanUpRemovedAccount(address common.Address) { if err != nil { log.Error("Failed to delete multitransactions", "error", err) } + + rpcLimitsStorage := chain.NewLimitsDBStorage(c.db.client) + err = rpcLimitsStorage.Delete(accountLimiterTag(address)) + if err != nil { + log.Error("Failed to delete limits", "error", err) + } } func (c *Controller) cleanupAccountsLeftovers() error {