package history import ( "context" "database/sql" "errors" "fmt" "math" "math/big" "reflect" "sort" "time" "go.uber.org/zap" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/event" statustypes "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/params" statusrpc "github.com/status-im/status-go/rpc" "github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/rpc/network" gocommon "github.com/status-im/status-go/common" "github.com/status-im/status-go/services/accounts/accountsevent" "github.com/status-im/status-go/services/wallet/balance" "github.com/status-im/status-go/services/wallet/market" "github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/services/wallet/transfer" "github.com/status-im/status-go/services/wallet/walletevent" ) const minPointsForGraph = 14 // for minimal time frame - 7 days, twice a day // EventBalanceHistoryUpdateStarted and EventBalanceHistoryUpdateDone are used to notify the UI that balance history is being updated const ( EventBalanceHistoryUpdateStarted walletevent.EventType = "wallet-balance-history-update-started" EventBalanceHistoryUpdateFinished walletevent.EventType = "wallet-balance-history-update-finished" EventBalanceHistoryUpdateFinishedWithError walletevent.EventType = "wallet-balance-history-update-finished-with-error" ) type ValuePoint struct { Value float64 `json:"value"` Timestamp uint64 `json:"time"` } func (vp *ValuePoint) String() string { return fmt.Sprintf("%d: %f", vp.Timestamp, vp.Value) } type Service struct { balance *Balance db *sql.DB accountsDB *accounts.Database accountFeed *event.Feed eventFeed *event.Feed rpcClient *statusrpc.Client networkManager *network.Manager tokenManager *token.Manager serviceContext context.Context cancelFn context.CancelFunc transferWatcher *Watcher accWatcher *accountsevent.Watcher exchange *Exchange balanceCache balance.CacheIface } func NewService(db *sql.DB, accountsDB *accounts.Database, accountFeed *event.Feed, eventFeed *event.Feed, rpcClient *statusrpc.Client, tokenManager *token.Manager, marketManager *market.Manager, balanceCache balance.CacheIface) *Service { return &Service{ balance: NewBalance(NewBalanceDB(db)), db: db, accountsDB: accountsDB, accountFeed: accountFeed, eventFeed: eventFeed, rpcClient: rpcClient, networkManager: rpcClient.NetworkManager, tokenManager: tokenManager, exchange: NewExchange(marketManager), balanceCache: balanceCache, } } func (s *Service) Stop() { if s.cancelFn != nil { s.cancelFn() } s.stopTransfersWatcher() s.stopAccountWatcher() } func (s *Service) triggerEvent(eventType walletevent.EventType, account statustypes.Address, message string) { s.eventFeed.Send(walletevent.Event{ Type: eventType, Accounts: []common.Address{ common.Address(account), }, Message: message, }) } func (s *Service) Start() { logutils.ZapLogger().Debug("Starting balance history service") s.startTransfersWatcher() s.startAccountWatcher() go func() { defer gocommon.LogOnPanic() s.serviceContext, s.cancelFn = context.WithCancel(context.Background()) err := s.updateBalanceHistory(s.serviceContext) if s.serviceContext.Err() != nil { s.triggerEvent(EventBalanceHistoryUpdateFinished, statustypes.Address{}, "Service canceled") } if err != nil { s.triggerEvent(EventBalanceHistoryUpdateFinishedWithError, statustypes.Address{}, err.Error()) } }() } func (s *Service) mergeChainsBalances(chainIDs []uint64, addresses []common.Address, tokenSymbol string, fromTimestamp uint64, data map[uint64][]*entry) ([]*DataPoint, error) { logutils.ZapLogger().Debug("Merging balances", zap.Stringers("address", addresses), zap.String("tokenSymbol", tokenSymbol), zap.Uint64("fromTimestamp", fromTimestamp), zap.Int("len(data)", len(data)), ) toTimestamp := uint64(time.Now().UTC().Unix()) allData := make([]*entry, 0) // Add edge points per chain // Iterate over chainIDs param, not data keys, because data may not contain all the chains, but we need edge points for all of them for _, chainID := range chainIDs { // edge points are needed to properly calculate total balance, as they contain the balance for the first and last timestamp chainData, err := s.balance.addEdgePoints(chainID, tokenSymbol, addresses, fromTimestamp, toTimestamp, data[chainID]) if err != nil { return nil, err } allData = append(allData, chainData...) } // Sort by timestamp sort.Slice(allData, func(i, j int) bool { return allData[i].timestamp < allData[j].timestamp }) // Add padding points to make chart look nice numEdgePoints := 2 * len(chainIDs) // 2 edge points per chain if len(allData) < minPointsForGraph { allData, _ = addPaddingPoints(tokenSymbol, addresses, toTimestamp, allData, minPointsForGraph+numEdgePoints) } return entriesToDataPoints(allData) } // Expects sorted data func entriesToDataPoints(data []*entry) ([]*DataPoint, error) { var resSlice []*DataPoint var groupedEntries []*entry // Entries with the same timestamp type AddressKey struct { Address common.Address ChainID uint64 } sumBalances := func(balanceMap map[AddressKey]*big.Int) *big.Int { // Sum balances of all accounts and chains in current timestamp sum := big.NewInt(0) for _, balance := range balanceMap { sum.Add(sum, balance) } return sum } updateBalanceMap := func(balanceMap map[AddressKey]*big.Int, entries []*entry) map[AddressKey]*big.Int { // Update balance map for this timestamp for _, entry := range entries { key := AddressKey{ Address: entry.address, ChainID: entry.chainID, } balanceMap[key] = entry.balance } return balanceMap } // Balance map always contains current balance for each address in specific timestamp // It is required to sum up balances from previous timestamp from accounts not present in current timestamp balanceMap := make(map[AddressKey]*big.Int) for _, entry := range data { if len(groupedEntries) > 0 { if entry.timestamp == groupedEntries[0].timestamp { groupedEntries = append(groupedEntries, entry) continue } else { // Split grouped entries into addresses balanceMap = updateBalanceMap(balanceMap, groupedEntries) // Calculate balance for all the addresses cumulativeBalance := sumBalances(balanceMap) // Points in slice contain balances for all chains resSlice = appendPointToSlice(resSlice, &DataPoint{ Timestamp: uint64(groupedEntries[0].timestamp), Balance: (*hexutil.Big)(cumulativeBalance), }) // Reset grouped entries groupedEntries = nil groupedEntries = append(groupedEntries, entry) } } else { groupedEntries = append(groupedEntries, entry) } } // If only edge points are present, groupedEntries will be non-empty if len(groupedEntries) > 0 { // Split grouped entries into addresses balanceMap = updateBalanceMap(balanceMap, groupedEntries) // Calculate balance for all the addresses cumulativeBalance := sumBalances(balanceMap) resSlice = appendPointToSlice(resSlice, &DataPoint{ Timestamp: uint64(groupedEntries[0].timestamp), Balance: (*hexutil.Big)(cumulativeBalance), }) } return resSlice, nil } func appendPointToSlice(slice []*DataPoint, point *DataPoint) []*DataPoint { // Replace the last point in slice if it has the same timestamp or add a new one if different if len(slice) > 0 { if slice[len(slice)-1].Timestamp != point.Timestamp { // Timestamps are different, appending to slice slice = append(slice, point) } else { // Replace last item in slice because timestamps are the same slice[len(slice)-1] = point } } else { slice = append(slice, point) } return slice } // GetBalanceHistory returns token count balance func (s *Service) GetBalanceHistory(ctx context.Context, chainIDs []uint64, addresses []common.Address, tokenSymbol string, currencySymbol string, fromTimestamp uint64) ([]*ValuePoint, error) { logutils.ZapLogger().Debug("GetBalanceHistory", zap.Uint64s("chainIDs", chainIDs), zap.Stringers("address", addresses), zap.String("tokenSymbol", tokenSymbol), zap.String("currencySymbol", currencySymbol), zap.Uint64("fromTimestamp", fromTimestamp), ) chainDataMap := make(map[uint64][]*entry) for _, chainID := range chainIDs { chainData, err := s.balance.get(ctx, chainID, tokenSymbol, addresses, fromTimestamp) // TODO Make chainID a slice? if err != nil { return nil, err } if len(chainData) == 0 { continue } chainDataMap[chainID] = chainData } // Need to get balance for all the chains for the first timestamp, otherwise total values will be incorrect data, err := s.mergeChainsBalances(chainIDs, addresses, tokenSymbol, fromTimestamp, chainDataMap) if err != nil { return nil, err } else if len(data) == 0 { return make([]*ValuePoint, 0), nil } return s.dataPointsToValuePoints(chainIDs, tokenSymbol, currencySymbol, data) } func (s *Service) dataPointsToValuePoints(chainIDs []uint64, tokenSymbol string, currencySymbol string, data []*DataPoint) ([]*ValuePoint, error) { if len(data) == 0 { return make([]*ValuePoint, 0), nil } // Check if historical exchange rate for data point is present and fetch remaining if not lastDayTime := time.Unix(int64(data[len(data)-1].Timestamp), 0).UTC() currentTime := time.Now().UTC() currentDayStart := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), 0, 0, 0, 0, time.UTC) if lastDayTime.After(currentDayStart) { // No chance to have today, use the previous day value for the last data point lastDayTime = lastDayTime.AddDate(0, 0, -1) } lastDayValue, err := s.exchange.GetExchangeRateForDay(tokenSymbol, currencySymbol, lastDayTime) if err != nil { err := s.exchange.FetchAndCacheMissingRates(tokenSymbol, currencySymbol) if err != nil { logutils.ZapLogger().Error("Error fetching exchange rates", zap.String("tokenSymbol", tokenSymbol), zap.String("currencySymbol", currencySymbol), zap.Error(err), ) return nil, err } lastDayValue, err = s.exchange.GetExchangeRateForDay(tokenSymbol, currencySymbol, lastDayTime) if err != nil { logutils.ZapLogger().Error("Exchange rate missing for", zap.String("tokenSymbol", tokenSymbol), zap.String("currencySymbol", currencySymbol), zap.Time("lastDayTime", lastDayTime), zap.Error(err), ) return nil, err } } decimals, err := s.decimalsForToken(tokenSymbol, chainIDs[0]) if err != nil { return nil, err } weisInOneMain := big.NewFloat(math.Pow(10, float64(decimals))) var res []*ValuePoint for _, d := range data { var dayValue float32 dayTime := time.Unix(int64(d.Timestamp), 0).UTC() if dayTime.After(currentDayStart) { // No chance to have today, use the previous day value for the last data point if lastDayValue > 0 { dayValue = lastDayValue } else { logutils.ZapLogger().Warn("Exchange rate missing for", zap.Time("dayTime", dayTime), zap.Error(err), ) continue } } else { dayValue, err = s.exchange.GetExchangeRateForDay(tokenSymbol, currencySymbol, dayTime) if err != nil { logutils.ZapLogger().Warn( "Exchange rate missing for", zap.Time("dayTime", dayTime), zap.Error(err), ) continue } } // The big.Int values are discarded, hence copy the original values res = append(res, &ValuePoint{ Timestamp: d.Timestamp, Value: tokenToValue((*big.Int)(d.Balance), dayValue, weisInOneMain), }) } return res, nil } func (s *Service) decimalsForToken(tokenSymbol string, chainID uint64) (int, error) { network := s.networkManager.Find(chainID) if network == nil { return 0, errors.New("network not found") } token := s.tokenManager.FindToken(network, tokenSymbol) if token == nil { return 0, errors.New("token not found") } return int(token.Decimals), nil } func tokenToValue(tokenCount *big.Int, mainDenominationValue float32, weisInOneMain *big.Float) float64 { weis := new(big.Float).SetInt(tokenCount) mainTokens := new(big.Float).Quo(weis, weisInOneMain) mainTokenValue := new(big.Float).SetFloat64(float64(mainDenominationValue)) res, accuracy := new(big.Float).Mul(mainTokens, mainTokenValue).Float64() if res == 0 && accuracy == big.Below { return math.SmallestNonzeroFloat64 } else if res == math.Inf(1) && accuracy == big.Above { return math.Inf(1) } return res } // updateBalanceHistory iterates over all networks depending on test/prod for the s.visibleTokenSymbol // and updates the balance history for the given address // // expects ctx to have cancellation support and processing to be cancelled by the caller func (s *Service) updateBalanceHistory(ctx context.Context) error { logutils.ZapLogger().Debug("updateBalanceHistory started") addresses, err := s.accountsDB.GetWalletAddresses() if err != nil { return err } areTestNetworksEnabled, err := s.accountsDB.GetTestNetworksEnabled() if err != nil { return err } onlyEnabledNetworks := false networks, err := s.networkManager.Get(onlyEnabledNetworks) if err != nil { return err } for _, address := range addresses { s.triggerEvent(EventBalanceHistoryUpdateStarted, address, "") for _, network := range networks { if network.IsTest != areTestNetworksEnabled { continue } entries, err := s.balance.db.getEntriesWithoutBalances(network.ChainID, common.Address(address)) if err != nil { logutils.ZapLogger().Error("Error getting blocks without balances", zap.Uint64("chainID", network.ChainID), zap.Stringer("address", address), zap.Error(err), ) return err } logutils.ZapLogger().Debug("Blocks without balances", zap.Uint64("chainID", network.ChainID), zap.Stringer("address", address), zap.Any("entries", entries), ) client, err := s.rpcClient.EthClient(network.ChainID) if err != nil { logutils.ZapLogger().Error("Error getting client", zap.Uint64("chainID", network.ChainID), zap.Stringer("address", address), zap.Error(err), ) return err } err = s.addEntriesToDB(ctx, client, network, address, entries) if err != nil { return err } } s.triggerEvent(EventBalanceHistoryUpdateFinished, address, "") } logutils.ZapLogger().Debug("updateBalanceHistory finished") return nil } func (s *Service) addEntriesToDB(ctx context.Context, client chain.ClientInterface, network *params.Network, address statustypes.Address, entries []*entry) (err error) { for _, entry := range entries { var balance *big.Int // tokenAddess is zero for native currency if (entry.tokenAddress == common.Address{}) { // Check in cache balance = s.balanceCache.GetBalance(common.Address(address), network.ChainID, entry.block) logutils.ZapLogger().Debug("Balance from cache", zap.Uint64("chainID", network.ChainID), zap.Stringer("address", address), zap.Uint64("block", entry.block.Uint64()), zap.Stringer("balance", balance), ) if balance == nil { balance, err = client.BalanceAt(ctx, common.Address(address), entry.block) if err != nil { logutils.ZapLogger().Error("Error getting balance", zap.Uint64("chainID", network.ChainID), zap.Stringer("address", address), zap.Error(err), zap.NamedError("unwrapped", errors.Unwrap(err)), ) return err } time.Sleep(50 * time.Millisecond) // TODO Remove this sleep after fixing exceeding rate limit } entry.tokenSymbol = network.NativeCurrencySymbol } else { // Check token first if it is supported token := s.tokenManager.FindTokenByAddress(network.ChainID, entry.tokenAddress) if token == nil { logutils.ZapLogger().Warn("Token not found", zap.Uint64("chainID", network.ChainID), zap.Stringer("address", address), zap.Stringer("tokenAddress", entry.tokenAddress), ) // TODO Add "supported=false" flag to such tokens to avoid checking them again and again continue // Skip token that we don't have symbol for. For example we don't have tokens in store for sepolia optimism } else { entry.tokenSymbol = token.Symbol } // Check balance for token balance, err = s.tokenManager.GetTokenBalanceAt(ctx, client, common.Address(address), entry.tokenAddress, entry.block) logutils.ZapLogger().Debug("Balance from token manager", zap.Uint64("chainID", network.ChainID), zap.Stringer("address", address), zap.Uint64("block", entry.block.Uint64()), zap.Stringer("balance", balance), ) if err != nil { logutils.ZapLogger().Error("Error getting token balance", zap.Uint64("chainID", network.ChainID), zap.Stringer("address", address), zap.Stringer("tokenAddress", entry.tokenAddress), zap.Error(err), ) return err } } entry.balance = balance err = s.balance.db.add(entry) if err != nil { logutils.ZapLogger().Error("Error adding balance", zap.Uint64("chainID", network.ChainID), zap.Stringer("address", address), zap.Error(err), ) return err } } return nil } func (s *Service) startTransfersWatcher() { if s.transferWatcher != nil { return } transferLoadedCb := func(chainID uint64, addresses []common.Address, block *big.Int) { logutils.ZapLogger().Debug("Balance history watcher: transfer loaded:", zap.Uint64("chainID", chainID), zap.Stringers("addresses", addresses), zap.Uint64("block", block.Uint64()), ) client, err := s.rpcClient.EthClient(chainID) if err != nil { logutils.ZapLogger().Error("Error getting client", zap.Uint64("chainID", chainID), zap.Error(err), ) return } network := s.networkManager.Find(chainID) if network == nil { logutils.ZapLogger().Error("Network not found", zap.Uint64("chainID", chainID)) return } transferDB := transfer.NewDB(s.db) for _, address := range addresses { transfers, err := transferDB.GetTransfersByAddressAndBlock(chainID, address, block, 1500) // 1500 is quite arbitrary and far from real, but should be enough to cover all transfers in a block if err != nil { logutils.ZapLogger().Error("Error getting transfers", zap.Uint64("chainID", chainID), zap.Stringer("address", address), zap.Error(err), ) continue } if len(transfers) == 0 { logutils.ZapLogger().Debug("No transfers found", zap.Uint64("chainID", chainID), zap.Stringer("address", address), zap.Uint64("block", block.Uint64()), ) continue } entries := transfersToEntries(address, block, transfers) // TODO Remove address and block after testing that they match unique := removeDuplicates(entries) logutils.ZapLogger().Debug("Entries after filtering", zap.Any("entries", entries), zap.Any("unique", unique), ) err = s.addEntriesToDB(s.serviceContext, client, network, statustypes.Address(address), unique) if err != nil { logutils.ZapLogger().Error("Error adding entries to DB", zap.Uint64("chainID", chainID), zap.Stringer("address", address), zap.Error(err), ) continue } // No event triggering here, because noone cares about balance history updates yet } } s.transferWatcher = NewWatcher(s.eventFeed, transferLoadedCb) s.transferWatcher.Start() } func removeDuplicates(entries []*entry) []*entry { unique := make([]*entry, 0, len(entries)) for _, entry := range entries { found := false for _, u := range unique { if reflect.DeepEqual(entry, u) { found = true break } } if !found { unique = append(unique, entry) } } return unique } func transfersToEntries(address common.Address, block *big.Int, transfers []transfer.Transfer) []*entry { entries := make([]*entry, 0) for _, transfer := range transfers { entry := &entry{ chainID: transfer.NetworkID, address: transfer.Address, tokenAddress: transfer.Log.Address, block: transfer.BlockNumber, timestamp: (int64)(transfer.Timestamp), } entries = append(entries, entry) } return entries } func (s *Service) stopTransfersWatcher() { if s.transferWatcher != nil { s.transferWatcher.Stop() s.transferWatcher = nil } } func (s *Service) startAccountWatcher() { if s.accWatcher == nil { s.accWatcher = accountsevent.NewWatcher(s.accountsDB, s.accountFeed, func(changedAddresses []common.Address, eventType accountsevent.EventType, currentAddresses []common.Address) { s.onAccountsChanged(changedAddresses, eventType, currentAddresses) }) } s.accWatcher.Start() } func (s *Service) stopAccountWatcher() { if s.accWatcher != nil { s.accWatcher.Stop() s.accWatcher = nil } } func (s *Service) onAccountsChanged(changedAddresses []common.Address, eventType accountsevent.EventType, currentAddresses []common.Address) { if eventType == accountsevent.EventTypeRemoved { for _, address := range changedAddresses { err := s.balance.db.removeBalanceHistory(address) if err != nil { logutils.ZapLogger().Error("Error removing balance history", zap.String("address", address.String()), zap.Error(err), ) } } } }