From 4a1f751ced081091d747cf8a1f91a39472be36cc Mon Sep 17 00:00:00 2001 From: Ivan Belyakov Date: Mon, 20 May 2024 13:21:21 +0200 Subject: [PATCH] feat(wallet)_: Implemented RPC limiter interface Split ClientInterface to aggregation of multiple interfaces Added tags to RPC stats API Use tagged RPC client for transfers commands Implemented general interface for RPC limiting --- rpc/chain/client.go | 108 ++++++++++++------ rpc/chain/rpc_limiter.go | 76 ++++++++++-- rpc/client.go | 26 ++--- services/rpcstats/api.go | 11 +- services/rpcstats/stats.go | 59 +++++++--- services/wallet/connection/status.go | 5 + services/wallet/reader.go | 4 +- .../wallet/transfer/commands_sequential.go | 11 +- 8 files changed, 219 insertions(+), 81 deletions(-) diff --git a/rpc/chain/client.go b/rpc/chain/client.go index 7c1e72532..a75f04ac4 100644 --- a/rpc/chain/client.go +++ b/rpc/chain/client.go @@ -19,13 +19,14 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" "github.com/status-im/status-go/services/rpcstats" + "github.com/status-im/status-go/services/wallet/connection" ) type BatchCallClient interface { BatchCallContext(ctx context.Context, b []rpc.BatchElem) error } -type ClientInterface interface { +type ChainInterface interface { BatchCallContext(ctx context.Context, b []rpc.BatchElem) error HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) @@ -41,35 +42,61 @@ type ClientInterface interface { CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error - GetWalletNotifier() func(chainId uint64, message string) - SetWalletNotifier(notifier func(chainId uint64, message string)) TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) BlockNumber(ctx context.Context) (uint64, error) - SetIsConnected(value bool) - GetIsConnected() bool +} + +type ClientInterface interface { + ChainInterface + GetWalletNotifier() func(chainId uint64, message string) + SetWalletNotifier(notifier func(chainId uint64, message string)) + connection.Connectable bind.ContractCaller bind.ContractTransactor bind.ContractFilterer } +type Tagger interface { + Tag() string + SetTag(tag string) + DeepCopyTag() Tagger +} + +func DeepCopyTagger(t Tagger) Tagger { + return t.DeepCopyTag() +} + +// Shallow copy of the client with a deep copy of tag +func ClientWithTag(chainClient ClientInterface, tag string) ClientInterface { + newClient := chainClient + if tagIface, ok := chainClient.(Tagger); ok { + tagIface = DeepCopyTagger(tagIface) + tagIface.SetTag(tag) + newClient = tagIface.(ClientInterface) + } + + return newClient +} + type ClientWithFallback struct { ChainID uint64 main *ethclient.Client fallback *ethclient.Client - mainLimiter *RPCLimiter - fallbackLimiter *RPCLimiter + mainLimiter *RPCRpsLimiter + fallbackLimiter *RPCRpsLimiter mainRPC *rpc.Client fallbackRPC *rpc.Client WalletNotifier func(chainId uint64, message string) - IsConnected bool - IsConnectedLock sync.RWMutex + isConnected bool + isConnectedLock sync.RWMutex LastCheckedAt int64 circuitBreakerCmdName string + tag string } // Don't mark connection as failed if we get one of these errors @@ -98,7 +125,7 @@ type CommandResult struct { err error } -func NewSimpleClient(mainLimiter *RPCLimiter, main *rpc.Client, chainID uint64) *ClientWithFallback { +func NewSimpleClient(mainLimiter *RPCRpsLimiter, main *rpc.Client, chainID uint64) *ClientWithFallback { circuitBreakerCmdName := fmt.Sprintf("ethClient_%d", chainID) hystrix.ConfigureCommand(circuitBreakerCmdName, hystrix.CommandConfig{ Timeout: 10000, @@ -115,13 +142,13 @@ func NewSimpleClient(mainLimiter *RPCLimiter, main *rpc.Client, chainID uint64) fallbackLimiter: nil, mainRPC: main, fallbackRPC: nil, - IsConnected: true, + isConnected: true, LastCheckedAt: time.Now().Unix(), circuitBreakerCmdName: circuitBreakerCmdName, } } -func NewClient(mainLimiter *RPCLimiter, main *rpc.Client, fallbackLimiter *RPCLimiter, fallback *rpc.Client, chainID uint64) *ClientWithFallback { +func NewClient(mainLimiter *RPCRpsLimiter, main *rpc.Client, fallbackLimiter *RPCRpsLimiter, fallback *rpc.Client, chainID uint64) *ClientWithFallback { circuitBreakerCmdName := fmt.Sprintf("ethClient_%d", chainID) hystrix.ConfigureCommand(circuitBreakerCmdName, hystrix.CommandConfig{ Timeout: 20000, @@ -142,7 +169,7 @@ func NewClient(mainLimiter *RPCLimiter, main *rpc.Client, fallbackLimiter *RPCLi fallbackLimiter: fallbackLimiter, mainRPC: main, fallbackRPC: fallback, - IsConnected: true, + isConnected: true, LastCheckedAt: time.Now().Unix(), circuitBreakerCmdName: circuitBreakerCmdName, } @@ -178,20 +205,20 @@ func isRPSLimitError(err error) bool { } func (c *ClientWithFallback) SetIsConnected(value bool) { - c.IsConnectedLock.Lock() - defer c.IsConnectedLock.Unlock() + c.isConnectedLock.Lock() + defer c.isConnectedLock.Unlock() c.LastCheckedAt = time.Now().Unix() if !value { - if c.IsConnected { + if c.isConnected { if c.WalletNotifier != nil { c.WalletNotifier(c.ChainID, "down") } - c.IsConnected = false + c.isConnected = false } } else { - if !c.IsConnected { - c.IsConnected = true + if !c.isConnected { + c.isConnected = true if c.WalletNotifier != nil { c.WalletNotifier(c.ChainID, "up") } @@ -199,10 +226,10 @@ func (c *ClientWithFallback) SetIsConnected(value bool) { } } -func (c *ClientWithFallback) GetIsConnected() bool { - c.IsConnectedLock.RLock() - defer c.IsConnectedLock.RUnlock() - return c.IsConnected +func (c *ClientWithFallback) IsConnected() bool { + c.isConnectedLock.RLock() + defer c.isConnectedLock.RUnlock() + return c.isConnected } func (c *ClientWithFallback) makeCall(ctx context.Context, main func() ([]any, error), fallback func() ([]any, error)) ([]any, error) { @@ -292,7 +319,7 @@ func (c *ClientWithFallback) makeCall(ctx context.Context, main func() ([]any, e } func (c *ClientWithFallback) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { - rpcstats.CountCall("eth_BlockByHash") + rpcstats.CountCallWithTag("eth_BlockByHash", c.tag) res, err := c.makeCall( ctx, @@ -310,7 +337,7 @@ func (c *ClientWithFallback) BlockByHash(ctx context.Context, hash common.Hash) } func (c *ClientWithFallback) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { - rpcstats.CountCall("eth_BlockByNumber") + rpcstats.CountCallWithTag("eth_BlockByNumber", c.tag) res, err := c.makeCall( ctx, func() ([]any, error) { a, err := c.main.BlockByNumber(ctx, number); return []any{a}, err }, @@ -327,7 +354,7 @@ func (c *ClientWithFallback) BlockByNumber(ctx context.Context, number *big.Int) } func (c *ClientWithFallback) BlockNumber(ctx context.Context) (uint64, error) { - rpcstats.CountCall("eth_BlockNumber") + rpcstats.CountCallWithTag("eth_BlockNumber", c.tag) res, err := c.makeCall( ctx, @@ -345,7 +372,7 @@ func (c *ClientWithFallback) BlockNumber(ctx context.Context) (uint64, error) { } func (c *ClientWithFallback) PeerCount(ctx context.Context) (uint64, error) { - rpcstats.CountCall("eth_PeerCount") + rpcstats.CountCallWithTag("eth_PeerCount", c.tag) res, err := c.makeCall( ctx, @@ -363,7 +390,7 @@ func (c *ClientWithFallback) PeerCount(ctx context.Context) (uint64, error) { } func (c *ClientWithFallback) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { - rpcstats.CountCall("eth_HeaderByHash") + rpcstats.CountCallWithTag("eth_HeaderByHash", c.tag) res, err := c.makeCall( ctx, func() ([]any, error) { a, err := c.main.HeaderByHash(ctx, hash); return []any{a}, err }, @@ -378,7 +405,7 @@ func (c *ClientWithFallback) HeaderByHash(ctx context.Context, hash common.Hash) } func (c *ClientWithFallback) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { - rpcstats.CountCall("eth_HeaderByNumber") + rpcstats.CountCallWithTag("eth_HeaderByNumber", c.tag) res, err := c.makeCall( ctx, func() ([]any, error) { a, err := c.main.HeaderByNumber(ctx, number); return []any{a}, err }, @@ -393,7 +420,7 @@ func (c *ClientWithFallback) HeaderByNumber(ctx context.Context, number *big.Int } func (c *ClientWithFallback) TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error) { - rpcstats.CountCall("eth_TransactionByHash") + rpcstats.CountCallWithTag("eth_TransactionByHash", c.tag) res, err := c.makeCall( ctx, @@ -526,7 +553,7 @@ func (c *ClientWithFallback) NetworkID() uint64 { } func (c *ClientWithFallback) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { - rpcstats.CountCall("eth_BalanceAt") + rpcstats.CountCallWithTag("eth_BalanceAt", c.tag) res, err := c.makeCall( ctx, @@ -586,7 +613,7 @@ func (c *ClientWithFallback) CodeAt(ctx context.Context, account common.Address, } func (c *ClientWithFallback) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { - rpcstats.CountCall("eth_NonceAt") + rpcstats.CountCallWithTag("eth_NonceAt", c.tag) res, err := c.makeCall( ctx, @@ -604,7 +631,7 @@ func (c *ClientWithFallback) NonceAt(ctx context.Context, account common.Address } func (c *ClientWithFallback) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { - rpcstats.CountCall("eth_FilterLogs") + rpcstats.CountCallWithTag("eth_FilterLogs", c.tag) res, err := c.makeCall( ctx, @@ -925,7 +952,7 @@ func (c *ClientWithFallback) GetBaseFeeFromBlock(ctx context.Context, blockNumbe // This function preserves the additional data. This is the cheapest way to obtain // the block hash for a given block number. func (c *ClientWithFallback) CallBlockHashByTransaction(ctx context.Context, blockNumber *big.Int, index uint) (common.Hash, error) { - rpcstats.CountCall("eth_FullTransactionByBlockNumberAndIndex") + rpcstats.CountCallWithTag("eth_FullTransactionByBlockNumberAndIndex", c.tag) res, err := c.makeCall( ctx, @@ -965,3 +992,16 @@ func (c *ClientWithFallback) toggleConnectionState(err error) { } c.SetIsConnected(connected) } + +func (c *ClientWithFallback) Tag() string { + return c.tag +} + +func (c *ClientWithFallback) SetTag(tag string) { + c.tag = tag +} + +func (c *ClientWithFallback) DeepCopyTag() Tagger { + copy := *c + return © +} diff --git a/rpc/chain/rpc_limiter.go b/rpc/chain/rpc_limiter.go index fc7c4c22a..be81921df 100644 --- a/rpc/chain/rpc_limiter.go +++ b/rpc/chain/rpc_limiter.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/log" "github.com/google/uuid" ) @@ -25,7 +26,68 @@ type callerOnWait struct { ch chan bool } -type RPCLimiter struct { +type RequestsStorage interface { + Get(tag string) (RequestData, error) + Set(data RequestData) error +} + +type RequestData struct { + Tag string + CreatedAt time.Time + Period time.Duration +} + +type RequestLimiter interface { + SetMaxRequests(tag string, maxRequests int, interval time.Duration) + IsLimitReached(tag string) bool +} + +type RPCRequestLimiter struct { + storage RequestsStorage +} + +func NewRequestLimiter(storage RequestsStorage) *RPCRequestLimiter { + return &RPCRequestLimiter{ + storage: storage, + } +} + +func (rl *RPCRequestLimiter) SetMaxRequests(tag string, maxRequests int, interval time.Duration) { + err := rl.saveToStorage(tag, maxRequests, interval) + if err != nil { + log.Error("Failed to save request data to storage", "error", err) + return + } + + // Set max requests logic here +} + +func (rl *RPCRequestLimiter) saveToStorage(tag string, maxRequests int, interval time.Duration) error { + data := RequestData{ + Tag: tag, + CreatedAt: time.Now(), + Period: interval, + } + + err := rl.storage.Set(data) + if err != nil { + return err + } + + return nil +} + +func (rl *RPCRequestLimiter) IsLimitReached(tag string) bool { + data, err := rl.storage.Get(tag) + if err != nil { + log.Error("Failed to get request data from storage", "error", err, "tag", tag) + return false + } + + return time.Since(data.CreatedAt) >= data.Period +} + +type RPCRpsLimiter struct { uuid uuid.UUID maxRequestsPerSecond int @@ -40,9 +102,9 @@ type RPCLimiter struct { quit chan bool } -func NewRPCLimiter() *RPCLimiter { +func NewRPCRpsLimiter() *RPCRpsLimiter { - limiter := RPCLimiter{ + limiter := RPCRpsLimiter{ uuid: uuid.New(), maxRequestsPerSecond: defaultMaxRequestsPerSecond, quit: make(chan bool), @@ -53,7 +115,7 @@ func NewRPCLimiter() *RPCLimiter { return &limiter } -func (rl *RPCLimiter) ReduceLimit() { +func (rl *RPCRpsLimiter) ReduceLimit() { rl.maxRequestsPerSecondMutex.Lock() defer rl.maxRequestsPerSecondMutex.Unlock() if rl.maxRequestsPerSecond <= minRequestsPerSecond { @@ -62,7 +124,7 @@ func (rl *RPCLimiter) ReduceLimit() { rl.maxRequestsPerSecond = rl.maxRequestsPerSecond - requestsPerSecondStep } -func (rl *RPCLimiter) start() { +func (rl *RPCRpsLimiter) start() { ticker := time.NewTicker(tickerInterval) go func() { for { @@ -115,7 +177,7 @@ func (rl *RPCLimiter) start() { }() } -func (rl *RPCLimiter) Stop() { +func (rl *RPCRpsLimiter) Stop() { rl.quit <- true close(rl.quit) for _, callerOnWait := range rl.callersOnWaitForRequests { @@ -124,7 +186,7 @@ func (rl *RPCLimiter) Stop() { rl.callersOnWaitForRequests = nil } -func (rl *RPCLimiter) WaitForRequestsAvailability(requests int) error { +func (rl *RPCRpsLimiter) WaitForRequestsAvailability(requests int) error { if requests > rl.maxRequestsPerSecond { return ErrRequestsOverLimit } diff --git a/rpc/client.go b/rpc/client.go index 59ca47e4d..0eb471dad 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -53,8 +53,8 @@ type Client struct { upstream chain.ClientInterface rpcClientsMutex sync.RWMutex rpcClients map[uint64]chain.ClientInterface - rpcLimiterMutex sync.RWMutex - limiterPerProvider map[string]*chain.RPCLimiter + rpsLimiterMutex sync.RWMutex + limiterPerProvider map[string]*chain.RPCRpsLimiter router *router NetworkManager *network.Manager @@ -93,7 +93,7 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U NetworkManager: networkManager, handlers: make(map[string]Handler), rpcClients: make(map[uint64]chain.ClientInterface), - limiterPerProvider: make(map[string]*chain.RPCLimiter), + limiterPerProvider: make(map[string]*chain.RPCRpsLimiter), log: log, } @@ -105,7 +105,7 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U if err != nil { return nil, fmt.Errorf("dial upstream server: %s", err) } - limiter, err := c.getRPCLimiter(c.upstreamURL) + limiter, err := c.getRPCRpsLimiter(c.upstreamURL) if err != nil { return nil, fmt.Errorf("get RPC limiter: %s", err) } @@ -137,17 +137,17 @@ func extractLastParamFromURL(inputURL string) (string, error) { return lastSegment, nil } -func (c *Client) getRPCLimiter(URL string) (*chain.RPCLimiter, error) { +func (c *Client) getRPCRpsLimiter(URL string) (*chain.RPCRpsLimiter, error) { apiKey, err := extractLastParamFromURL(URL) if err != nil { return nil, err } - c.rpcLimiterMutex.Lock() - defer c.rpcLimiterMutex.Unlock() + c.rpsLimiterMutex.Lock() + defer c.rpsLimiterMutex.Unlock() if limiter, ok := c.limiterPerProvider[apiKey]; ok { return limiter, nil } - limiter := chain.NewRPCLimiter() + limiter := chain.NewRPCRpsLimiter() c.limiterPerProvider[apiKey] = limiter return limiter, nil } @@ -175,14 +175,14 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err return nil, fmt.Errorf("dial upstream server: %s", err) } - rpcLimiter, err := c.getRPCLimiter(network.RPCURL) + rpcLimiter, err := c.getRPCRpsLimiter(network.RPCURL) if err != nil { return nil, fmt.Errorf("get RPC limiter: %s", err) } var ( rpcFallbackClient *gethrpc.Client - rpcFallbackLimiter *chain.RPCLimiter + rpcFallbackLimiter *chain.RPCRpsLimiter ) if len(network.FallbackURL) > 0 { rpcFallbackClient, err = gethrpc.Dial(network.FallbackURL) @@ -190,7 +190,7 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err return nil, fmt.Errorf("dial upstream server: %s", err) } - rpcFallbackLimiter, err = c.getRPCLimiter(network.FallbackURL) + rpcFallbackLimiter, err = c.getRPCRpsLimiter(network.FallbackURL) if err != nil { return nil, fmt.Errorf("get RPC fallback limiter: %s", err) } @@ -252,12 +252,12 @@ func (c *Client) UpdateUpstreamURL(url string) error { if err != nil { return err } - rpcLimiter, err := c.getRPCLimiter(url) + rpsLimiter, err := c.getRPCRpsLimiter(url) if err != nil { return err } c.Lock() - c.upstream = chain.NewSimpleClient(rpcLimiter, rpcClient, c.UpstreamChainID) + c.upstream = chain.NewSimpleClient(rpsLimiter, rpcClient, c.UpstreamChainID) c.upstreamURL = url c.Unlock() diff --git a/services/rpcstats/api.go b/services/rpcstats/api.go index 8bc41620c..896a9b5e3 100644 --- a/services/rpcstats/api.go +++ b/services/rpcstats/api.go @@ -24,11 +24,18 @@ type RPCStats struct { CounterPerMethod map[string]uint `json:"methods"` } -// GetStats retrun RPC usage stats +// GetStats returns RPC usage stats func (api *PublicAPI) GetStats(context context.Context) (RPCStats, error) { total, perMethod := getStats() + + counterPerMethod := make(map[string]uint) + perMethod.Range(func(key, value interface{}) bool { + counterPerMethod[key.(string)] = value.(uint) + return true + }) + return RPCStats{ Total: total, - CounterPerMethod: perMethod, + CounterPerMethod: counterPerMethod, }, nil } diff --git a/services/rpcstats/stats.go b/services/rpcstats/stats.go index c9a0da673..27a5484da 100644 --- a/services/rpcstats/stats.go +++ b/services/rpcstats/stats.go @@ -2,47 +2,70 @@ package rpcstats import ( "sync" + + "github.com/ethereum/go-ethereum/log" ) type RPCUsageStats struct { - total uint - counterPerMethod map[string]uint - rw sync.RWMutex + total uint + counterPerMethod sync.Map + counterPerMethodPerTag sync.Map } var stats *RPCUsageStats func getInstance() *RPCUsageStats { if stats == nil { - stats = &RPCUsageStats{ - total: 0, - counterPerMethod: map[string]uint{}, - } + stats = &RPCUsageStats{} } return stats } -func getStats() (uint, map[string]uint) { +func getStats() (uint, sync.Map) { stats := getInstance() - stats.rw.RLock() - defer stats.rw.RUnlock() return stats.total, stats.counterPerMethod } +// func getStatsWithTag(tag string) (sync.Map, bool) { +// stats := getInstance() +// value, ok := stats.counterPerMethodPerTag.Load(tag) +// return value.(sync.Map), ok +// } + func resetStats() { stats := getInstance() - stats.rw.Lock() - defer stats.rw.Unlock() - stats.total = 0 - stats.counterPerMethod = map[string]uint{} + stats.counterPerMethod = sync.Map{} + stats.counterPerMethodPerTag = sync.Map{} } +// func resetStatsWithTag(tag string) { +// stats := getInstance() +// stats.counterPerMethodPerTag.Delete(tag) +// } + func CountCall(method string) { - stats := getInstance() - stats.rw.Lock() - defer stats.rw.Unlock() + log.Info("CountCall", "method", method) + stats := getInstance() stats.total++ - stats.counterPerMethod[method]++ + value, _ := stats.counterPerMethod.LoadOrStore(method, uint(0)) + stats.counterPerMethod.Store(method, value.(uint)+1) +} + +func CountCallWithTag(method string, tag string) { + if tag == "" { + CountCall(method) + return + } + + stats := getInstance() + value, _ := stats.counterPerMethodPerTag.LoadOrStore(tag, sync.Map{}) + methodMap := value.(sync.Map) + value, _ = methodMap.LoadOrStore(method, uint(0)) + methodMap.Store(method, value.(uint)+1) + + log.Info("CountCallWithTag", "method", method, "tag", tag, "count", value.(uint)+1) + + CountCall(method) } diff --git a/services/wallet/connection/status.go b/services/wallet/connection/status.go index e5f1a46a4..22d2ab288 100644 --- a/services/wallet/connection/status.go +++ b/services/wallet/connection/status.go @@ -5,6 +5,11 @@ import ( "time" ) +type Connectable interface { + SetIsConnected(bool) + IsConnected() bool +} + type StateChangeCb func(State) type Status struct { diff --git a/services/wallet/reader.go b/services/wallet/reader.go index f41018fcb..253cbb544 100644 --- a/services/wallet/reader.go +++ b/services/wallet/reader.go @@ -443,7 +443,7 @@ func (r *Reader) getWalletTokenBalances(ctx context.Context, addresses []common. hasError := false if client, ok := clients[token.ChainID]; ok { - hasError = err != nil || !client.GetIsConnected() + hasError = err != nil || !client.IsConnected() } if !isVisible { isVisible = balance.Cmp(big.NewFloat(0.0)) > 0 || r.isCachedToken(cachedTokens, address, token.Symbol, token.ChainID) @@ -576,7 +576,7 @@ func (r *Reader) GetWalletToken(ctx context.Context, addresses []common.Address) } hasError := false if client, ok := clients[token.ChainID]; ok { - hasError = err != nil || !client.GetIsConnected() + hasError = err != nil || !client.IsConnected() } if !isVisible { isVisible = balance.Cmp(big.NewFloat(0.0)) > 0 || r.isCachedToken(cachedTokens, address, token.Symbol, token.ChainID) diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index 30f38f164..64556ac25 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -25,6 +25,9 @@ import ( var findBlocksRetryInterval = 5 * time.Second +const transferHistoryTag = "transfer_history" +const newTransferHistoryTag = "new_transfer_history" + type nonceInfo struct { nonce *int64 blockNumber *big.Int @@ -431,7 +434,6 @@ type findBlocksCommand struct { balanceCacher balance.Cacher feed *event.Feed noLimit bool - transactionManager *TransactionManager tokenManager *token.Manager fromBlockNumber *big.Int logsCheckLastKnownBlock *big.Int @@ -940,7 +942,6 @@ type loadBlocksAndTransfersCommand struct { // Not to be set by the caller transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime loops atomic.Int32 - // onExit func(ctx context.Context, err error) } func (c *loadBlocksAndTransfersCommand) incLoops() { @@ -1114,18 +1115,19 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn for _, rangeItem := range ranges { log.Debug("range item", "r", rangeItem, "n", c.chainClient.NetworkID(), "a", account) + + chainClient := chain.ClientWithTag(c.chainClient, transferHistoryTag) fbc := &findBlocksCommand{ accounts: []common.Address{account}, db: c.db, accountsDB: c.accountsDB, blockRangeDAO: c.blockRangeDAO, - chainClient: c.chainClient, + chainClient: chainClient, balanceCacher: c.balanceCacher, feed: c.feed, noLimit: false, fromBlockNumber: rangeItem[0], toBlockNumber: rangeItem[1], - transactionManager: c.transactionManager, tokenManager: c.tokenManager, blocksLoadedCh: blocksLoadedCh, defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, @@ -1156,7 +1158,6 @@ func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(ctx context.Conte feed: c.feed, noLimit: false, fromBlockNumber: fromNum, - transactionManager: c.transactionManager, tokenManager: c.tokenManager, blocksLoadedCh: blocksLoadedCh, defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize,