From da70d6f1b5f82e365014f3d18f03e0cfcd02d3c1 Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Wed, 14 Aug 2024 14:15:14 +0100 Subject: [PATCH] feat_: Cache GetWalletToken method and split circuits This commits does a few things: 1) Adds cache of token amount to the GetWalletToken endpoint, used by mobile, in case the user is offline. 2) Split circuits by chain-id (when available) and by host+index when not 3) It makes GetWalletToken always refresh, as that's directed from an user action and we want to respect that. A cool down of 10s should be added in the future to avoid spamming. --- rpc/chain/client.go | 1 + rpc/client.go | 34 +++++++++----- services/wallet/api.go | 2 +- services/wallet/reader.go | 81 ++++------------------------------ services/wallet/reader_test.go | 2 +- 5 files changed, 33 insertions(+), 87 deletions(-) diff --git a/rpc/chain/client.go b/rpc/chain/client.go index 2d3d556ea..baad43139 100644 --- a/rpc/chain/client.go +++ b/rpc/chain/client.go @@ -973,6 +973,7 @@ func (c *ClientWithFallback) toggleConnectionState(err error) { connected := true if err != nil { if !isVMError(err) && !errors.Is(err, ErrRequestsOverLimit) && !errors.Is(err, context.Canceled) { + log.Warn("Error not in chain call", "error", err, "chain", c.ChainID) connected = false } else { log.Warn("Error in chain call", "error", err) diff --git a/rpc/client.go b/rpc/client.go index caf9e75a3..e3b400d95 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -11,6 +11,7 @@ import ( "net/url" "reflect" "runtime" + "strings" "sync" "time" @@ -156,11 +157,15 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U if err != nil { return nil, fmt.Errorf("get RPC limiter: %s", err) } - hostPortUpstream, err := extractHostAndPortFromURL(c.upstreamURL) + hostPortUpstream, err := extractHostFromURL(c.upstreamURL) if err != nil { hostPortUpstream = "upstream" } - c.upstream = chain.NewSimpleClient(*chain.NewEthClient(ethclient.NewClient(upstreamClient), limiter, upstreamClient, hostPortUpstream), upstreamChainID) + + // Include the chain-id in the rpc client + rpcName := fmt.Sprintf("%s-chain-id-%d", hostPortUpstream, upstreamChainID) + + c.upstream = chain.NewSimpleClient(*chain.NewEthClient(ethclient.NewClient(upstreamClient), limiter, upstreamClient, rpcName), upstreamChainID) } c.router = newRouter(c.upstreamEnabled) @@ -176,7 +181,7 @@ func (c *Client) SetWalletNotifier(notifier func(chainID uint64, message string) c.walletNotifier = notifier } -func extractHostAndPortFromURL(inputURL string) (string, error) { +func extractHostFromURL(inputURL string) (string, error) { parsedURL, err := url.Parse(inputURL) if err != nil { return "", err @@ -223,7 +228,7 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err return nil, fmt.Errorf("could not find network: %d", chainID) } - ethClients := c.getEthClents(network) + ethClients := c.getEthClients(network) if len(ethClients) == 0 { return nil, fmt.Errorf("could not find any RPC URL for chain: %d", chainID) } @@ -234,7 +239,7 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err return client, nil } -func (c *Client) getEthClents(network *params.Network) []*chain.EthClient { +func (c *Client) getEthClients(network *params.Network) []*chain.EthClient { urls := make(map[string]string) keys := make([]string, 0) authMap := make(map[string]string) @@ -259,7 +264,7 @@ func (c *Client) getEthClents(network *params.Network) []*chain.EthClient { urls["fallback"] = network.FallbackURL ethClients := make([]*chain.EthClient, 0) - for _, key := range keys { + for index, key := range keys { var rpcClient *gethrpc.Client var rpcLimiter *chain.RPCRpsLimiter var err error @@ -285,17 +290,22 @@ func (c *Client) getEthClents(network *params.Network) []*chain.EthClient { c.log.Error("dial server "+key, "error", err) } - hostPort, err = extractHostAndPortFromURL(url) - if err != nil { - hostPort = key + // If using the status-proxy, consider each endpoint as a separate provider + circuitKey := fmt.Sprintf("%s-%d", key, index) + // Otherwise host is good enough + if !strings.Contains(url, "status.im") { + hostPort, err = extractHostFromURL(url) + if err == nil { + circuitKey = hostPort + } } - rpcLimiter, err = c.getRPCRpsLimiter(hostPort) + rpcLimiter, err = c.getRPCRpsLimiter(circuitKey) if err != nil { c.log.Error("get RPC limiter "+key, "error", err) } - ethClients = append(ethClients, chain.NewEthClient(ethclient.NewClient(rpcClient), rpcLimiter, rpcClient, hostPort)) + ethClients = append(ethClients, chain.NewEthClient(ethclient.NewClient(rpcClient), rpcLimiter, rpcClient, circuitKey)) } } @@ -357,7 +367,7 @@ func (c *Client) UpdateUpstreamURL(url string) error { return err } c.Lock() - hostPortUpstream, err := extractHostAndPortFromURL(url) + hostPortUpstream, err := extractHostFromURL(url) if err != nil { hostPortUpstream = "upstream" } diff --git a/services/wallet/api.go b/services/wallet/api.go index 9fc65be84..0ed526afe 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -152,7 +152,7 @@ func (api *API) FetchOrGetCachedWalletBalances(ctx context.Context, addresses [] return nil, err } - return api.reader.FetchOrGetCachedWalletBalances(ctx, clients, addresses) + return api.reader.FetchOrGetCachedWalletBalances(ctx, clients, addresses, false) } type DerivedAddress struct { diff --git a/services/wallet/reader.go b/services/wallet/reader.go index 2a34667db..06a2719b3 100644 --- a/services/wallet/reader.go +++ b/services/wallet/reader.go @@ -261,8 +261,9 @@ func (r *Reader) invalidateBalanceCache() { r.refreshBalanceCache = true } -func (r *Reader) FetchOrGetCachedWalletBalances(ctx context.Context, clients map[uint64]chain.ClientInterface, addresses []common.Address) (map[common.Address][]token.StorageToken, error) { - needFetch := !r.isBalanceCacheValid(addresses) || r.isBalanceUpdateNeededAnyway(clients, addresses) +func (r *Reader) FetchOrGetCachedWalletBalances(ctx context.Context, clients map[uint64]chain.ClientInterface, addresses []common.Address, forceRefresh bool) (map[common.Address][]token.StorageToken, error) { + needFetch := forceRefresh || !r.isBalanceCacheValid(addresses) || r.isBalanceUpdateNeededAnyway(clients, addresses) + if needFetch { _, err := r.FetchBalances(ctx, clients, addresses) if err != nil { @@ -420,7 +421,6 @@ func (r *Reader) balancesToTokensByAddress(connectedPerChain map[uint64]bool, ad return result } -// For tokens with single symbol, create a chain balance for each chain func (r *Reader) createBalancePerChainPerSymbol( address common.Address, balances map[uint64]map[common.Address]map[common.Address]*hexutil.Big, @@ -461,84 +461,19 @@ func (r *Reader) createBalancePerChainPerSymbol( } func (r *Reader) GetWalletToken(ctx context.Context, clients map[uint64]chain.ClientInterface, addresses []common.Address, currency string) (map[common.Address][]token.StorageToken, error) { - cachedTokens, err := r.getCachedWalletTokensWithoutMarketData() - if err != nil { - return nil, err - } - - chainIDs := maps.Keys(clients) - currencies := make([]string, 0) currencies = append(currencies, currency) currencies = append(currencies, getFixedCurrencies()...) - allTokens, err := r.tokenManager.GetTokensByChainIDs(chainIDs) + + result, err := r.FetchOrGetCachedWalletBalances(ctx, clients, addresses, true) if err != nil { return nil, err } - tokenAddresses := getTokenAddresses(allTokens) - - balances, err := r.tokenManager.GetBalancesByChain(ctx, clients, addresses, tokenAddresses) - if err != nil { - log.Info("tokenManager.GetBalancesByChain error", "err", err) - return nil, err - } - - verifiedTokens, unverifiedTokens := splitVerifiedTokens(allTokens) tokenSymbols := make([]string, 0) - result := make(map[common.Address][]token.StorageToken) - - for _, address := range addresses { - for _, tokenList := range [][]*token.Token{verifiedTokens, unverifiedTokens} { - for symbol, tokens := range getTokenBySymbols(tokenList) { - balancesPerChain := make(map[uint64]token.ChainBalance) - decimals := tokens[0].Decimals - isVisible := false - for _, tok := range tokens { - hexBalance := balances[tok.ChainID][address][tok.Address] - balance := big.NewFloat(0.0) - if hexBalance != nil { - balance = new(big.Float).Quo( - new(big.Float).SetInt(hexBalance.ToInt()), - big.NewFloat(math.Pow(10, float64(decimals))), - ) - } - hasError := false - if client, ok := clients[tok.ChainID]; ok { - hasError = err != nil || !client.IsConnected() - } - if !isVisible { - isVisible = balance.Cmp(big.NewFloat(0.0)) > 0 || isCachedToken(cachedTokens, address, tok.Symbol, tok.ChainID) - } - balancesPerChain[tok.ChainID] = token.ChainBalance{ - RawBalance: hexBalance.ToInt().String(), - Balance: balance, - Address: tok.Address, - ChainID: tok.ChainID, - HasError: hasError, - } - } - - if !isVisible && !belongsToMandatoryTokens(symbol) { - continue - } - - walletToken := token.StorageToken{ - Token: token.Token{ - Name: tokens[0].Name, - Symbol: symbol, - Decimals: decimals, - PegSymbol: token.GetTokenPegSymbol(symbol), - Verified: tokens[0].Verified, - CommunityData: tokens[0].CommunityData, - Image: tokens[0].Image, - }, - BalancesPerChain: balancesPerChain, - } - - tokenSymbols = append(tokenSymbols, symbol) - result[address] = append(result[address], walletToken) - } + for _, storageTokens := range result { + for _, t := range storageTokens { + tokenSymbols = append(tokenSymbols, t.Token.Symbol) } } diff --git a/services/wallet/reader_test.go b/services/wallet/reader_test.go index 9e20315aa..c4a98d7b7 100644 --- a/services/wallet/reader_test.go +++ b/services/wallet/reader_test.go @@ -1058,6 +1058,6 @@ func TestFetchOrGetCachedWalletBalances(t *testing.T) { clients := map[uint64]chain.ClientInterface{} addresses := []common.Address{} - _, err := reader.FetchOrGetCachedWalletBalances(context.TODO(), clients, addresses) + _, err := reader.FetchOrGetCachedWalletBalances(context.TODO(), clients, addresses, false) require.Error(t, err) }