From bb3006d747cb346970885aa5509e3b04f1f20293 Mon Sep 17 00:00:00 2001 From: Sale Djenic Date: Fri, 23 Feb 2024 15:27:08 +0100 Subject: [PATCH] feat: rpc request limiter --- rpc/chain/client.go | 157 +++++++++++++---- rpc/chain/rpc_limiter.go | 162 ++++++++++++++++++ rpc/client.go | 79 +++++++-- .../transfer/commands_sequential_test.go | 2 +- services/wallet/transfer/downloader.go | 2 +- 5 files changed, 353 insertions(+), 49 deletions(-) create mode 100644 rpc/chain/rpc_limiter.go diff --git a/rpc/chain/client.go b/rpc/chain/client.go index 726043e89..4b73010c9 100644 --- a/rpc/chain/client.go +++ b/rpc/chain/client.go @@ -13,7 +13,6 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethclient" @@ -21,10 +20,6 @@ import ( "github.com/status-im/status-go/services/rpcstats" ) -type FeeHistory struct { - BaseFeePerGas []string `json:"baseFeePerGas"` -} - type BatchCallClient interface { BatchCallContext(ctx context.Context, b []rpc.BatchElem) error } @@ -39,7 +34,7 @@ type ClientInterface interface { BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) CallBlockHashByTransaction(ctx context.Context, blockNumber *big.Int, index uint) (common.Hash, error) - GetBaseFeeFromBlock(blockNumber *big.Int) (string, error) + GetBaseFeeFromBlock(ctx context.Context, blockNumber *big.Int) (string, error) NetworkID() uint64 ToBigInt() *big.Int CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) @@ -58,9 +53,11 @@ type ClientInterface interface { } type ClientWithFallback struct { - ChainID uint64 - main *ethclient.Client - fallback *ethclient.Client + ChainID uint64 + main *ethclient.Client + fallback *ethclient.Client + mainLimiter *RPCLimiter + fallbackLimiter *RPCLimiter mainRPC *rpc.Client fallbackRPC *rpc.Client @@ -99,7 +96,7 @@ type CommandResult struct { vmError error } -func NewSimpleClient(main *rpc.Client, chainID uint64) *ClientWithFallback { +func NewSimpleClient(mainLimiter *RPCLimiter, main *rpc.Client, chainID uint64) *ClientWithFallback { hystrix.ConfigureCommand(fmt.Sprintf("ethClient_%d", chainID), hystrix.CommandConfig{ Timeout: 10000, MaxConcurrentRequests: 100, @@ -108,17 +105,19 @@ func NewSimpleClient(main *rpc.Client, chainID uint64) *ClientWithFallback { }) return &ClientWithFallback{ - ChainID: chainID, - main: ethclient.NewClient(main), - fallback: nil, - mainRPC: main, - fallbackRPC: nil, - IsConnected: true, - LastCheckedAt: time.Now().Unix(), + ChainID: chainID, + main: ethclient.NewClient(main), + fallback: nil, + mainLimiter: mainLimiter, + fallbackLimiter: nil, + mainRPC: main, + fallbackRPC: nil, + IsConnected: true, + LastCheckedAt: time.Now().Unix(), } } -func NewClient(main, fallback *rpc.Client, chainID uint64) *ClientWithFallback { +func NewClient(mainLimiter *RPCLimiter, main *rpc.Client, fallbackLimiter *RPCLimiter, fallback *rpc.Client, chainID uint64) *ClientWithFallback { hystrix.ConfigureCommand(fmt.Sprintf("ethClient_%d", chainID), hystrix.CommandConfig{ Timeout: 20000, MaxConcurrentRequests: 100, @@ -131,13 +130,15 @@ func NewClient(main, fallback *rpc.Client, chainID uint64) *ClientWithFallback { fallbackEthClient = ethclient.NewClient(fallback) } return &ClientWithFallback{ - ChainID: chainID, - main: ethclient.NewClient(main), - fallback: fallbackEthClient, - mainRPC: main, - fallbackRPC: fallback, - IsConnected: true, - LastCheckedAt: time.Now().Unix(), + ChainID: chainID, + main: ethclient.NewClient(main), + fallback: fallbackEthClient, + mainLimiter: mainLimiter, + fallbackLimiter: fallbackLimiter, + mainRPC: main, + fallbackRPC: fallback, + IsConnected: true, + LastCheckedAt: time.Now().Unix(), } } @@ -160,6 +161,10 @@ func isVMError(err error) bool { return false } +func isRPSLimitError(err error) bool { + return strings.Contains(err.Error(), "backoff_seconds") +} + func (c *ClientWithFallback) SetIsConnected(value bool) { c.IsConnectedLock.Lock() defer c.IsConnectedLock.Unlock() @@ -188,12 +193,20 @@ func (c *ClientWithFallback) GetIsConnected() bool { return c.IsConnected } -func (c *ClientWithFallback) makeCallNoReturn(main func() error, fallback func() error) error { +func (c *ClientWithFallback) makeCallNoReturn(ctx context.Context, main func() error, fallback func() error) error { resultChan := make(chan CommandResult, 1) c.LastCheckedAt = time.Now().Unix() errChan := hystrix.Go(fmt.Sprintf("ethClient_%d", c.ChainID), func() error { - err := main() + err := c.mainLimiter.WaitForRequestsAvailability(1) if err != nil { + return err + } + + err = main() + if err != nil { + if isRPSLimitError(err) { + c.mainLimiter.ReduceLimit() + } if isVMError(err) { resultChan <- CommandResult{vmError: err} return nil @@ -209,8 +222,16 @@ func (c *ClientWithFallback) makeCallNoReturn(main func() error, fallback func() return err } + err = c.fallbackLimiter.WaitForRequestsAvailability(1) + if err != nil { + return err + } + err = fallback() if err != nil { + if isRPSLimitError(err) { + c.fallbackLimiter.ReduceLimit() + } if isVMError(err) { resultChan <- CommandResult{vmError: err} return nil @@ -233,11 +254,19 @@ func (c *ClientWithFallback) makeCallNoReturn(main func() error, fallback func() } } -func (c *ClientWithFallback) makeCallSingleReturn(main func() (any, error), fallback func() (any, error), toggleIsConnected bool) (any, error) { +func (c *ClientWithFallback) makeCallSingleReturn(ctx context.Context, main func() (any, error), fallback func() (any, error), toggleIsConnected bool) (any, error) { resultChan := make(chan CommandResult, 1) errChan := hystrix.Go(fmt.Sprintf("ethClient_%d", c.ChainID), func() error { + err := c.mainLimiter.WaitForRequestsAvailability(1) + if err != nil { + return err + } + res, err := main() if err != nil { + if isRPSLimitError(err) { + c.mainLimiter.ReduceLimit() + } if isVMError(err) { resultChan <- CommandResult{vmError: err} return nil @@ -257,8 +286,16 @@ func (c *ClientWithFallback) makeCallSingleReturn(main func() (any, error), fall return err } + err = c.fallbackLimiter.WaitForRequestsAvailability(1) + if err != nil { + return err + } + res, err := fallback() if err != nil { + if isRPSLimitError(err) { + c.fallbackLimiter.ReduceLimit() + } if isVMError(err) { resultChan <- CommandResult{vmError: err} return nil @@ -286,12 +323,20 @@ func (c *ClientWithFallback) makeCallSingleReturn(main func() (any, error), fall } } -func (c *ClientWithFallback) makeCallDoubleReturn(main func() (any, any, error), fallback func() (any, any, error)) (any, any, error) { +func (c *ClientWithFallback) makeCallDoubleReturn(ctx context.Context, main func() (any, any, error), fallback func() (any, any, error)) (any, any, error) { resultChan := make(chan CommandResult, 1) c.LastCheckedAt = time.Now().Unix() errChan := hystrix.Go(fmt.Sprintf("ethClient_%d", c.ChainID), func() error { + err := c.mainLimiter.WaitForRequestsAvailability(1) + if err != nil { + return err + } + a, b, err := main() if err != nil { + if isRPSLimitError(err) { + c.mainLimiter.ReduceLimit() + } if isVMError(err) { resultChan <- CommandResult{vmError: err} return nil @@ -307,8 +352,16 @@ func (c *ClientWithFallback) makeCallDoubleReturn(main func() (any, any, error), return err } + err = c.fallbackLimiter.WaitForRequestsAvailability(1) + if err != nil { + return err + } + a, b, err := fallback() if err != nil { + if isRPSLimitError(err) { + c.fallbackLimiter.ReduceLimit() + } if isVMError(err) { resultChan <- CommandResult{vmError: err} return nil @@ -336,6 +389,7 @@ func (c *ClientWithFallback) BlockByHash(ctx context.Context, hash common.Hash) rpcstats.CountCall("eth_BlockByHash") block, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.BlockByHash(ctx, hash) }, func() (any, error) { return c.fallback.BlockByHash(ctx, hash) }, true, @@ -351,6 +405,7 @@ func (c *ClientWithFallback) BlockByHash(ctx context.Context, hash common.Hash) func (c *ClientWithFallback) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { rpcstats.CountCall("eth_BlockByNumber") block, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.BlockByNumber(ctx, number) }, func() (any, error) { return c.fallback.BlockByNumber(ctx, number) }, true, @@ -367,6 +422,7 @@ func (c *ClientWithFallback) BlockNumber(ctx context.Context) (uint64, error) { rpcstats.CountCall("eth_BlockNumber") number, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.BlockNumber(ctx) }, func() (any, error) { return c.fallback.BlockNumber(ctx) }, true, @@ -383,6 +439,7 @@ func (c *ClientWithFallback) PeerCount(ctx context.Context) (uint64, error) { rpcstats.CountCall("eth_PeerCount") peerCount, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.PeerCount(ctx) }, func() (any, error) { return c.fallback.PeerCount(ctx) }, true, @@ -398,6 +455,7 @@ func (c *ClientWithFallback) PeerCount(ctx context.Context) (uint64, error) { func (c *ClientWithFallback) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { rpcstats.CountCall("eth_HeaderByHash") header, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.HeaderByHash(ctx, hash) }, func() (any, error) { return c.fallback.HeaderByHash(ctx, hash) }, false, @@ -413,6 +471,7 @@ func (c *ClientWithFallback) HeaderByHash(ctx context.Context, hash common.Hash) func (c *ClientWithFallback) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { rpcstats.CountCall("eth_HeaderByNumber") header, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.HeaderByNumber(ctx, number) }, func() (any, error) { return c.fallback.HeaderByNumber(ctx, number) }, false, @@ -429,6 +488,7 @@ func (c *ClientWithFallback) TransactionByHash(ctx context.Context, hash common. rpcstats.CountCall("eth_TransactionByHash") tx, isPending, err := c.makeCallDoubleReturn( + ctx, func() (any, any, error) { return c.main.TransactionByHash(ctx, hash) }, func() (any, any, error) { return c.fallback.TransactionByHash(ctx, hash) }, ) @@ -444,6 +504,7 @@ func (c *ClientWithFallback) TransactionSender(ctx context.Context, tx *types.Tr rpcstats.CountCall("eth_TransactionSender") address, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.TransactionSender(ctx, tx, block, index) }, func() (any, error) { return c.fallback.TransactionSender(ctx, tx, block, index) }, true, @@ -456,6 +517,7 @@ func (c *ClientWithFallback) TransactionCount(ctx context.Context, blockHash com rpcstats.CountCall("eth_TransactionCount") count, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.TransactionCount(ctx, blockHash) }, func() (any, error) { return c.fallback.TransactionCount(ctx, blockHash) }, true, @@ -472,6 +534,7 @@ func (c *ClientWithFallback) TransactionInBlock(ctx context.Context, blockHash c rpcstats.CountCall("eth_TransactionInBlock") transactions, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.TransactionInBlock(ctx, blockHash, index) }, func() (any, error) { return c.fallback.TransactionInBlock(ctx, blockHash, index) }, true, @@ -488,6 +551,7 @@ func (c *ClientWithFallback) TransactionReceipt(ctx context.Context, txHash comm rpcstats.CountCall("eth_TransactionReceipt") receipt, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.TransactionReceipt(ctx, txHash) }, func() (any, error) { return c.fallback.TransactionReceipt(ctx, txHash) }, true, @@ -504,6 +568,7 @@ func (c *ClientWithFallback) SyncProgress(ctx context.Context) (*ethereum.SyncPr rpcstats.CountCall("eth_SyncProgress") progress, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.SyncProgress(ctx) }, func() (any, error) { return c.fallback.SyncProgress(ctx) }, true, @@ -520,6 +585,7 @@ func (c *ClientWithFallback) SubscribeNewHead(ctx context.Context, ch chan<- *ty rpcstats.CountCall("eth_SubscribeNewHead") sub, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.SubscribeNewHead(ctx, ch) }, func() (any, error) { return c.fallback.SubscribeNewHead(ctx, ch) }, true, @@ -540,6 +606,7 @@ func (c *ClientWithFallback) BalanceAt(ctx context.Context, account common.Addre rpcstats.CountCall("eth_BalanceAt") balance, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.BalanceAt(ctx, account, blockNumber) }, func() (any, error) { return c.fallback.BalanceAt(ctx, account, blockNumber) }, true, @@ -556,6 +623,7 @@ func (c *ClientWithFallback) StorageAt(ctx context.Context, account common.Addre rpcstats.CountCall("eth_StorageAt") storage, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.StorageAt(ctx, account, key, blockNumber) }, func() (any, error) { return c.fallback.StorageAt(ctx, account, key, blockNumber) }, true, @@ -572,6 +640,7 @@ func (c *ClientWithFallback) CodeAt(ctx context.Context, account common.Address, rpcstats.CountCall("eth_CodeAt") code, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.CodeAt(ctx, account, blockNumber) }, func() (any, error) { return c.fallback.CodeAt(ctx, account, blockNumber) }, true, @@ -588,6 +657,7 @@ func (c *ClientWithFallback) NonceAt(ctx context.Context, account common.Address rpcstats.CountCall("eth_NonceAt") nonce, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.NonceAt(ctx, account, blockNumber) }, func() (any, error) { return c.fallback.NonceAt(ctx, account, blockNumber) }, true, @@ -604,6 +674,7 @@ func (c *ClientWithFallback) FilterLogs(ctx context.Context, q ethereum.FilterQu rpcstats.CountCall("eth_FilterLogs") logs, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.FilterLogs(ctx, q) }, func() (any, error) { return c.fallback.FilterLogs(ctx, q) }, true, @@ -620,6 +691,7 @@ func (c *ClientWithFallback) SubscribeFilterLogs(ctx context.Context, q ethereum rpcstats.CountCall("eth_SubscribeFilterLogs") sub, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.SubscribeFilterLogs(ctx, q, ch) }, func() (any, error) { return c.fallback.SubscribeFilterLogs(ctx, q, ch) }, true, @@ -636,6 +708,7 @@ func (c *ClientWithFallback) PendingBalanceAt(ctx context.Context, account commo rpcstats.CountCall("eth_PendingBalanceAt") balance, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.PendingBalanceAt(ctx, account) }, func() (any, error) { return c.fallback.PendingBalanceAt(ctx, account) }, true, @@ -652,6 +725,7 @@ func (c *ClientWithFallback) PendingStorageAt(ctx context.Context, account commo rpcstats.CountCall("eth_PendingStorageAt") storage, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.PendingStorageAt(ctx, account, key) }, func() (any, error) { return c.fallback.PendingStorageAt(ctx, account, key) }, true, @@ -668,6 +742,7 @@ func (c *ClientWithFallback) PendingCodeAt(ctx context.Context, account common.A rpcstats.CountCall("eth_PendingCodeAt") code, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.PendingCodeAt(ctx, account) }, func() (any, error) { return c.fallback.PendingCodeAt(ctx, account) }, true, @@ -684,6 +759,7 @@ func (c *ClientWithFallback) PendingNonceAt(ctx context.Context, account common. rpcstats.CountCall("eth_PendingNonceAt") nonce, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.PendingNonceAt(ctx, account) }, func() (any, error) { return c.fallback.PendingNonceAt(ctx, account) }, true, @@ -700,6 +776,7 @@ func (c *ClientWithFallback) PendingTransactionCount(ctx context.Context) (uint, rpcstats.CountCall("eth_PendingTransactionCount") count, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.PendingTransactionCount(ctx) }, func() (any, error) { return c.fallback.PendingTransactionCount(ctx) }, true, @@ -716,6 +793,7 @@ func (c *ClientWithFallback) CallContract(ctx context.Context, msg ethereum.Call rpcstats.CountCall("eth_CallContract_" + msg.To.String()) data, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.CallContract(ctx, msg, blockNumber) }, func() (any, error) { return c.fallback.CallContract(ctx, msg, blockNumber) }, true, @@ -732,6 +810,7 @@ func (c *ClientWithFallback) CallContractAtHash(ctx context.Context, msg ethereu rpcstats.CountCall("eth_CallContractAtHash") data, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.CallContractAtHash(ctx, msg, blockHash) }, func() (any, error) { return c.fallback.CallContractAtHash(ctx, msg, blockHash) }, true, @@ -748,6 +827,7 @@ func (c *ClientWithFallback) PendingCallContract(ctx context.Context, msg ethere rpcstats.CountCall("eth_PendingCallContract") data, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.PendingCallContract(ctx, msg) }, func() (any, error) { return c.fallback.PendingCallContract(ctx, msg) }, true, @@ -764,6 +844,7 @@ func (c *ClientWithFallback) SuggestGasPrice(ctx context.Context) (*big.Int, err rpcstats.CountCall("eth_SuggestGasPrice") gasPrice, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.SuggestGasPrice(ctx) }, func() (any, error) { return c.fallback.SuggestGasPrice(ctx) }, true, @@ -780,6 +861,7 @@ func (c *ClientWithFallback) SuggestGasTipCap(ctx context.Context) (*big.Int, er rpcstats.CountCall("eth_SuggestGasTipCap") tip, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.SuggestGasTipCap(ctx) }, func() (any, error) { return c.fallback.SuggestGasTipCap(ctx) }, true, @@ -796,6 +878,7 @@ func (c *ClientWithFallback) FeeHistory(ctx context.Context, blockCount uint64, rpcstats.CountCall("eth_FeeHistory") feeHistory, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.FeeHistory(ctx, blockCount, lastBlock, rewardPercentiles) }, func() (any, error) { return c.fallback.FeeHistory(ctx, blockCount, lastBlock, rewardPercentiles) }, true, @@ -812,6 +895,7 @@ func (c *ClientWithFallback) EstimateGas(ctx context.Context, msg ethereum.CallM rpcstats.CountCall("eth_EstimateGas") estimate, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return c.main.EstimateGas(ctx, msg) }, func() (any, error) { return c.fallback.EstimateGas(ctx, msg) }, true, @@ -828,6 +912,7 @@ func (c *ClientWithFallback) SendTransaction(ctx context.Context, tx *types.Tran rpcstats.CountCall("eth_SendTransaction") return c.makeCallNoReturn( + ctx, func() error { return c.main.SendTransaction(ctx, tx) }, func() error { return c.fallback.SendTransaction(ctx, tx) }, ) @@ -837,6 +922,7 @@ func (c *ClientWithFallback) CallContext(ctx context.Context, result interface{} rpcstats.CountCall("eth_CallContext") return c.makeCallNoReturn( + ctx, func() error { return c.mainRPC.CallContext(ctx, result, method, args...) }, func() error { return c.fallbackRPC.CallContext(ctx, result, method, args...) }, ) @@ -846,6 +932,7 @@ func (c *ClientWithFallback) BatchCallContext(ctx context.Context, b []rpc.Batch rpcstats.CountCall("eth_BatchCallContext") return c.makeCallNoReturn( + ctx, func() error { return c.mainRPC.BatchCallContext(ctx, b) }, func() error { return c.fallbackRPC.BatchCallContext(ctx, b) }, ) @@ -855,10 +942,11 @@ func (c *ClientWithFallback) ToBigInt() *big.Int { return big.NewInt(int64(c.ChainID)) } -func (c *ClientWithFallback) GetBaseFeeFromBlock(blockNumber *big.Int) (string, error) { +func (c *ClientWithFallback) GetBaseFeeFromBlock(ctx context.Context, blockNumber *big.Int) (string, error) { rpcstats.CountCall("eth_GetBaseFeeFromBlock") - var feeHistory FeeHistory - err := c.mainRPC.Call(&feeHistory, "eth_feeHistory", "0x1", (*hexutil.Big)(blockNumber), nil) + + feeHistory, err := c.FeeHistory(ctx, 1, blockNumber, nil) + if err != nil { if err.Error() == "the method eth_feeHistory does not exist/is not available" { return "", nil @@ -867,8 +955,8 @@ func (c *ClientWithFallback) GetBaseFeeFromBlock(blockNumber *big.Int) (string, } var baseGasFee string = "" - if len(feeHistory.BaseFeePerGas) > 0 { - baseGasFee = feeHistory.BaseFeePerGas[0] + if len(feeHistory.BaseFee) > 0 { + baseGasFee = feeHistory.BaseFee[0].String() } return baseGasFee, err @@ -881,6 +969,7 @@ func (c *ClientWithFallback) CallBlockHashByTransaction(ctx context.Context, blo rpcstats.CountCall("eth_FullTransactionByBlockNumberAndIndex") tx, err := c.makeCallSingleReturn( + ctx, func() (any, error) { return callBlockHashByTransaction(ctx, c.mainRPC, blockNumber, index) }, diff --git a/rpc/chain/rpc_limiter.go b/rpc/chain/rpc_limiter.go new file mode 100644 index 000000000..b0db5d23b --- /dev/null +++ b/rpc/chain/rpc_limiter.go @@ -0,0 +1,162 @@ +package chain + +import ( + "fmt" + "sync" + "time" + + "github.com/google/uuid" +) + +const ( + defaultMaxRequestsPerSecond = 100 + minRequestsPerSecond = 20 + requestsPerSecondStep = 10 + + tickerInterval = 1 * time.Second +) + +var ( + ErrRequestsOverLimit = fmt.Errorf("number of requests over limit") +) + +type callerOnWait struct { + requests int + ch chan bool +} + +type RPCLimiter struct { + uuid uuid.UUID + + maxRequestsPerSecond int + maxRequestsPerSecondMutex sync.RWMutex + + requestsMadeWithinSecond int + requestsMadeWithinSecondMutex sync.RWMutex + + callersOnWaitForRequests []callerOnWait + callersOnWaitForRequestsMutex sync.RWMutex + + quit chan bool +} + +func NewRPCLimiter() *RPCLimiter { + + limiter := RPCLimiter{ + uuid: uuid.New(), + maxRequestsPerSecond: defaultMaxRequestsPerSecond, + quit: make(chan bool), + } + + limiter.start() + + return &limiter +} + +func (rl *RPCLimiter) ReduceLimit() { + rl.maxRequestsPerSecondMutex.Lock() + defer rl.maxRequestsPerSecondMutex.Unlock() + if rl.maxRequestsPerSecond <= minRequestsPerSecond { + return + } + rl.maxRequestsPerSecond = rl.maxRequestsPerSecond - requestsPerSecondStep +} + +func (rl *RPCLimiter) start() { + ticker := time.NewTicker(tickerInterval) + go func() { + for { + select { + case <-ticker.C: + { + rl.requestsMadeWithinSecondMutex.Lock() + oldrequestsMadeWithinSecond := rl.requestsMadeWithinSecond + if rl.requestsMadeWithinSecond != 0 { + rl.requestsMadeWithinSecond = 0 + } + rl.requestsMadeWithinSecondMutex.Unlock() + if oldrequestsMadeWithinSecond == 0 { + continue + } + } + + rl.callersOnWaitForRequestsMutex.Lock() + numOfRequestsToMakeAvailable := rl.maxRequestsPerSecond + for { + if numOfRequestsToMakeAvailable == 0 || len(rl.callersOnWaitForRequests) == 0 { + break + } + + var index = -1 + for i := 0; i < len(rl.callersOnWaitForRequests); i++ { + if rl.callersOnWaitForRequests[i].requests <= numOfRequestsToMakeAvailable { + index = i + break + } + } + + if index == -1 { + break + } + + callerOnWait := rl.callersOnWaitForRequests[index] + numOfRequestsToMakeAvailable -= callerOnWait.requests + rl.callersOnWaitForRequests = append(rl.callersOnWaitForRequests[:index], rl.callersOnWaitForRequests[index+1:]...) + + callerOnWait.ch <- true + } + rl.callersOnWaitForRequestsMutex.Unlock() + + case <-rl.quit: + ticker.Stop() + return + } + } + }() +} + +func (rl *RPCLimiter) Stop() { + rl.quit <- true + close(rl.quit) + for _, callerOnWait := range rl.callersOnWaitForRequests { + close(callerOnWait.ch) + } + rl.callersOnWaitForRequests = nil +} + +func (rl *RPCLimiter) WaitForRequestsAvailability(requests int) error { + if requests > rl.maxRequestsPerSecond { + return ErrRequestsOverLimit + } + + { + rl.requestsMadeWithinSecondMutex.Lock() + if rl.requestsMadeWithinSecond+requests <= rl.maxRequestsPerSecond { + rl.requestsMadeWithinSecond += requests + rl.requestsMadeWithinSecondMutex.Unlock() + return nil + } + rl.requestsMadeWithinSecondMutex.Unlock() + } + + callerOnWait := callerOnWait{ + requests: requests, + ch: make(chan bool), + } + + { + rl.callersOnWaitForRequestsMutex.Lock() + rl.callersOnWaitForRequests = append(rl.callersOnWaitForRequests, callerOnWait) + rl.callersOnWaitForRequestsMutex.Unlock() + } + + <-callerOnWait.ch + + close(callerOnWait.ch) + + rl.requestsMadeWithinSecondMutex.Lock() + rl.requestsMadeWithinSecond += requests + rl.requestsMadeWithinSecondMutex.Unlock() + + return nil +} diff --git a/rpc/client.go b/rpc/client.go index c219dcdca..c404cf69b 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -6,7 +6,9 @@ import ( "encoding/json" "errors" "fmt" + "net/url" "reflect" + "strings" "sync" "time" @@ -47,10 +49,12 @@ type Client struct { upstreamURL string UpstreamChainID uint64 - local *gethrpc.Client - upstream chain.ClientInterface - rpcClientsMutex sync.RWMutex - rpcClients map[uint64]chain.ClientInterface + local *gethrpc.Client + upstream chain.ClientInterface + rpcClientsMutex sync.RWMutex + rpcClients map[uint64]chain.ClientInterface + rpcLimiterMutex sync.RWMutex + limiterPerProvider map[string]*chain.RPCLimiter router *router NetworkManager *network.Manager @@ -81,11 +85,12 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U } c := Client{ - local: client, - NetworkManager: networkManager, - handlers: make(map[string]Handler), - rpcClients: make(map[uint64]chain.ClientInterface), - log: log, + local: client, + NetworkManager: networkManager, + handlers: make(map[string]Handler), + rpcClients: make(map[uint64]chain.ClientInterface), + limiterPerProvider: make(map[string]*chain.RPCLimiter), + log: log, } if upstream.Enabled { @@ -96,7 +101,11 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U if err != nil { return nil, fmt.Errorf("dial upstream server: %s", err) } - c.upstream = chain.NewSimpleClient(upstreamClient, upstreamChainID) + limiter, err := c.getRPCLimiter(c.upstreamURL) + if err != nil { + return nil, fmt.Errorf("get RPC limiter: %s", err) + } + c.upstream = chain.NewSimpleClient(limiter, upstreamClient, upstreamChainID) } c.router = newRouter(c.upstreamEnabled) @@ -112,6 +121,33 @@ func (c *Client) SetWalletNotifier(notifier func(chainID uint64, message string) c.walletNotifier = notifier } +func extractLastParamFromURL(inputURL string) (string, error) { + parsedURL, err := url.Parse(inputURL) + if err != nil { + return "", err + } + + pathSegments := strings.Split(parsedURL.Path, "/") + lastSegment := pathSegments[len(pathSegments)-1] + + return lastSegment, nil +} + +func (c *Client) getRPCLimiter(URL string) (*chain.RPCLimiter, error) { + apiKey, err := extractLastParamFromURL(URL) + if err != nil { + return nil, err + } + c.rpcLimiterMutex.Lock() + defer c.rpcLimiterMutex.Unlock() + if limiter, ok := c.limiterPerProvider[apiKey]; ok { + return limiter, nil + } + limiter := chain.NewRPCLimiter() + c.limiterPerProvider[apiKey] = limiter + return limiter, nil +} + func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, error) { c.rpcClientsMutex.Lock() defer c.rpcClientsMutex.Unlock() @@ -135,15 +171,28 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err return nil, fmt.Errorf("dial upstream server: %s", err) } - var rpcFallbackClient *gethrpc.Client + rpcLimiter, err := c.getRPCLimiter(network.RPCURL) + if err != nil { + return nil, fmt.Errorf("get RPC limiter: %s", err) + } + + var ( + rpcFallbackClient *gethrpc.Client + rpcFallbackLimiter *chain.RPCLimiter + ) if len(network.FallbackURL) > 0 { rpcFallbackClient, err = gethrpc.Dial(network.FallbackURL) if err != nil { return nil, fmt.Errorf("dial upstream server: %s", err) } + + rpcFallbackLimiter, err = c.getRPCLimiter(network.FallbackURL) + if err != nil { + return nil, fmt.Errorf("get RPC fallback limiter: %s", err) + } } - client := chain.NewClient(rpcClient, rpcFallbackClient, chainID) + client := chain.NewClient(rpcLimiter, rpcClient, rpcFallbackLimiter, rpcFallbackClient, chainID) client.WalletNotifier = c.walletNotifier c.rpcClients[chainID] = client return client, nil @@ -199,8 +248,12 @@ func (c *Client) UpdateUpstreamURL(url string) error { if err != nil { return err } + rpcLimiter, err := c.getRPCLimiter(url) + if err != nil { + return err + } c.Lock() - c.upstream = chain.NewSimpleClient(rpcClient, c.UpstreamChainID) + c.upstream = chain.NewSimpleClient(rpcLimiter, rpcClient, c.UpstreamChainID) c.upstreamURL = url c.Unlock() diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index deeb30d0b..f5be844c7 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -288,7 +288,7 @@ func (tc *TestClient) CallBlockHashByTransaction(ctx context.Context, blockNumbe return common.BigToHash(blockNumber), nil } -func (tc *TestClient) GetBaseFeeFromBlock(blockNumber *big.Int) (string, error) { +func (tc *TestClient) GetBaseFeeFromBlock(ctx context.Context, blockNumber *big.Int) (string, error) { tc.incCounter("GetBaseFeeFromBlock") if tc.traceAPICalls { tc.t.Log("GetBaseFeeFromBlock") diff --git a/services/wallet/transfer/downloader.go b/services/wallet/transfer/downloader.go index 75e6cc51c..aaf5f2f85 100644 --- a/services/wallet/transfer/downloader.go +++ b/services/wallet/transfer/downloader.go @@ -109,7 +109,7 @@ func getTransferByHash(ctx context.Context, client chain.ClientInterface, signer return nil, err } - baseGasFee, err := client.GetBaseFeeFromBlock(big.NewInt(int64(transactionLog.BlockNumber))) + baseGasFee, err := client.GetBaseFeeFromBlock(ctx, big.NewInt(int64(transactionLog.BlockNumber))) if err != nil { return nil, err }