feat(wallet)_: Implemented RPC limiter interface

Split ClientInterface to aggregation of multiple interfaces
Added tags to RPC stats API
Use tagged RPC client for transfers commands
Implemented general interface for RPC limiting
This commit is contained in:
Ivan Belyakov 2024-05-20 13:21:21 +02:00 committed by IvanBelyakoff
parent ad9032d036
commit 4a1f751ced
8 changed files with 219 additions and 81 deletions

View File

@ -19,13 +19,14 @@ import (
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/services/rpcstats" "github.com/status-im/status-go/services/rpcstats"
"github.com/status-im/status-go/services/wallet/connection"
) )
type BatchCallClient interface { type BatchCallClient interface {
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
} }
type ClientInterface interface { type ChainInterface interface {
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
@ -41,35 +42,61 @@ type ClientInterface interface {
CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error)
CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error)
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
GetWalletNotifier() func(chainId uint64, message string)
SetWalletNotifier(notifier func(chainId uint64, message string))
TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error) TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error)
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
BlockNumber(ctx context.Context) (uint64, error) BlockNumber(ctx context.Context) (uint64, error)
SetIsConnected(value bool) }
GetIsConnected() bool
type ClientInterface interface {
ChainInterface
GetWalletNotifier() func(chainId uint64, message string)
SetWalletNotifier(notifier func(chainId uint64, message string))
connection.Connectable
bind.ContractCaller bind.ContractCaller
bind.ContractTransactor bind.ContractTransactor
bind.ContractFilterer bind.ContractFilterer
} }
type Tagger interface {
Tag() string
SetTag(tag string)
DeepCopyTag() Tagger
}
func DeepCopyTagger(t Tagger) Tagger {
return t.DeepCopyTag()
}
// Shallow copy of the client with a deep copy of tag
func ClientWithTag(chainClient ClientInterface, tag string) ClientInterface {
newClient := chainClient
if tagIface, ok := chainClient.(Tagger); ok {
tagIface = DeepCopyTagger(tagIface)
tagIface.SetTag(tag)
newClient = tagIface.(ClientInterface)
}
return newClient
}
type ClientWithFallback struct { type ClientWithFallback struct {
ChainID uint64 ChainID uint64
main *ethclient.Client main *ethclient.Client
fallback *ethclient.Client fallback *ethclient.Client
mainLimiter *RPCLimiter mainLimiter *RPCRpsLimiter
fallbackLimiter *RPCLimiter fallbackLimiter *RPCRpsLimiter
mainRPC *rpc.Client mainRPC *rpc.Client
fallbackRPC *rpc.Client fallbackRPC *rpc.Client
WalletNotifier func(chainId uint64, message string) WalletNotifier func(chainId uint64, message string)
IsConnected bool isConnected bool
IsConnectedLock sync.RWMutex isConnectedLock sync.RWMutex
LastCheckedAt int64 LastCheckedAt int64
circuitBreakerCmdName string circuitBreakerCmdName string
tag string
} }
// Don't mark connection as failed if we get one of these errors // Don't mark connection as failed if we get one of these errors
@ -98,7 +125,7 @@ type CommandResult struct {
err error err error
} }
func NewSimpleClient(mainLimiter *RPCLimiter, main *rpc.Client, chainID uint64) *ClientWithFallback { func NewSimpleClient(mainLimiter *RPCRpsLimiter, main *rpc.Client, chainID uint64) *ClientWithFallback {
circuitBreakerCmdName := fmt.Sprintf("ethClient_%d", chainID) circuitBreakerCmdName := fmt.Sprintf("ethClient_%d", chainID)
hystrix.ConfigureCommand(circuitBreakerCmdName, hystrix.CommandConfig{ hystrix.ConfigureCommand(circuitBreakerCmdName, hystrix.CommandConfig{
Timeout: 10000, Timeout: 10000,
@ -115,13 +142,13 @@ func NewSimpleClient(mainLimiter *RPCLimiter, main *rpc.Client, chainID uint64)
fallbackLimiter: nil, fallbackLimiter: nil,
mainRPC: main, mainRPC: main,
fallbackRPC: nil, fallbackRPC: nil,
IsConnected: true, isConnected: true,
LastCheckedAt: time.Now().Unix(), LastCheckedAt: time.Now().Unix(),
circuitBreakerCmdName: circuitBreakerCmdName, circuitBreakerCmdName: circuitBreakerCmdName,
} }
} }
func NewClient(mainLimiter *RPCLimiter, main *rpc.Client, fallbackLimiter *RPCLimiter, fallback *rpc.Client, chainID uint64) *ClientWithFallback { func NewClient(mainLimiter *RPCRpsLimiter, main *rpc.Client, fallbackLimiter *RPCRpsLimiter, fallback *rpc.Client, chainID uint64) *ClientWithFallback {
circuitBreakerCmdName := fmt.Sprintf("ethClient_%d", chainID) circuitBreakerCmdName := fmt.Sprintf("ethClient_%d", chainID)
hystrix.ConfigureCommand(circuitBreakerCmdName, hystrix.CommandConfig{ hystrix.ConfigureCommand(circuitBreakerCmdName, hystrix.CommandConfig{
Timeout: 20000, Timeout: 20000,
@ -142,7 +169,7 @@ func NewClient(mainLimiter *RPCLimiter, main *rpc.Client, fallbackLimiter *RPCLi
fallbackLimiter: fallbackLimiter, fallbackLimiter: fallbackLimiter,
mainRPC: main, mainRPC: main,
fallbackRPC: fallback, fallbackRPC: fallback,
IsConnected: true, isConnected: true,
LastCheckedAt: time.Now().Unix(), LastCheckedAt: time.Now().Unix(),
circuitBreakerCmdName: circuitBreakerCmdName, circuitBreakerCmdName: circuitBreakerCmdName,
} }
@ -178,20 +205,20 @@ func isRPSLimitError(err error) bool {
} }
func (c *ClientWithFallback) SetIsConnected(value bool) { func (c *ClientWithFallback) SetIsConnected(value bool) {
c.IsConnectedLock.Lock() c.isConnectedLock.Lock()
defer c.IsConnectedLock.Unlock() defer c.isConnectedLock.Unlock()
c.LastCheckedAt = time.Now().Unix() c.LastCheckedAt = time.Now().Unix()
if !value { if !value {
if c.IsConnected { if c.isConnected {
if c.WalletNotifier != nil { if c.WalletNotifier != nil {
c.WalletNotifier(c.ChainID, "down") c.WalletNotifier(c.ChainID, "down")
} }
c.IsConnected = false c.isConnected = false
} }
} else { } else {
if !c.IsConnected { if !c.isConnected {
c.IsConnected = true c.isConnected = true
if c.WalletNotifier != nil { if c.WalletNotifier != nil {
c.WalletNotifier(c.ChainID, "up") c.WalletNotifier(c.ChainID, "up")
} }
@ -199,10 +226,10 @@ func (c *ClientWithFallback) SetIsConnected(value bool) {
} }
} }
func (c *ClientWithFallback) GetIsConnected() bool { func (c *ClientWithFallback) IsConnected() bool {
c.IsConnectedLock.RLock() c.isConnectedLock.RLock()
defer c.IsConnectedLock.RUnlock() defer c.isConnectedLock.RUnlock()
return c.IsConnected return c.isConnected
} }
func (c *ClientWithFallback) makeCall(ctx context.Context, main func() ([]any, error), fallback func() ([]any, error)) ([]any, error) { func (c *ClientWithFallback) makeCall(ctx context.Context, main func() ([]any, error), fallback func() ([]any, error)) ([]any, error) {
@ -292,7 +319,7 @@ func (c *ClientWithFallback) makeCall(ctx context.Context, main func() ([]any, e
} }
func (c *ClientWithFallback) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { func (c *ClientWithFallback) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) {
rpcstats.CountCall("eth_BlockByHash") rpcstats.CountCallWithTag("eth_BlockByHash", c.tag)
res, err := c.makeCall( res, err := c.makeCall(
ctx, ctx,
@ -310,7 +337,7 @@ func (c *ClientWithFallback) BlockByHash(ctx context.Context, hash common.Hash)
} }
func (c *ClientWithFallback) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { func (c *ClientWithFallback) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
rpcstats.CountCall("eth_BlockByNumber") rpcstats.CountCallWithTag("eth_BlockByNumber", c.tag)
res, err := c.makeCall( res, err := c.makeCall(
ctx, ctx,
func() ([]any, error) { a, err := c.main.BlockByNumber(ctx, number); return []any{a}, err }, func() ([]any, error) { a, err := c.main.BlockByNumber(ctx, number); return []any{a}, err },
@ -327,7 +354,7 @@ func (c *ClientWithFallback) BlockByNumber(ctx context.Context, number *big.Int)
} }
func (c *ClientWithFallback) BlockNumber(ctx context.Context) (uint64, error) { func (c *ClientWithFallback) BlockNumber(ctx context.Context) (uint64, error) {
rpcstats.CountCall("eth_BlockNumber") rpcstats.CountCallWithTag("eth_BlockNumber", c.tag)
res, err := c.makeCall( res, err := c.makeCall(
ctx, ctx,
@ -345,7 +372,7 @@ func (c *ClientWithFallback) BlockNumber(ctx context.Context) (uint64, error) {
} }
func (c *ClientWithFallback) PeerCount(ctx context.Context) (uint64, error) { func (c *ClientWithFallback) PeerCount(ctx context.Context) (uint64, error) {
rpcstats.CountCall("eth_PeerCount") rpcstats.CountCallWithTag("eth_PeerCount", c.tag)
res, err := c.makeCall( res, err := c.makeCall(
ctx, ctx,
@ -363,7 +390,7 @@ func (c *ClientWithFallback) PeerCount(ctx context.Context) (uint64, error) {
} }
func (c *ClientWithFallback) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { func (c *ClientWithFallback) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
rpcstats.CountCall("eth_HeaderByHash") rpcstats.CountCallWithTag("eth_HeaderByHash", c.tag)
res, err := c.makeCall( res, err := c.makeCall(
ctx, ctx,
func() ([]any, error) { a, err := c.main.HeaderByHash(ctx, hash); return []any{a}, err }, func() ([]any, error) { a, err := c.main.HeaderByHash(ctx, hash); return []any{a}, err },
@ -378,7 +405,7 @@ func (c *ClientWithFallback) HeaderByHash(ctx context.Context, hash common.Hash)
} }
func (c *ClientWithFallback) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { func (c *ClientWithFallback) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
rpcstats.CountCall("eth_HeaderByNumber") rpcstats.CountCallWithTag("eth_HeaderByNumber", c.tag)
res, err := c.makeCall( res, err := c.makeCall(
ctx, ctx,
func() ([]any, error) { a, err := c.main.HeaderByNumber(ctx, number); return []any{a}, err }, func() ([]any, error) { a, err := c.main.HeaderByNumber(ctx, number); return []any{a}, err },
@ -393,7 +420,7 @@ func (c *ClientWithFallback) HeaderByNumber(ctx context.Context, number *big.Int
} }
func (c *ClientWithFallback) TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error) { func (c *ClientWithFallback) TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error) {
rpcstats.CountCall("eth_TransactionByHash") rpcstats.CountCallWithTag("eth_TransactionByHash", c.tag)
res, err := c.makeCall( res, err := c.makeCall(
ctx, ctx,
@ -526,7 +553,7 @@ func (c *ClientWithFallback) NetworkID() uint64 {
} }
func (c *ClientWithFallback) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { func (c *ClientWithFallback) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
rpcstats.CountCall("eth_BalanceAt") rpcstats.CountCallWithTag("eth_BalanceAt", c.tag)
res, err := c.makeCall( res, err := c.makeCall(
ctx, ctx,
@ -586,7 +613,7 @@ func (c *ClientWithFallback) CodeAt(ctx context.Context, account common.Address,
} }
func (c *ClientWithFallback) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { func (c *ClientWithFallback) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
rpcstats.CountCall("eth_NonceAt") rpcstats.CountCallWithTag("eth_NonceAt", c.tag)
res, err := c.makeCall( res, err := c.makeCall(
ctx, ctx,
@ -604,7 +631,7 @@ func (c *ClientWithFallback) NonceAt(ctx context.Context, account common.Address
} }
func (c *ClientWithFallback) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { func (c *ClientWithFallback) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
rpcstats.CountCall("eth_FilterLogs") rpcstats.CountCallWithTag("eth_FilterLogs", c.tag)
res, err := c.makeCall( res, err := c.makeCall(
ctx, ctx,
@ -925,7 +952,7 @@ func (c *ClientWithFallback) GetBaseFeeFromBlock(ctx context.Context, blockNumbe
// This function preserves the additional data. This is the cheapest way to obtain // This function preserves the additional data. This is the cheapest way to obtain
// the block hash for a given block number. // the block hash for a given block number.
func (c *ClientWithFallback) CallBlockHashByTransaction(ctx context.Context, blockNumber *big.Int, index uint) (common.Hash, error) { func (c *ClientWithFallback) CallBlockHashByTransaction(ctx context.Context, blockNumber *big.Int, index uint) (common.Hash, error) {
rpcstats.CountCall("eth_FullTransactionByBlockNumberAndIndex") rpcstats.CountCallWithTag("eth_FullTransactionByBlockNumberAndIndex", c.tag)
res, err := c.makeCall( res, err := c.makeCall(
ctx, ctx,
@ -965,3 +992,16 @@ func (c *ClientWithFallback) toggleConnectionState(err error) {
} }
c.SetIsConnected(connected) c.SetIsConnected(connected)
} }
func (c *ClientWithFallback) Tag() string {
return c.tag
}
func (c *ClientWithFallback) SetTag(tag string) {
c.tag = tag
}
func (c *ClientWithFallback) DeepCopyTag() Tagger {
copy := *c
return &copy
}

View File

@ -5,6 +5,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/log"
"github.com/google/uuid" "github.com/google/uuid"
) )
@ -25,7 +26,68 @@ type callerOnWait struct {
ch chan bool ch chan bool
} }
type RPCLimiter struct { type RequestsStorage interface {
Get(tag string) (RequestData, error)
Set(data RequestData) error
}
type RequestData struct {
Tag string
CreatedAt time.Time
Period time.Duration
}
type RequestLimiter interface {
SetMaxRequests(tag string, maxRequests int, interval time.Duration)
IsLimitReached(tag string) bool
}
type RPCRequestLimiter struct {
storage RequestsStorage
}
func NewRequestLimiter(storage RequestsStorage) *RPCRequestLimiter {
return &RPCRequestLimiter{
storage: storage,
}
}
func (rl *RPCRequestLimiter) SetMaxRequests(tag string, maxRequests int, interval time.Duration) {
err := rl.saveToStorage(tag, maxRequests, interval)
if err != nil {
log.Error("Failed to save request data to storage", "error", err)
return
}
// Set max requests logic here
}
func (rl *RPCRequestLimiter) saveToStorage(tag string, maxRequests int, interval time.Duration) error {
data := RequestData{
Tag: tag,
CreatedAt: time.Now(),
Period: interval,
}
err := rl.storage.Set(data)
if err != nil {
return err
}
return nil
}
func (rl *RPCRequestLimiter) IsLimitReached(tag string) bool {
data, err := rl.storage.Get(tag)
if err != nil {
log.Error("Failed to get request data from storage", "error", err, "tag", tag)
return false
}
return time.Since(data.CreatedAt) >= data.Period
}
type RPCRpsLimiter struct {
uuid uuid.UUID uuid uuid.UUID
maxRequestsPerSecond int maxRequestsPerSecond int
@ -40,9 +102,9 @@ type RPCLimiter struct {
quit chan bool quit chan bool
} }
func NewRPCLimiter() *RPCLimiter { func NewRPCRpsLimiter() *RPCRpsLimiter {
limiter := RPCLimiter{ limiter := RPCRpsLimiter{
uuid: uuid.New(), uuid: uuid.New(),
maxRequestsPerSecond: defaultMaxRequestsPerSecond, maxRequestsPerSecond: defaultMaxRequestsPerSecond,
quit: make(chan bool), quit: make(chan bool),
@ -53,7 +115,7 @@ func NewRPCLimiter() *RPCLimiter {
return &limiter return &limiter
} }
func (rl *RPCLimiter) ReduceLimit() { func (rl *RPCRpsLimiter) ReduceLimit() {
rl.maxRequestsPerSecondMutex.Lock() rl.maxRequestsPerSecondMutex.Lock()
defer rl.maxRequestsPerSecondMutex.Unlock() defer rl.maxRequestsPerSecondMutex.Unlock()
if rl.maxRequestsPerSecond <= minRequestsPerSecond { if rl.maxRequestsPerSecond <= minRequestsPerSecond {
@ -62,7 +124,7 @@ func (rl *RPCLimiter) ReduceLimit() {
rl.maxRequestsPerSecond = rl.maxRequestsPerSecond - requestsPerSecondStep rl.maxRequestsPerSecond = rl.maxRequestsPerSecond - requestsPerSecondStep
} }
func (rl *RPCLimiter) start() { func (rl *RPCRpsLimiter) start() {
ticker := time.NewTicker(tickerInterval) ticker := time.NewTicker(tickerInterval)
go func() { go func() {
for { for {
@ -115,7 +177,7 @@ func (rl *RPCLimiter) start() {
}() }()
} }
func (rl *RPCLimiter) Stop() { func (rl *RPCRpsLimiter) Stop() {
rl.quit <- true rl.quit <- true
close(rl.quit) close(rl.quit)
for _, callerOnWait := range rl.callersOnWaitForRequests { for _, callerOnWait := range rl.callersOnWaitForRequests {
@ -124,7 +186,7 @@ func (rl *RPCLimiter) Stop() {
rl.callersOnWaitForRequests = nil rl.callersOnWaitForRequests = nil
} }
func (rl *RPCLimiter) WaitForRequestsAvailability(requests int) error { func (rl *RPCRpsLimiter) WaitForRequestsAvailability(requests int) error {
if requests > rl.maxRequestsPerSecond { if requests > rl.maxRequestsPerSecond {
return ErrRequestsOverLimit return ErrRequestsOverLimit
} }

View File

@ -53,8 +53,8 @@ type Client struct {
upstream chain.ClientInterface upstream chain.ClientInterface
rpcClientsMutex sync.RWMutex rpcClientsMutex sync.RWMutex
rpcClients map[uint64]chain.ClientInterface rpcClients map[uint64]chain.ClientInterface
rpcLimiterMutex sync.RWMutex rpsLimiterMutex sync.RWMutex
limiterPerProvider map[string]*chain.RPCLimiter limiterPerProvider map[string]*chain.RPCRpsLimiter
router *router router *router
NetworkManager *network.Manager NetworkManager *network.Manager
@ -93,7 +93,7 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U
NetworkManager: networkManager, NetworkManager: networkManager,
handlers: make(map[string]Handler), handlers: make(map[string]Handler),
rpcClients: make(map[uint64]chain.ClientInterface), rpcClients: make(map[uint64]chain.ClientInterface),
limiterPerProvider: make(map[string]*chain.RPCLimiter), limiterPerProvider: make(map[string]*chain.RPCRpsLimiter),
log: log, log: log,
} }
@ -105,7 +105,7 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U
if err != nil { if err != nil {
return nil, fmt.Errorf("dial upstream server: %s", err) return nil, fmt.Errorf("dial upstream server: %s", err)
} }
limiter, err := c.getRPCLimiter(c.upstreamURL) limiter, err := c.getRPCRpsLimiter(c.upstreamURL)
if err != nil { if err != nil {
return nil, fmt.Errorf("get RPC limiter: %s", err) return nil, fmt.Errorf("get RPC limiter: %s", err)
} }
@ -137,17 +137,17 @@ func extractLastParamFromURL(inputURL string) (string, error) {
return lastSegment, nil return lastSegment, nil
} }
func (c *Client) getRPCLimiter(URL string) (*chain.RPCLimiter, error) { func (c *Client) getRPCRpsLimiter(URL string) (*chain.RPCRpsLimiter, error) {
apiKey, err := extractLastParamFromURL(URL) apiKey, err := extractLastParamFromURL(URL)
if err != nil { if err != nil {
return nil, err return nil, err
} }
c.rpcLimiterMutex.Lock() c.rpsLimiterMutex.Lock()
defer c.rpcLimiterMutex.Unlock() defer c.rpsLimiterMutex.Unlock()
if limiter, ok := c.limiterPerProvider[apiKey]; ok { if limiter, ok := c.limiterPerProvider[apiKey]; ok {
return limiter, nil return limiter, nil
} }
limiter := chain.NewRPCLimiter() limiter := chain.NewRPCRpsLimiter()
c.limiterPerProvider[apiKey] = limiter c.limiterPerProvider[apiKey] = limiter
return limiter, nil return limiter, nil
} }
@ -175,14 +175,14 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err
return nil, fmt.Errorf("dial upstream server: %s", err) return nil, fmt.Errorf("dial upstream server: %s", err)
} }
rpcLimiter, err := c.getRPCLimiter(network.RPCURL) rpcLimiter, err := c.getRPCRpsLimiter(network.RPCURL)
if err != nil { if err != nil {
return nil, fmt.Errorf("get RPC limiter: %s", err) return nil, fmt.Errorf("get RPC limiter: %s", err)
} }
var ( var (
rpcFallbackClient *gethrpc.Client rpcFallbackClient *gethrpc.Client
rpcFallbackLimiter *chain.RPCLimiter rpcFallbackLimiter *chain.RPCRpsLimiter
) )
if len(network.FallbackURL) > 0 { if len(network.FallbackURL) > 0 {
rpcFallbackClient, err = gethrpc.Dial(network.FallbackURL) rpcFallbackClient, err = gethrpc.Dial(network.FallbackURL)
@ -190,7 +190,7 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err
return nil, fmt.Errorf("dial upstream server: %s", err) return nil, fmt.Errorf("dial upstream server: %s", err)
} }
rpcFallbackLimiter, err = c.getRPCLimiter(network.FallbackURL) rpcFallbackLimiter, err = c.getRPCRpsLimiter(network.FallbackURL)
if err != nil { if err != nil {
return nil, fmt.Errorf("get RPC fallback limiter: %s", err) return nil, fmt.Errorf("get RPC fallback limiter: %s", err)
} }
@ -252,12 +252,12 @@ func (c *Client) UpdateUpstreamURL(url string) error {
if err != nil { if err != nil {
return err return err
} }
rpcLimiter, err := c.getRPCLimiter(url) rpsLimiter, err := c.getRPCRpsLimiter(url)
if err != nil { if err != nil {
return err return err
} }
c.Lock() c.Lock()
c.upstream = chain.NewSimpleClient(rpcLimiter, rpcClient, c.UpstreamChainID) c.upstream = chain.NewSimpleClient(rpsLimiter, rpcClient, c.UpstreamChainID)
c.upstreamURL = url c.upstreamURL = url
c.Unlock() c.Unlock()

View File

@ -24,11 +24,18 @@ type RPCStats struct {
CounterPerMethod map[string]uint `json:"methods"` CounterPerMethod map[string]uint `json:"methods"`
} }
// GetStats retrun RPC usage stats // GetStats returns RPC usage stats
func (api *PublicAPI) GetStats(context context.Context) (RPCStats, error) { func (api *PublicAPI) GetStats(context context.Context) (RPCStats, error) {
total, perMethod := getStats() total, perMethod := getStats()
counterPerMethod := make(map[string]uint)
perMethod.Range(func(key, value interface{}) bool {
counterPerMethod[key.(string)] = value.(uint)
return true
})
return RPCStats{ return RPCStats{
Total: total, Total: total,
CounterPerMethod: perMethod, CounterPerMethod: counterPerMethod,
}, nil }, nil
} }

View File

@ -2,47 +2,70 @@ package rpcstats
import ( import (
"sync" "sync"
"github.com/ethereum/go-ethereum/log"
) )
type RPCUsageStats struct { type RPCUsageStats struct {
total uint total uint
counterPerMethod map[string]uint counterPerMethod sync.Map
rw sync.RWMutex counterPerMethodPerTag sync.Map
} }
var stats *RPCUsageStats var stats *RPCUsageStats
func getInstance() *RPCUsageStats { func getInstance() *RPCUsageStats {
if stats == nil { if stats == nil {
stats = &RPCUsageStats{ stats = &RPCUsageStats{}
total: 0,
counterPerMethod: map[string]uint{},
}
} }
return stats return stats
} }
func getStats() (uint, map[string]uint) { func getStats() (uint, sync.Map) {
stats := getInstance() stats := getInstance()
stats.rw.RLock()
defer stats.rw.RUnlock()
return stats.total, stats.counterPerMethod return stats.total, stats.counterPerMethod
} }
// func getStatsWithTag(tag string) (sync.Map, bool) {
// stats := getInstance()
// value, ok := stats.counterPerMethodPerTag.Load(tag)
// return value.(sync.Map), ok
// }
func resetStats() { func resetStats() {
stats := getInstance() stats := getInstance()
stats.rw.Lock()
defer stats.rw.Unlock()
stats.total = 0 stats.total = 0
stats.counterPerMethod = map[string]uint{} stats.counterPerMethod = sync.Map{}
stats.counterPerMethodPerTag = sync.Map{}
} }
// func resetStatsWithTag(tag string) {
// stats := getInstance()
// stats.counterPerMethodPerTag.Delete(tag)
// }
func CountCall(method string) { func CountCall(method string) {
stats := getInstance() log.Info("CountCall", "method", method)
stats.rw.Lock()
defer stats.rw.Unlock()
stats := getInstance()
stats.total++ stats.total++
stats.counterPerMethod[method]++ value, _ := stats.counterPerMethod.LoadOrStore(method, uint(0))
stats.counterPerMethod.Store(method, value.(uint)+1)
}
func CountCallWithTag(method string, tag string) {
if tag == "" {
CountCall(method)
return
}
stats := getInstance()
value, _ := stats.counterPerMethodPerTag.LoadOrStore(tag, sync.Map{})
methodMap := value.(sync.Map)
value, _ = methodMap.LoadOrStore(method, uint(0))
methodMap.Store(method, value.(uint)+1)
log.Info("CountCallWithTag", "method", method, "tag", tag, "count", value.(uint)+1)
CountCall(method)
} }

View File

@ -5,6 +5,11 @@ import (
"time" "time"
) )
type Connectable interface {
SetIsConnected(bool)
IsConnected() bool
}
type StateChangeCb func(State) type StateChangeCb func(State)
type Status struct { type Status struct {

View File

@ -443,7 +443,7 @@ func (r *Reader) getWalletTokenBalances(ctx context.Context, addresses []common.
hasError := false hasError := false
if client, ok := clients[token.ChainID]; ok { if client, ok := clients[token.ChainID]; ok {
hasError = err != nil || !client.GetIsConnected() hasError = err != nil || !client.IsConnected()
} }
if !isVisible { if !isVisible {
isVisible = balance.Cmp(big.NewFloat(0.0)) > 0 || r.isCachedToken(cachedTokens, address, token.Symbol, token.ChainID) isVisible = balance.Cmp(big.NewFloat(0.0)) > 0 || r.isCachedToken(cachedTokens, address, token.Symbol, token.ChainID)
@ -576,7 +576,7 @@ func (r *Reader) GetWalletToken(ctx context.Context, addresses []common.Address)
} }
hasError := false hasError := false
if client, ok := clients[token.ChainID]; ok { if client, ok := clients[token.ChainID]; ok {
hasError = err != nil || !client.GetIsConnected() hasError = err != nil || !client.IsConnected()
} }
if !isVisible { if !isVisible {
isVisible = balance.Cmp(big.NewFloat(0.0)) > 0 || r.isCachedToken(cachedTokens, address, token.Symbol, token.ChainID) isVisible = balance.Cmp(big.NewFloat(0.0)) > 0 || r.isCachedToken(cachedTokens, address, token.Symbol, token.ChainID)

View File

@ -25,6 +25,9 @@ import (
var findBlocksRetryInterval = 5 * time.Second var findBlocksRetryInterval = 5 * time.Second
const transferHistoryTag = "transfer_history"
const newTransferHistoryTag = "new_transfer_history"
type nonceInfo struct { type nonceInfo struct {
nonce *int64 nonce *int64
blockNumber *big.Int blockNumber *big.Int
@ -431,7 +434,6 @@ type findBlocksCommand struct {
balanceCacher balance.Cacher balanceCacher balance.Cacher
feed *event.Feed feed *event.Feed
noLimit bool noLimit bool
transactionManager *TransactionManager
tokenManager *token.Manager tokenManager *token.Manager
fromBlockNumber *big.Int fromBlockNumber *big.Int
logsCheckLastKnownBlock *big.Int logsCheckLastKnownBlock *big.Int
@ -940,7 +942,6 @@ type loadBlocksAndTransfersCommand struct {
// Not to be set by the caller // Not to be set by the caller
transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime
loops atomic.Int32 loops atomic.Int32
// onExit func(ctx context.Context, err error)
} }
func (c *loadBlocksAndTransfersCommand) incLoops() { func (c *loadBlocksAndTransfersCommand) incLoops() {
@ -1114,18 +1115,19 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn
for _, rangeItem := range ranges { for _, rangeItem := range ranges {
log.Debug("range item", "r", rangeItem, "n", c.chainClient.NetworkID(), "a", account) log.Debug("range item", "r", rangeItem, "n", c.chainClient.NetworkID(), "a", account)
chainClient := chain.ClientWithTag(c.chainClient, transferHistoryTag)
fbc := &findBlocksCommand{ fbc := &findBlocksCommand{
accounts: []common.Address{account}, accounts: []common.Address{account},
db: c.db, db: c.db,
accountsDB: c.accountsDB, accountsDB: c.accountsDB,
blockRangeDAO: c.blockRangeDAO, blockRangeDAO: c.blockRangeDAO,
chainClient: c.chainClient, chainClient: chainClient,
balanceCacher: c.balanceCacher, balanceCacher: c.balanceCacher,
feed: c.feed, feed: c.feed,
noLimit: false, noLimit: false,
fromBlockNumber: rangeItem[0], fromBlockNumber: rangeItem[0],
toBlockNumber: rangeItem[1], toBlockNumber: rangeItem[1],
transactionManager: c.transactionManager,
tokenManager: c.tokenManager, tokenManager: c.tokenManager,
blocksLoadedCh: blocksLoadedCh, blocksLoadedCh: blocksLoadedCh,
defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize,
@ -1156,7 +1158,6 @@ func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(ctx context.Conte
feed: c.feed, feed: c.feed,
noLimit: false, noLimit: false,
fromBlockNumber: fromNum, fromBlockNumber: fromNum,
transactionManager: c.transactionManager,
tokenManager: c.tokenManager, tokenManager: c.tokenManager,
blocksLoadedCh: blocksLoadedCh, blocksLoadedCh: blocksLoadedCh,
defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize,