feat_: Cache GetWalletToken method and split circuits

This commits does a few things:

1) Adds cache of token amount to the GetWalletToken endpoint, used by
   mobile, in case the user is offline.

2) Split circuits by chain-id (when available) and by host+index when
   not

3) It makes GetWalletToken always refresh, as that's directed from an
   user action and we want to respect that. A cool down of 10s should be
   added in the future to avoid spamming.
This commit is contained in:
Andrea Maria Piana 2024-08-14 14:15:14 +01:00
parent dd787c982a
commit da70d6f1b5
5 changed files with 33 additions and 87 deletions

View File

@ -973,6 +973,7 @@ func (c *ClientWithFallback) toggleConnectionState(err error) {
connected := true
if err != nil {
if !isVMError(err) && !errors.Is(err, ErrRequestsOverLimit) && !errors.Is(err, context.Canceled) {
log.Warn("Error not in chain call", "error", err, "chain", c.ChainID)
connected = false
} else {
log.Warn("Error in chain call", "error", err)

View File

@ -11,6 +11,7 @@ import (
"net/url"
"reflect"
"runtime"
"strings"
"sync"
"time"
@ -156,11 +157,15 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U
if err != nil {
return nil, fmt.Errorf("get RPC limiter: %s", err)
}
hostPortUpstream, err := extractHostAndPortFromURL(c.upstreamURL)
hostPortUpstream, err := extractHostFromURL(c.upstreamURL)
if err != nil {
hostPortUpstream = "upstream"
}
c.upstream = chain.NewSimpleClient(*chain.NewEthClient(ethclient.NewClient(upstreamClient), limiter, upstreamClient, hostPortUpstream), upstreamChainID)
// Include the chain-id in the rpc client
rpcName := fmt.Sprintf("%s-chain-id-%d", hostPortUpstream, upstreamChainID)
c.upstream = chain.NewSimpleClient(*chain.NewEthClient(ethclient.NewClient(upstreamClient), limiter, upstreamClient, rpcName), upstreamChainID)
}
c.router = newRouter(c.upstreamEnabled)
@ -176,7 +181,7 @@ func (c *Client) SetWalletNotifier(notifier func(chainID uint64, message string)
c.walletNotifier = notifier
}
func extractHostAndPortFromURL(inputURL string) (string, error) {
func extractHostFromURL(inputURL string) (string, error) {
parsedURL, err := url.Parse(inputURL)
if err != nil {
return "", err
@ -223,7 +228,7 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err
return nil, fmt.Errorf("could not find network: %d", chainID)
}
ethClients := c.getEthClents(network)
ethClients := c.getEthClients(network)
if len(ethClients) == 0 {
return nil, fmt.Errorf("could not find any RPC URL for chain: %d", chainID)
}
@ -234,7 +239,7 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err
return client, nil
}
func (c *Client) getEthClents(network *params.Network) []*chain.EthClient {
func (c *Client) getEthClients(network *params.Network) []*chain.EthClient {
urls := make(map[string]string)
keys := make([]string, 0)
authMap := make(map[string]string)
@ -259,7 +264,7 @@ func (c *Client) getEthClents(network *params.Network) []*chain.EthClient {
urls["fallback"] = network.FallbackURL
ethClients := make([]*chain.EthClient, 0)
for _, key := range keys {
for index, key := range keys {
var rpcClient *gethrpc.Client
var rpcLimiter *chain.RPCRpsLimiter
var err error
@ -285,17 +290,22 @@ func (c *Client) getEthClents(network *params.Network) []*chain.EthClient {
c.log.Error("dial server "+key, "error", err)
}
hostPort, err = extractHostAndPortFromURL(url)
if err != nil {
hostPort = key
// If using the status-proxy, consider each endpoint as a separate provider
circuitKey := fmt.Sprintf("%s-%d", key, index)
// Otherwise host is good enough
if !strings.Contains(url, "status.im") {
hostPort, err = extractHostFromURL(url)
if err == nil {
circuitKey = hostPort
}
}
rpcLimiter, err = c.getRPCRpsLimiter(hostPort)
rpcLimiter, err = c.getRPCRpsLimiter(circuitKey)
if err != nil {
c.log.Error("get RPC limiter "+key, "error", err)
}
ethClients = append(ethClients, chain.NewEthClient(ethclient.NewClient(rpcClient), rpcLimiter, rpcClient, hostPort))
ethClients = append(ethClients, chain.NewEthClient(ethclient.NewClient(rpcClient), rpcLimiter, rpcClient, circuitKey))
}
}
@ -357,7 +367,7 @@ func (c *Client) UpdateUpstreamURL(url string) error {
return err
}
c.Lock()
hostPortUpstream, err := extractHostAndPortFromURL(url)
hostPortUpstream, err := extractHostFromURL(url)
if err != nil {
hostPortUpstream = "upstream"
}

View File

@ -152,7 +152,7 @@ func (api *API) FetchOrGetCachedWalletBalances(ctx context.Context, addresses []
return nil, err
}
return api.reader.FetchOrGetCachedWalletBalances(ctx, clients, addresses)
return api.reader.FetchOrGetCachedWalletBalances(ctx, clients, addresses, false)
}
type DerivedAddress struct {

View File

@ -261,8 +261,9 @@ func (r *Reader) invalidateBalanceCache() {
r.refreshBalanceCache = true
}
func (r *Reader) FetchOrGetCachedWalletBalances(ctx context.Context, clients map[uint64]chain.ClientInterface, addresses []common.Address) (map[common.Address][]token.StorageToken, error) {
needFetch := !r.isBalanceCacheValid(addresses) || r.isBalanceUpdateNeededAnyway(clients, addresses)
func (r *Reader) FetchOrGetCachedWalletBalances(ctx context.Context, clients map[uint64]chain.ClientInterface, addresses []common.Address, forceRefresh bool) (map[common.Address][]token.StorageToken, error) {
needFetch := forceRefresh || !r.isBalanceCacheValid(addresses) || r.isBalanceUpdateNeededAnyway(clients, addresses)
if needFetch {
_, err := r.FetchBalances(ctx, clients, addresses)
if err != nil {
@ -420,7 +421,6 @@ func (r *Reader) balancesToTokensByAddress(connectedPerChain map[uint64]bool, ad
return result
}
// For tokens with single symbol, create a chain balance for each chain
func (r *Reader) createBalancePerChainPerSymbol(
address common.Address,
balances map[uint64]map[common.Address]map[common.Address]*hexutil.Big,
@ -461,84 +461,19 @@ func (r *Reader) createBalancePerChainPerSymbol(
}
func (r *Reader) GetWalletToken(ctx context.Context, clients map[uint64]chain.ClientInterface, addresses []common.Address, currency string) (map[common.Address][]token.StorageToken, error) {
cachedTokens, err := r.getCachedWalletTokensWithoutMarketData()
if err != nil {
return nil, err
}
chainIDs := maps.Keys(clients)
currencies := make([]string, 0)
currencies = append(currencies, currency)
currencies = append(currencies, getFixedCurrencies()...)
allTokens, err := r.tokenManager.GetTokensByChainIDs(chainIDs)
result, err := r.FetchOrGetCachedWalletBalances(ctx, clients, addresses, true)
if err != nil {
return nil, err
}
tokenAddresses := getTokenAddresses(allTokens)
balances, err := r.tokenManager.GetBalancesByChain(ctx, clients, addresses, tokenAddresses)
if err != nil {
log.Info("tokenManager.GetBalancesByChain error", "err", err)
return nil, err
}
verifiedTokens, unverifiedTokens := splitVerifiedTokens(allTokens)
tokenSymbols := make([]string, 0)
result := make(map[common.Address][]token.StorageToken)
for _, address := range addresses {
for _, tokenList := range [][]*token.Token{verifiedTokens, unverifiedTokens} {
for symbol, tokens := range getTokenBySymbols(tokenList) {
balancesPerChain := make(map[uint64]token.ChainBalance)
decimals := tokens[0].Decimals
isVisible := false
for _, tok := range tokens {
hexBalance := balances[tok.ChainID][address][tok.Address]
balance := big.NewFloat(0.0)
if hexBalance != nil {
balance = new(big.Float).Quo(
new(big.Float).SetInt(hexBalance.ToInt()),
big.NewFloat(math.Pow(10, float64(decimals))),
)
}
hasError := false
if client, ok := clients[tok.ChainID]; ok {
hasError = err != nil || !client.IsConnected()
}
if !isVisible {
isVisible = balance.Cmp(big.NewFloat(0.0)) > 0 || isCachedToken(cachedTokens, address, tok.Symbol, tok.ChainID)
}
balancesPerChain[tok.ChainID] = token.ChainBalance{
RawBalance: hexBalance.ToInt().String(),
Balance: balance,
Address: tok.Address,
ChainID: tok.ChainID,
HasError: hasError,
}
}
if !isVisible && !belongsToMandatoryTokens(symbol) {
continue
}
walletToken := token.StorageToken{
Token: token.Token{
Name: tokens[0].Name,
Symbol: symbol,
Decimals: decimals,
PegSymbol: token.GetTokenPegSymbol(symbol),
Verified: tokens[0].Verified,
CommunityData: tokens[0].CommunityData,
Image: tokens[0].Image,
},
BalancesPerChain: balancesPerChain,
}
tokenSymbols = append(tokenSymbols, symbol)
result[address] = append(result[address], walletToken)
}
for _, storageTokens := range result {
for _, t := range storageTokens {
tokenSymbols = append(tokenSymbols, t.Token.Symbol)
}
}

View File

@ -1058,6 +1058,6 @@ func TestFetchOrGetCachedWalletBalances(t *testing.T) {
clients := map[uint64]chain.ClientInterface{}
addresses := []common.Address{}
_, err := reader.FetchOrGetCachedWalletBalances(context.TODO(), clients, addresses)
_, err := reader.FetchOrGetCachedWalletBalances(context.TODO(), clients, addresses, false)
require.Error(t, err)
}