feat: rpc request limiter

This commit is contained in:
Sale Djenic 2024-02-23 15:27:08 +01:00 committed by saledjenic
parent ae9b697eda
commit bb3006d747
5 changed files with 353 additions and 49 deletions

View File

@ -13,7 +13,6 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethclient"
@ -21,10 +20,6 @@ import (
"github.com/status-im/status-go/services/rpcstats"
)
type FeeHistory struct {
BaseFeePerGas []string `json:"baseFeePerGas"`
}
type BatchCallClient interface {
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
}
@ -39,7 +34,7 @@ type ClientInterface interface {
BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
CallBlockHashByTransaction(ctx context.Context, blockNumber *big.Int, index uint) (common.Hash, error)
GetBaseFeeFromBlock(blockNumber *big.Int) (string, error)
GetBaseFeeFromBlock(ctx context.Context, blockNumber *big.Int) (string, error)
NetworkID() uint64
ToBigInt() *big.Int
CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error)
@ -58,9 +53,11 @@ type ClientInterface interface {
}
type ClientWithFallback struct {
ChainID uint64
main *ethclient.Client
fallback *ethclient.Client
ChainID uint64
main *ethclient.Client
fallback *ethclient.Client
mainLimiter *RPCLimiter
fallbackLimiter *RPCLimiter
mainRPC *rpc.Client
fallbackRPC *rpc.Client
@ -99,7 +96,7 @@ type CommandResult struct {
vmError error
}
func NewSimpleClient(main *rpc.Client, chainID uint64) *ClientWithFallback {
func NewSimpleClient(mainLimiter *RPCLimiter, main *rpc.Client, chainID uint64) *ClientWithFallback {
hystrix.ConfigureCommand(fmt.Sprintf("ethClient_%d", chainID), hystrix.CommandConfig{
Timeout: 10000,
MaxConcurrentRequests: 100,
@ -108,17 +105,19 @@ func NewSimpleClient(main *rpc.Client, chainID uint64) *ClientWithFallback {
})
return &ClientWithFallback{
ChainID: chainID,
main: ethclient.NewClient(main),
fallback: nil,
mainRPC: main,
fallbackRPC: nil,
IsConnected: true,
LastCheckedAt: time.Now().Unix(),
ChainID: chainID,
main: ethclient.NewClient(main),
fallback: nil,
mainLimiter: mainLimiter,
fallbackLimiter: nil,
mainRPC: main,
fallbackRPC: nil,
IsConnected: true,
LastCheckedAt: time.Now().Unix(),
}
}
func NewClient(main, fallback *rpc.Client, chainID uint64) *ClientWithFallback {
func NewClient(mainLimiter *RPCLimiter, main *rpc.Client, fallbackLimiter *RPCLimiter, fallback *rpc.Client, chainID uint64) *ClientWithFallback {
hystrix.ConfigureCommand(fmt.Sprintf("ethClient_%d", chainID), hystrix.CommandConfig{
Timeout: 20000,
MaxConcurrentRequests: 100,
@ -131,13 +130,15 @@ func NewClient(main, fallback *rpc.Client, chainID uint64) *ClientWithFallback {
fallbackEthClient = ethclient.NewClient(fallback)
}
return &ClientWithFallback{
ChainID: chainID,
main: ethclient.NewClient(main),
fallback: fallbackEthClient,
mainRPC: main,
fallbackRPC: fallback,
IsConnected: true,
LastCheckedAt: time.Now().Unix(),
ChainID: chainID,
main: ethclient.NewClient(main),
fallback: fallbackEthClient,
mainLimiter: mainLimiter,
fallbackLimiter: fallbackLimiter,
mainRPC: main,
fallbackRPC: fallback,
IsConnected: true,
LastCheckedAt: time.Now().Unix(),
}
}
@ -160,6 +161,10 @@ func isVMError(err error) bool {
return false
}
func isRPSLimitError(err error) bool {
return strings.Contains(err.Error(), "backoff_seconds")
}
func (c *ClientWithFallback) SetIsConnected(value bool) {
c.IsConnectedLock.Lock()
defer c.IsConnectedLock.Unlock()
@ -188,12 +193,20 @@ func (c *ClientWithFallback) GetIsConnected() bool {
return c.IsConnected
}
func (c *ClientWithFallback) makeCallNoReturn(main func() error, fallback func() error) error {
func (c *ClientWithFallback) makeCallNoReturn(ctx context.Context, main func() error, fallback func() error) error {
resultChan := make(chan CommandResult, 1)
c.LastCheckedAt = time.Now().Unix()
errChan := hystrix.Go(fmt.Sprintf("ethClient_%d", c.ChainID), func() error {
err := main()
err := c.mainLimiter.WaitForRequestsAvailability(1)
if err != nil {
return err
}
err = main()
if err != nil {
if isRPSLimitError(err) {
c.mainLimiter.ReduceLimit()
}
if isVMError(err) {
resultChan <- CommandResult{vmError: err}
return nil
@ -209,8 +222,16 @@ func (c *ClientWithFallback) makeCallNoReturn(main func() error, fallback func()
return err
}
err = c.fallbackLimiter.WaitForRequestsAvailability(1)
if err != nil {
return err
}
err = fallback()
if err != nil {
if isRPSLimitError(err) {
c.fallbackLimiter.ReduceLimit()
}
if isVMError(err) {
resultChan <- CommandResult{vmError: err}
return nil
@ -233,11 +254,19 @@ func (c *ClientWithFallback) makeCallNoReturn(main func() error, fallback func()
}
}
func (c *ClientWithFallback) makeCallSingleReturn(main func() (any, error), fallback func() (any, error), toggleIsConnected bool) (any, error) {
func (c *ClientWithFallback) makeCallSingleReturn(ctx context.Context, main func() (any, error), fallback func() (any, error), toggleIsConnected bool) (any, error) {
resultChan := make(chan CommandResult, 1)
errChan := hystrix.Go(fmt.Sprintf("ethClient_%d", c.ChainID), func() error {
err := c.mainLimiter.WaitForRequestsAvailability(1)
if err != nil {
return err
}
res, err := main()
if err != nil {
if isRPSLimitError(err) {
c.mainLimiter.ReduceLimit()
}
if isVMError(err) {
resultChan <- CommandResult{vmError: err}
return nil
@ -257,8 +286,16 @@ func (c *ClientWithFallback) makeCallSingleReturn(main func() (any, error), fall
return err
}
err = c.fallbackLimiter.WaitForRequestsAvailability(1)
if err != nil {
return err
}
res, err := fallback()
if err != nil {
if isRPSLimitError(err) {
c.fallbackLimiter.ReduceLimit()
}
if isVMError(err) {
resultChan <- CommandResult{vmError: err}
return nil
@ -286,12 +323,20 @@ func (c *ClientWithFallback) makeCallSingleReturn(main func() (any, error), fall
}
}
func (c *ClientWithFallback) makeCallDoubleReturn(main func() (any, any, error), fallback func() (any, any, error)) (any, any, error) {
func (c *ClientWithFallback) makeCallDoubleReturn(ctx context.Context, main func() (any, any, error), fallback func() (any, any, error)) (any, any, error) {
resultChan := make(chan CommandResult, 1)
c.LastCheckedAt = time.Now().Unix()
errChan := hystrix.Go(fmt.Sprintf("ethClient_%d", c.ChainID), func() error {
err := c.mainLimiter.WaitForRequestsAvailability(1)
if err != nil {
return err
}
a, b, err := main()
if err != nil {
if isRPSLimitError(err) {
c.mainLimiter.ReduceLimit()
}
if isVMError(err) {
resultChan <- CommandResult{vmError: err}
return nil
@ -307,8 +352,16 @@ func (c *ClientWithFallback) makeCallDoubleReturn(main func() (any, any, error),
return err
}
err = c.fallbackLimiter.WaitForRequestsAvailability(1)
if err != nil {
return err
}
a, b, err := fallback()
if err != nil {
if isRPSLimitError(err) {
c.fallbackLimiter.ReduceLimit()
}
if isVMError(err) {
resultChan <- CommandResult{vmError: err}
return nil
@ -336,6 +389,7 @@ func (c *ClientWithFallback) BlockByHash(ctx context.Context, hash common.Hash)
rpcstats.CountCall("eth_BlockByHash")
block, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.BlockByHash(ctx, hash) },
func() (any, error) { return c.fallback.BlockByHash(ctx, hash) },
true,
@ -351,6 +405,7 @@ func (c *ClientWithFallback) BlockByHash(ctx context.Context, hash common.Hash)
func (c *ClientWithFallback) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
rpcstats.CountCall("eth_BlockByNumber")
block, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.BlockByNumber(ctx, number) },
func() (any, error) { return c.fallback.BlockByNumber(ctx, number) },
true,
@ -367,6 +422,7 @@ func (c *ClientWithFallback) BlockNumber(ctx context.Context) (uint64, error) {
rpcstats.CountCall("eth_BlockNumber")
number, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.BlockNumber(ctx) },
func() (any, error) { return c.fallback.BlockNumber(ctx) },
true,
@ -383,6 +439,7 @@ func (c *ClientWithFallback) PeerCount(ctx context.Context) (uint64, error) {
rpcstats.CountCall("eth_PeerCount")
peerCount, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.PeerCount(ctx) },
func() (any, error) { return c.fallback.PeerCount(ctx) },
true,
@ -398,6 +455,7 @@ func (c *ClientWithFallback) PeerCount(ctx context.Context) (uint64, error) {
func (c *ClientWithFallback) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
rpcstats.CountCall("eth_HeaderByHash")
header, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.HeaderByHash(ctx, hash) },
func() (any, error) { return c.fallback.HeaderByHash(ctx, hash) },
false,
@ -413,6 +471,7 @@ func (c *ClientWithFallback) HeaderByHash(ctx context.Context, hash common.Hash)
func (c *ClientWithFallback) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
rpcstats.CountCall("eth_HeaderByNumber")
header, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.HeaderByNumber(ctx, number) },
func() (any, error) { return c.fallback.HeaderByNumber(ctx, number) },
false,
@ -429,6 +488,7 @@ func (c *ClientWithFallback) TransactionByHash(ctx context.Context, hash common.
rpcstats.CountCall("eth_TransactionByHash")
tx, isPending, err := c.makeCallDoubleReturn(
ctx,
func() (any, any, error) { return c.main.TransactionByHash(ctx, hash) },
func() (any, any, error) { return c.fallback.TransactionByHash(ctx, hash) },
)
@ -444,6 +504,7 @@ func (c *ClientWithFallback) TransactionSender(ctx context.Context, tx *types.Tr
rpcstats.CountCall("eth_TransactionSender")
address, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.TransactionSender(ctx, tx, block, index) },
func() (any, error) { return c.fallback.TransactionSender(ctx, tx, block, index) },
true,
@ -456,6 +517,7 @@ func (c *ClientWithFallback) TransactionCount(ctx context.Context, blockHash com
rpcstats.CountCall("eth_TransactionCount")
count, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.TransactionCount(ctx, blockHash) },
func() (any, error) { return c.fallback.TransactionCount(ctx, blockHash) },
true,
@ -472,6 +534,7 @@ func (c *ClientWithFallback) TransactionInBlock(ctx context.Context, blockHash c
rpcstats.CountCall("eth_TransactionInBlock")
transactions, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.TransactionInBlock(ctx, blockHash, index) },
func() (any, error) { return c.fallback.TransactionInBlock(ctx, blockHash, index) },
true,
@ -488,6 +551,7 @@ func (c *ClientWithFallback) TransactionReceipt(ctx context.Context, txHash comm
rpcstats.CountCall("eth_TransactionReceipt")
receipt, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.TransactionReceipt(ctx, txHash) },
func() (any, error) { return c.fallback.TransactionReceipt(ctx, txHash) },
true,
@ -504,6 +568,7 @@ func (c *ClientWithFallback) SyncProgress(ctx context.Context) (*ethereum.SyncPr
rpcstats.CountCall("eth_SyncProgress")
progress, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.SyncProgress(ctx) },
func() (any, error) { return c.fallback.SyncProgress(ctx) },
true,
@ -520,6 +585,7 @@ func (c *ClientWithFallback) SubscribeNewHead(ctx context.Context, ch chan<- *ty
rpcstats.CountCall("eth_SubscribeNewHead")
sub, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.SubscribeNewHead(ctx, ch) },
func() (any, error) { return c.fallback.SubscribeNewHead(ctx, ch) },
true,
@ -540,6 +606,7 @@ func (c *ClientWithFallback) BalanceAt(ctx context.Context, account common.Addre
rpcstats.CountCall("eth_BalanceAt")
balance, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.BalanceAt(ctx, account, blockNumber) },
func() (any, error) { return c.fallback.BalanceAt(ctx, account, blockNumber) },
true,
@ -556,6 +623,7 @@ func (c *ClientWithFallback) StorageAt(ctx context.Context, account common.Addre
rpcstats.CountCall("eth_StorageAt")
storage, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.StorageAt(ctx, account, key, blockNumber) },
func() (any, error) { return c.fallback.StorageAt(ctx, account, key, blockNumber) },
true,
@ -572,6 +640,7 @@ func (c *ClientWithFallback) CodeAt(ctx context.Context, account common.Address,
rpcstats.CountCall("eth_CodeAt")
code, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.CodeAt(ctx, account, blockNumber) },
func() (any, error) { return c.fallback.CodeAt(ctx, account, blockNumber) },
true,
@ -588,6 +657,7 @@ func (c *ClientWithFallback) NonceAt(ctx context.Context, account common.Address
rpcstats.CountCall("eth_NonceAt")
nonce, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.NonceAt(ctx, account, blockNumber) },
func() (any, error) { return c.fallback.NonceAt(ctx, account, blockNumber) },
true,
@ -604,6 +674,7 @@ func (c *ClientWithFallback) FilterLogs(ctx context.Context, q ethereum.FilterQu
rpcstats.CountCall("eth_FilterLogs")
logs, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.FilterLogs(ctx, q) },
func() (any, error) { return c.fallback.FilterLogs(ctx, q) },
true,
@ -620,6 +691,7 @@ func (c *ClientWithFallback) SubscribeFilterLogs(ctx context.Context, q ethereum
rpcstats.CountCall("eth_SubscribeFilterLogs")
sub, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.SubscribeFilterLogs(ctx, q, ch) },
func() (any, error) { return c.fallback.SubscribeFilterLogs(ctx, q, ch) },
true,
@ -636,6 +708,7 @@ func (c *ClientWithFallback) PendingBalanceAt(ctx context.Context, account commo
rpcstats.CountCall("eth_PendingBalanceAt")
balance, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.PendingBalanceAt(ctx, account) },
func() (any, error) { return c.fallback.PendingBalanceAt(ctx, account) },
true,
@ -652,6 +725,7 @@ func (c *ClientWithFallback) PendingStorageAt(ctx context.Context, account commo
rpcstats.CountCall("eth_PendingStorageAt")
storage, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.PendingStorageAt(ctx, account, key) },
func() (any, error) { return c.fallback.PendingStorageAt(ctx, account, key) },
true,
@ -668,6 +742,7 @@ func (c *ClientWithFallback) PendingCodeAt(ctx context.Context, account common.A
rpcstats.CountCall("eth_PendingCodeAt")
code, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.PendingCodeAt(ctx, account) },
func() (any, error) { return c.fallback.PendingCodeAt(ctx, account) },
true,
@ -684,6 +759,7 @@ func (c *ClientWithFallback) PendingNonceAt(ctx context.Context, account common.
rpcstats.CountCall("eth_PendingNonceAt")
nonce, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.PendingNonceAt(ctx, account) },
func() (any, error) { return c.fallback.PendingNonceAt(ctx, account) },
true,
@ -700,6 +776,7 @@ func (c *ClientWithFallback) PendingTransactionCount(ctx context.Context) (uint,
rpcstats.CountCall("eth_PendingTransactionCount")
count, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.PendingTransactionCount(ctx) },
func() (any, error) { return c.fallback.PendingTransactionCount(ctx) },
true,
@ -716,6 +793,7 @@ func (c *ClientWithFallback) CallContract(ctx context.Context, msg ethereum.Call
rpcstats.CountCall("eth_CallContract_" + msg.To.String())
data, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.CallContract(ctx, msg, blockNumber) },
func() (any, error) { return c.fallback.CallContract(ctx, msg, blockNumber) },
true,
@ -732,6 +810,7 @@ func (c *ClientWithFallback) CallContractAtHash(ctx context.Context, msg ethereu
rpcstats.CountCall("eth_CallContractAtHash")
data, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.CallContractAtHash(ctx, msg, blockHash) },
func() (any, error) { return c.fallback.CallContractAtHash(ctx, msg, blockHash) },
true,
@ -748,6 +827,7 @@ func (c *ClientWithFallback) PendingCallContract(ctx context.Context, msg ethere
rpcstats.CountCall("eth_PendingCallContract")
data, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.PendingCallContract(ctx, msg) },
func() (any, error) { return c.fallback.PendingCallContract(ctx, msg) },
true,
@ -764,6 +844,7 @@ func (c *ClientWithFallback) SuggestGasPrice(ctx context.Context) (*big.Int, err
rpcstats.CountCall("eth_SuggestGasPrice")
gasPrice, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.SuggestGasPrice(ctx) },
func() (any, error) { return c.fallback.SuggestGasPrice(ctx) },
true,
@ -780,6 +861,7 @@ func (c *ClientWithFallback) SuggestGasTipCap(ctx context.Context) (*big.Int, er
rpcstats.CountCall("eth_SuggestGasTipCap")
tip, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.SuggestGasTipCap(ctx) },
func() (any, error) { return c.fallback.SuggestGasTipCap(ctx) },
true,
@ -796,6 +878,7 @@ func (c *ClientWithFallback) FeeHistory(ctx context.Context, blockCount uint64,
rpcstats.CountCall("eth_FeeHistory")
feeHistory, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.FeeHistory(ctx, blockCount, lastBlock, rewardPercentiles) },
func() (any, error) { return c.fallback.FeeHistory(ctx, blockCount, lastBlock, rewardPercentiles) },
true,
@ -812,6 +895,7 @@ func (c *ClientWithFallback) EstimateGas(ctx context.Context, msg ethereum.CallM
rpcstats.CountCall("eth_EstimateGas")
estimate, err := c.makeCallSingleReturn(
ctx,
func() (any, error) { return c.main.EstimateGas(ctx, msg) },
func() (any, error) { return c.fallback.EstimateGas(ctx, msg) },
true,
@ -828,6 +912,7 @@ func (c *ClientWithFallback) SendTransaction(ctx context.Context, tx *types.Tran
rpcstats.CountCall("eth_SendTransaction")
return c.makeCallNoReturn(
ctx,
func() error { return c.main.SendTransaction(ctx, tx) },
func() error { return c.fallback.SendTransaction(ctx, tx) },
)
@ -837,6 +922,7 @@ func (c *ClientWithFallback) CallContext(ctx context.Context, result interface{}
rpcstats.CountCall("eth_CallContext")
return c.makeCallNoReturn(
ctx,
func() error { return c.mainRPC.CallContext(ctx, result, method, args...) },
func() error { return c.fallbackRPC.CallContext(ctx, result, method, args...) },
)
@ -846,6 +932,7 @@ func (c *ClientWithFallback) BatchCallContext(ctx context.Context, b []rpc.Batch
rpcstats.CountCall("eth_BatchCallContext")
return c.makeCallNoReturn(
ctx,
func() error { return c.mainRPC.BatchCallContext(ctx, b) },
func() error { return c.fallbackRPC.BatchCallContext(ctx, b) },
)
@ -855,10 +942,11 @@ func (c *ClientWithFallback) ToBigInt() *big.Int {
return big.NewInt(int64(c.ChainID))
}
func (c *ClientWithFallback) GetBaseFeeFromBlock(blockNumber *big.Int) (string, error) {
func (c *ClientWithFallback) GetBaseFeeFromBlock(ctx context.Context, blockNumber *big.Int) (string, error) {
rpcstats.CountCall("eth_GetBaseFeeFromBlock")
var feeHistory FeeHistory
err := c.mainRPC.Call(&feeHistory, "eth_feeHistory", "0x1", (*hexutil.Big)(blockNumber), nil)
feeHistory, err := c.FeeHistory(ctx, 1, blockNumber, nil)
if err != nil {
if err.Error() == "the method eth_feeHistory does not exist/is not available" {
return "", nil
@ -867,8 +955,8 @@ func (c *ClientWithFallback) GetBaseFeeFromBlock(blockNumber *big.Int) (string,
}
var baseGasFee string = ""
if len(feeHistory.BaseFeePerGas) > 0 {
baseGasFee = feeHistory.BaseFeePerGas[0]
if len(feeHistory.BaseFee) > 0 {
baseGasFee = feeHistory.BaseFee[0].String()
}
return baseGasFee, err
@ -881,6 +969,7 @@ func (c *ClientWithFallback) CallBlockHashByTransaction(ctx context.Context, blo
rpcstats.CountCall("eth_FullTransactionByBlockNumberAndIndex")
tx, err := c.makeCallSingleReturn(
ctx,
func() (any, error) {
return callBlockHashByTransaction(ctx, c.mainRPC, blockNumber, index)
},

162
rpc/chain/rpc_limiter.go Normal file
View File

@ -0,0 +1,162 @@
package chain
import (
"fmt"
"sync"
"time"
"github.com/google/uuid"
)
const (
defaultMaxRequestsPerSecond = 100
minRequestsPerSecond = 20
requestsPerSecondStep = 10
tickerInterval = 1 * time.Second
)
var (
ErrRequestsOverLimit = fmt.Errorf("number of requests over limit")
)
type callerOnWait struct {
requests int
ch chan bool
}
type RPCLimiter struct {
uuid uuid.UUID
maxRequestsPerSecond int
maxRequestsPerSecondMutex sync.RWMutex
requestsMadeWithinSecond int
requestsMadeWithinSecondMutex sync.RWMutex
callersOnWaitForRequests []callerOnWait
callersOnWaitForRequestsMutex sync.RWMutex
quit chan bool
}
func NewRPCLimiter() *RPCLimiter {
limiter := RPCLimiter{
uuid: uuid.New(),
maxRequestsPerSecond: defaultMaxRequestsPerSecond,
quit: make(chan bool),
}
limiter.start()
return &limiter
}
func (rl *RPCLimiter) ReduceLimit() {
rl.maxRequestsPerSecondMutex.Lock()
defer rl.maxRequestsPerSecondMutex.Unlock()
if rl.maxRequestsPerSecond <= minRequestsPerSecond {
return
}
rl.maxRequestsPerSecond = rl.maxRequestsPerSecond - requestsPerSecondStep
}
func (rl *RPCLimiter) start() {
ticker := time.NewTicker(tickerInterval)
go func() {
for {
select {
case <-ticker.C:
{
rl.requestsMadeWithinSecondMutex.Lock()
oldrequestsMadeWithinSecond := rl.requestsMadeWithinSecond
if rl.requestsMadeWithinSecond != 0 {
rl.requestsMadeWithinSecond = 0
}
rl.requestsMadeWithinSecondMutex.Unlock()
if oldrequestsMadeWithinSecond == 0 {
continue
}
}
rl.callersOnWaitForRequestsMutex.Lock()
numOfRequestsToMakeAvailable := rl.maxRequestsPerSecond
for {
if numOfRequestsToMakeAvailable == 0 || len(rl.callersOnWaitForRequests) == 0 {
break
}
var index = -1
for i := 0; i < len(rl.callersOnWaitForRequests); i++ {
if rl.callersOnWaitForRequests[i].requests <= numOfRequestsToMakeAvailable {
index = i
break
}
}
if index == -1 {
break
}
callerOnWait := rl.callersOnWaitForRequests[index]
numOfRequestsToMakeAvailable -= callerOnWait.requests
rl.callersOnWaitForRequests = append(rl.callersOnWaitForRequests[:index], rl.callersOnWaitForRequests[index+1:]...)
callerOnWait.ch <- true
}
rl.callersOnWaitForRequestsMutex.Unlock()
case <-rl.quit:
ticker.Stop()
return
}
}
}()
}
func (rl *RPCLimiter) Stop() {
rl.quit <- true
close(rl.quit)
for _, callerOnWait := range rl.callersOnWaitForRequests {
close(callerOnWait.ch)
}
rl.callersOnWaitForRequests = nil
}
func (rl *RPCLimiter) WaitForRequestsAvailability(requests int) error {
if requests > rl.maxRequestsPerSecond {
return ErrRequestsOverLimit
}
{
rl.requestsMadeWithinSecondMutex.Lock()
if rl.requestsMadeWithinSecond+requests <= rl.maxRequestsPerSecond {
rl.requestsMadeWithinSecond += requests
rl.requestsMadeWithinSecondMutex.Unlock()
return nil
}
rl.requestsMadeWithinSecondMutex.Unlock()
}
callerOnWait := callerOnWait{
requests: requests,
ch: make(chan bool),
}
{
rl.callersOnWaitForRequestsMutex.Lock()
rl.callersOnWaitForRequests = append(rl.callersOnWaitForRequests, callerOnWait)
rl.callersOnWaitForRequestsMutex.Unlock()
}
<-callerOnWait.ch
close(callerOnWait.ch)
rl.requestsMadeWithinSecondMutex.Lock()
rl.requestsMadeWithinSecond += requests
rl.requestsMadeWithinSecondMutex.Unlock()
return nil
}

View File

@ -6,7 +6,9 @@ import (
"encoding/json"
"errors"
"fmt"
"net/url"
"reflect"
"strings"
"sync"
"time"
@ -47,10 +49,12 @@ type Client struct {
upstreamURL string
UpstreamChainID uint64
local *gethrpc.Client
upstream chain.ClientInterface
rpcClientsMutex sync.RWMutex
rpcClients map[uint64]chain.ClientInterface
local *gethrpc.Client
upstream chain.ClientInterface
rpcClientsMutex sync.RWMutex
rpcClients map[uint64]chain.ClientInterface
rpcLimiterMutex sync.RWMutex
limiterPerProvider map[string]*chain.RPCLimiter
router *router
NetworkManager *network.Manager
@ -81,11 +85,12 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U
}
c := Client{
local: client,
NetworkManager: networkManager,
handlers: make(map[string]Handler),
rpcClients: make(map[uint64]chain.ClientInterface),
log: log,
local: client,
NetworkManager: networkManager,
handlers: make(map[string]Handler),
rpcClients: make(map[uint64]chain.ClientInterface),
limiterPerProvider: make(map[string]*chain.RPCLimiter),
log: log,
}
if upstream.Enabled {
@ -96,7 +101,11 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U
if err != nil {
return nil, fmt.Errorf("dial upstream server: %s", err)
}
c.upstream = chain.NewSimpleClient(upstreamClient, upstreamChainID)
limiter, err := c.getRPCLimiter(c.upstreamURL)
if err != nil {
return nil, fmt.Errorf("get RPC limiter: %s", err)
}
c.upstream = chain.NewSimpleClient(limiter, upstreamClient, upstreamChainID)
}
c.router = newRouter(c.upstreamEnabled)
@ -112,6 +121,33 @@ func (c *Client) SetWalletNotifier(notifier func(chainID uint64, message string)
c.walletNotifier = notifier
}
func extractLastParamFromURL(inputURL string) (string, error) {
parsedURL, err := url.Parse(inputURL)
if err != nil {
return "", err
}
pathSegments := strings.Split(parsedURL.Path, "/")
lastSegment := pathSegments[len(pathSegments)-1]
return lastSegment, nil
}
func (c *Client) getRPCLimiter(URL string) (*chain.RPCLimiter, error) {
apiKey, err := extractLastParamFromURL(URL)
if err != nil {
return nil, err
}
c.rpcLimiterMutex.Lock()
defer c.rpcLimiterMutex.Unlock()
if limiter, ok := c.limiterPerProvider[apiKey]; ok {
return limiter, nil
}
limiter := chain.NewRPCLimiter()
c.limiterPerProvider[apiKey] = limiter
return limiter, nil
}
func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, error) {
c.rpcClientsMutex.Lock()
defer c.rpcClientsMutex.Unlock()
@ -135,15 +171,28 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err
return nil, fmt.Errorf("dial upstream server: %s", err)
}
var rpcFallbackClient *gethrpc.Client
rpcLimiter, err := c.getRPCLimiter(network.RPCURL)
if err != nil {
return nil, fmt.Errorf("get RPC limiter: %s", err)
}
var (
rpcFallbackClient *gethrpc.Client
rpcFallbackLimiter *chain.RPCLimiter
)
if len(network.FallbackURL) > 0 {
rpcFallbackClient, err = gethrpc.Dial(network.FallbackURL)
if err != nil {
return nil, fmt.Errorf("dial upstream server: %s", err)
}
rpcFallbackLimiter, err = c.getRPCLimiter(network.FallbackURL)
if err != nil {
return nil, fmt.Errorf("get RPC fallback limiter: %s", err)
}
}
client := chain.NewClient(rpcClient, rpcFallbackClient, chainID)
client := chain.NewClient(rpcLimiter, rpcClient, rpcFallbackLimiter, rpcFallbackClient, chainID)
client.WalletNotifier = c.walletNotifier
c.rpcClients[chainID] = client
return client, nil
@ -199,8 +248,12 @@ func (c *Client) UpdateUpstreamURL(url string) error {
if err != nil {
return err
}
rpcLimiter, err := c.getRPCLimiter(url)
if err != nil {
return err
}
c.Lock()
c.upstream = chain.NewSimpleClient(rpcClient, c.UpstreamChainID)
c.upstream = chain.NewSimpleClient(rpcLimiter, rpcClient, c.UpstreamChainID)
c.upstreamURL = url
c.Unlock()

View File

@ -288,7 +288,7 @@ func (tc *TestClient) CallBlockHashByTransaction(ctx context.Context, blockNumbe
return common.BigToHash(blockNumber), nil
}
func (tc *TestClient) GetBaseFeeFromBlock(blockNumber *big.Int) (string, error) {
func (tc *TestClient) GetBaseFeeFromBlock(ctx context.Context, blockNumber *big.Int) (string, error) {
tc.incCounter("GetBaseFeeFromBlock")
if tc.traceAPICalls {
tc.t.Log("GetBaseFeeFromBlock")

View File

@ -109,7 +109,7 @@ func getTransferByHash(ctx context.Context, client chain.ClientInterface, signer
return nil, err
}
baseGasFee, err := client.GetBaseFeeFromBlock(big.NewInt(int64(transactionLog.BlockNumber)))
baseGasFee, err := client.GetBaseFeeFromBlock(ctx, big.NewInt(int64(transactionLog.BlockNumber)))
if err != nil {
return nil, err
}