2022-11-15 12:14:41 +00:00
package history
import (
"context"
"database/sql"
"errors"
2023-01-25 18:28:51 +00:00
"math"
2022-11-15 12:14:41 +00:00
"math/big"
2023-01-20 16:06:50 +00:00
"sort"
2022-11-15 12:14:41 +00:00
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
statustypes "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/multiaccounts/accounts"
statusrpc "github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/services/wallet/chain"
2023-02-21 09:05:16 +00:00
"github.com/status-im/status-go/services/wallet/market"
2022-11-15 12:14:41 +00:00
"github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/walletevent"
)
// EventBalanceHistoryUpdateStarted and EventBalanceHistoryUpdateDone are used to notify the UI that balance history is being updated
const (
EventBalanceHistoryUpdateStarted walletevent . EventType = "wallet-balance-history-update-started"
EventBalanceHistoryUpdateFinished walletevent . EventType = "wallet-balance-history-update-finished"
EventBalanceHistoryUpdateFinishedWithError walletevent . EventType = "wallet-balance-history-update-finished-with-error"
balanceHistoryUpdateInterval = 12 * time . Hour
)
type Service struct {
balance * Balance
db * sql . DB
eventFeed * event . Feed
rpcClient * statusrpc . Client
networkManager * network . Manager
tokenManager * token . Manager
serviceContext context . Context
cancelFn context . CancelFunc
2023-01-25 18:28:51 +00:00
exchange * Exchange
2022-11-15 12:14:41 +00:00
timer * time . Timer
visibleTokenSymbols [ ] string
2023-01-25 18:28:51 +00:00
visibleTokenSymbolsMutex sync . Mutex
2022-11-15 12:14:41 +00:00
}
2023-01-20 16:06:50 +00:00
type chainIdentity uint64
2023-02-21 09:05:16 +00:00
func NewService ( db * sql . DB , eventFeed * event . Feed , rpcClient * statusrpc . Client , tokenManager * token . Manager , marketManager * market . Manager ) * Service {
2022-11-15 12:14:41 +00:00
return & Service {
balance : NewBalance ( NewBalanceDB ( db ) ) ,
db : db ,
eventFeed : eventFeed ,
rpcClient : rpcClient ,
networkManager : rpcClient . NetworkManager ,
tokenManager : tokenManager ,
2023-02-21 09:05:16 +00:00
exchange : NewExchange ( marketManager ) ,
2022-11-15 12:14:41 +00:00
}
}
func ( s * Service ) Stop ( ) {
if s . cancelFn != nil {
s . cancelFn ( )
}
}
func ( s * Service ) triggerEvent ( eventType walletevent . EventType , account statustypes . Address , message string ) {
s . eventFeed . Send ( walletevent . Event {
Type : eventType ,
Accounts : [ ] common . Address {
common . Address ( account ) ,
} ,
Message : message ,
} )
}
2023-01-25 18:28:51 +00:00
func ( s * Service ) Start ( ) {
2022-11-15 12:14:41 +00:00
go func ( ) {
s . serviceContext , s . cancelFn = context . WithCancel ( context . Background ( ) )
s . timer = time . NewTimer ( balanceHistoryUpdateInterval )
update := func ( ) ( exit bool ) {
err := s . updateBalanceHistoryForAllEnabledNetworks ( s . serviceContext )
if s . serviceContext . Err ( ) != nil {
s . triggerEvent ( EventBalanceHistoryUpdateFinished , statustypes . Address { } , "Service canceled" )
s . timer . Stop ( )
return true
}
if err != nil {
s . triggerEvent ( EventBalanceHistoryUpdateFinishedWithError , statustypes . Address { } , err . Error ( ) )
}
return false
}
if update ( ) {
return
}
for range s . timer . C {
s . resetTimer ( balanceHistoryUpdateInterval )
if update ( ) {
return
}
}
} ( )
}
func ( s * Service ) resetTimer ( interval time . Duration ) {
if s . timer != nil {
s . timer . Stop ( )
s . timer . Reset ( interval )
}
}
func ( s * Service ) UpdateVisibleTokens ( symbols [ ] string ) {
s . visibleTokenSymbolsMutex . Lock ( )
defer s . visibleTokenSymbolsMutex . Unlock ( )
startUpdate := len ( s . visibleTokenSymbols ) == 0 && len ( symbols ) > 0
s . visibleTokenSymbols = symbols
if startUpdate {
s . resetTimer ( 0 )
}
}
func ( s * Service ) isTokenVisible ( tokenSymbol string ) bool {
s . visibleTokenSymbolsMutex . Lock ( )
defer s . visibleTokenSymbolsMutex . Unlock ( )
for _ , visibleSymbol := range s . visibleTokenSymbols {
if visibleSymbol == tokenSymbol {
return true
}
}
return false
}
// Native token implementation of DataSource interface
type chainClientSource struct {
chainClient * chain . Client
currency string
}
func ( src * chainClientSource ) HeaderByNumber ( ctx context . Context , blockNo * big . Int ) ( * types . Header , error ) {
return src . chainClient . HeaderByNumber ( ctx , blockNo )
}
func ( src * chainClientSource ) BalanceAt ( ctx context . Context , account common . Address , blockNo * big . Int ) ( * big . Int , error ) {
return src . chainClient . BalanceAt ( ctx , account , blockNo )
}
func ( src * chainClientSource ) ChainID ( ) uint64 {
return src . chainClient . ChainID
}
func ( src * chainClientSource ) Currency ( ) string {
return src . currency
}
func ( src * chainClientSource ) TimeNow ( ) int64 {
return time . Now ( ) . UTC ( ) . Unix ( )
}
type tokenChainClientSource struct {
chainClientSource
TokenManager * token . Manager
NetworkManager * network . Manager
firstUnavailableBlockNo * big . Int
}
func ( src * tokenChainClientSource ) BalanceAt ( ctx context . Context , account common . Address , blockNumber * big . Int ) ( * big . Int , error ) {
network := src . NetworkManager . Find ( src . chainClient . ChainID )
if network == nil {
return nil , errors . New ( "network not found" )
}
token := src . TokenManager . FindToken ( network , src . currency )
if token == nil {
return nil , errors . New ( "token not found" )
}
if src . firstUnavailableBlockNo != nil && blockNumber . Cmp ( src . firstUnavailableBlockNo ) < 0 {
return big . NewInt ( 0 ) , nil
}
balance , err := src . TokenManager . GetTokenBalanceAt ( ctx , src . chainClient , account , token . Address , blockNumber )
if err != nil {
if err . Error ( ) == "no contract code at given address" {
// Ignore requests before contract deployment and mark this state for future requests
src . firstUnavailableBlockNo = new ( big . Int ) . Set ( blockNumber )
return big . NewInt ( 0 ) , nil
}
return nil , err
}
return balance , err
}
2023-01-25 18:28:51 +00:00
type ValuePoint struct {
Value float64 ` json:"value" `
Timestamp uint64 ` json:"time" `
BlockNumber * hexutil . Big ` json:"blockNumber" `
}
2022-11-15 12:14:41 +00:00
// GetBalanceHistory returns token count balance
2023-01-25 18:28:51 +00:00
func ( s * Service ) GetBalanceHistory ( ctx context . Context , chainIDs [ ] uint64 , address common . Address , tokenSymbol string , currencySymbol string , endTimestamp int64 , timeInterval TimeInterval ) ( [ ] * ValuePoint , error ) {
// Retrieve cached data for all chains
2023-01-20 16:06:50 +00:00
allData := make ( map [ chainIdentity ] [ ] * DataPoint )
2022-11-15 12:14:41 +00:00
for _ , chainID := range chainIDs {
2023-01-25 18:28:51 +00:00
data , err := s . balance . get ( ctx , chainID , tokenSymbol , address , endTimestamp , timeInterval )
2022-11-15 12:14:41 +00:00
if err != nil {
return nil , err
}
if len ( data ) > 0 {
2023-01-20 16:06:50 +00:00
allData [ chainIdentity ( chainID ) ] = data
2023-01-25 18:28:51 +00:00
} else {
return make ( [ ] * ValuePoint , 0 ) , nil
2022-11-15 12:14:41 +00:00
}
}
2023-01-25 18:28:51 +00:00
data , err := mergeDataPoints ( allData , strideDuration ( timeInterval ) )
if err != nil {
return nil , err
} else if len ( data ) == 0 {
return make ( [ ] * ValuePoint , 0 ) , nil
}
// Check if historical exchange rate for data point is present and fetch remaining if not
lastDayTime := time . Unix ( int64 ( data [ len ( data ) - 1 ] . Timestamp ) , 0 ) . UTC ( )
currentTime := time . Now ( ) . UTC ( )
currentDayStart := time . Date ( currentTime . Year ( ) , currentTime . Month ( ) , currentTime . Day ( ) , 0 , 0 , 0 , 0 , time . UTC )
if lastDayTime . After ( currentDayStart ) {
// No chance to have today, use the previous day value for the last data point
lastDayTime = lastDayTime . AddDate ( 0 , 0 , - 1 )
}
_ , err = s . exchange . GetExchangeRateForDay ( tokenSymbol , currencySymbol , lastDayTime )
if err != nil {
err := s . exchange . FetchAndCacheMissingRates ( tokenSymbol , currencySymbol )
if err != nil {
return nil , err
}
}
decimals , err := s . decimalsForToken ( tokenSymbol , chainIDs [ 0 ] )
if err != nil {
return nil , err
}
weisInOneMain := big . NewFloat ( math . Pow ( 10 , float64 ( decimals ) ) )
var res [ ] * ValuePoint
for _ , d := range data {
dayTime := time . Unix ( int64 ( d . Timestamp ) , 0 ) . UTC ( )
if dayTime . After ( currentDayStart ) {
// No chance to have today, use the previous day value for the last data point
dayTime = lastDayTime
}
dayValue , err := s . exchange . GetExchangeRateForDay ( tokenSymbol , currencySymbol , dayTime )
if err != nil {
log . Warn ( "Echange rate missing for" , dayTime , "- err" , err )
continue
}
// The big.Int values are discarded, hence copy the original values
res = append ( res , & ValuePoint {
Timestamp : d . Timestamp ,
Value : tokenToValue ( ( * big . Int ) ( d . Balance ) , dayValue , weisInOneMain ) ,
BlockNumber : d . BlockNumber ,
} )
}
return res , nil
}
func ( s * Service ) decimalsForToken ( tokenSymbol string , chainID uint64 ) ( int , error ) {
network := s . networkManager . Find ( chainID )
if network == nil {
return 0 , errors . New ( "network not found" )
}
token := s . tokenManager . FindToken ( network , tokenSymbol )
if token == nil {
return 0 , errors . New ( "token not found" )
}
return int ( token . Decimals ) , nil
}
func tokenToValue ( tokenCount * big . Int , mainDenominationValue float32 , weisInOneMain * big . Float ) float64 {
weis := new ( big . Float ) . SetInt ( tokenCount )
mainTokens := new ( big . Float ) . Quo ( weis , weisInOneMain )
mainTokenValue := new ( big . Float ) . SetFloat64 ( float64 ( mainDenominationValue ) )
res , accuracy := new ( big . Float ) . Mul ( mainTokens , mainTokenValue ) . Float64 ( )
if res == 0 && accuracy == big . Below {
return math . SmallestNonzeroFloat64
} else if res == math . Inf ( 1 ) && accuracy == big . Above {
return math . Inf ( 1 )
}
return res
2022-11-15 12:14:41 +00:00
}
2023-01-20 16:06:50 +00:00
// mergeDataPoints merges close in time block numbers. Drops the ones that are not in a stride duration
// this should improve merging balance data from different chains which are incompatible due to different timelines
// and block length
func mergeDataPoints ( data map [ chainIdentity ] [ ] * DataPoint , stride time . Duration ) ( [ ] * DataPoint , error ) {
2023-01-25 18:28:51 +00:00
// Special cases
2022-11-15 12:14:41 +00:00
if len ( data ) == 0 {
return make ( [ ] * DataPoint , 0 ) , nil
2023-01-25 18:28:51 +00:00
} else if len ( data ) == 1 {
for k := range data {
return data [ k ] , nil
}
2022-11-15 12:14:41 +00:00
}
res := make ( [ ] * DataPoint , 0 )
2023-01-25 18:28:51 +00:00
strideStart , pos := findFirstStrideWindow ( data , stride )
2023-01-20 16:06:50 +00:00
for {
strideEnd := strideStart + int64 ( stride . Seconds ( ) )
2023-01-25 18:28:51 +00:00
2023-01-20 16:06:50 +00:00
// - Gather all points in the stride window starting with current pos
var strideIdentities map [ chainIdentity ] [ ] timeIdentity
strideIdentities , pos = dataInStrideWindowAndNextPos ( data , pos , strideEnd )
// Check if all chains have data
strideComplete := true
2022-11-15 12:14:41 +00:00
for k := range data {
2023-01-20 16:06:50 +00:00
_ , strideComplete = strideIdentities [ k ]
if ! strideComplete {
break
2022-11-15 12:14:41 +00:00
}
}
2023-01-20 16:06:50 +00:00
if strideComplete {
chainMaxBalance := make ( map [ chainIdentity ] * DataPoint )
for chainID , identities := range strideIdentities {
for _ , identity := range identities {
_ , exists := chainMaxBalance [ chainID ]
2023-01-25 18:28:51 +00:00
if exists && ( * big . Int ) ( identity . dataPoint ( data ) . Balance ) . Cmp ( ( * big . Int ) ( chainMaxBalance [ chainID ] . Balance ) ) <= 0 {
2023-01-20 16:06:50 +00:00
continue
}
chainMaxBalance [ chainID ] = identity . dataPoint ( data )
2022-11-15 12:14:41 +00:00
}
}
2023-01-20 16:06:50 +00:00
balance := big . NewInt ( 0 )
for _ , chainBalance := range chainMaxBalance {
2023-01-25 18:28:51 +00:00
balance . Add ( balance , ( * big . Int ) ( chainBalance . Balance ) )
}
// if last stride, the timestamp might be in the future
if strideEnd > time . Now ( ) . UTC ( ) . Unix ( ) {
strideEnd = time . Now ( ) . UTC ( ) . Unix ( )
2023-01-20 16:06:50 +00:00
}
2023-01-25 18:28:51 +00:00
2022-11-15 12:14:41 +00:00
res = append ( res , & DataPoint {
2023-01-20 16:06:50 +00:00
Timestamp : uint64 ( strideEnd ) ,
2023-01-25 18:28:51 +00:00
Balance : ( * hexutil . Big ) ( balance ) ,
2023-01-20 16:06:50 +00:00
BlockNumber : ( * hexutil . Big ) ( getBlockID ( chainMaxBalance ) ) ,
2022-11-15 12:14:41 +00:00
} )
}
2023-01-20 16:06:50 +00:00
if allPastEnd ( data , pos ) {
return res , nil
}
strideStart = strideEnd
}
}
func getBlockID ( chainBalance map [ chainIdentity ] * DataPoint ) * big . Int {
var res * big . Int
for _ , balance := range chainBalance {
if res == nil {
res = new ( big . Int ) . Set ( balance . BlockNumber . ToInt ( ) )
} else if res . Cmp ( balance . BlockNumber . ToInt ( ) ) != 0 {
return nil
}
}
return res
}
type timeIdentity struct {
chain chainIdentity
index int
}
func ( i timeIdentity ) dataPoint ( data map [ chainIdentity ] [ ] * DataPoint ) * DataPoint {
return data [ i . chain ] [ i . index ]
}
func ( i timeIdentity ) atEnd ( data map [ chainIdentity ] [ ] * DataPoint ) bool {
return ( i . index + 1 ) == len ( data [ i . chain ] )
}
func ( i timeIdentity ) pastEnd ( data map [ chainIdentity ] [ ] * DataPoint ) bool {
return i . index >= len ( data [ i . chain ] )
}
func allPastEnd ( data map [ chainIdentity ] [ ] * DataPoint , pos map [ chainIdentity ] int ) bool {
for chainID := range pos {
if ! ( timeIdentity { chainID , pos [ chainID ] } ) . pastEnd ( data ) {
return false
}
}
return true
}
2023-01-25 18:28:51 +00:00
// findFirstStrideWindow returns the start of the first stride window (timestamp and all positions)
//
// Note: tried to implement finding an optimal stride window but it was becoming too complicated and not worth it given that it will potentially save the first and last stride but it is not guaranteed. Current implementation should give good results as long as the the DataPoints are regular enough
func findFirstStrideWindow ( data map [ chainIdentity ] [ ] * DataPoint , stride time . Duration ) ( firstTimestamp int64 , pos map [ chainIdentity ] int ) {
pos = make ( map [ chainIdentity ] int )
2023-01-20 16:06:50 +00:00
for k := range data {
pos [ k ] = 0
}
cur := sortTimeAsc ( data , pos )
2023-01-25 18:28:51 +00:00
return int64 ( cur [ 0 ] . dataPoint ( data ) . Timestamp ) , pos
2023-01-20 16:06:50 +00:00
}
func copyMap [ K comparable , V any ] ( original map [ K ] V ) map [ K ] V {
copy := make ( map [ K ] V , len ( original ) )
for key , value := range original {
copy [ key ] = value
}
return copy
}
// startPos might have indexes past the end of the data for a chain
func dataInStrideWindowAndNextPos ( data map [ chainIdentity ] [ ] * DataPoint , startPos map [ chainIdentity ] int , endT int64 ) ( identities map [ chainIdentity ] [ ] timeIdentity , nextPos map [ chainIdentity ] int ) {
pos := copyMap ( startPos )
identities = make ( map [ chainIdentity ] [ ] timeIdentity )
// Identify the current oldest and newest block
lastLen := int ( - 1 )
for lastLen < len ( identities ) {
lastLen = len ( identities )
sorted := sortTimeAsc ( data , pos )
for _ , identity := range sorted {
if identity . dataPoint ( data ) . Timestamp < uint64 ( endT ) {
identities [ identity . chain ] = append ( identities [ identity . chain ] , identity )
pos [ identity . chain ] ++
2022-11-15 12:14:41 +00:00
}
}
}
2023-01-20 16:06:50 +00:00
return identities , pos
}
// sortTimeAsc expect indexes in pos past the end of the data for a chain
func sortTimeAsc ( data map [ chainIdentity ] [ ] * DataPoint , pos map [ chainIdentity ] int ) [ ] timeIdentity {
res := make ( [ ] timeIdentity , 0 , len ( data ) )
for k := range data {
identity := timeIdentity {
chain : k ,
index : pos [ k ] ,
}
if ! identity . pastEnd ( data ) {
res = append ( res , identity )
}
}
sort . Slice ( res , func ( i , j int ) bool {
return res [ i ] . dataPoint ( data ) . Timestamp < res [ j ] . dataPoint ( data ) . Timestamp
} )
return res
2022-11-15 12:14:41 +00:00
}
// updateBalanceHistoryForAllEnabledNetworks iterates over all enabled and supported networks for the s.visibleTokenSymbol
// and updates the balance history for the given address
//
// expects ctx to have cancellation support and processing to be cancelled by the caller
func ( s * Service ) updateBalanceHistoryForAllEnabledNetworks ( ctx context . Context ) error {
accountsDB , err := accounts . NewDB ( s . db )
if err != nil {
return err
}
addresses , err := accountsDB . GetWalletAddresses ( )
if err != nil {
return err
}
networks , err := s . networkManager . Get ( true )
if err != nil {
return err
}
for _ , address := range addresses {
s . triggerEvent ( EventBalanceHistoryUpdateStarted , address , "" )
for _ , network := range networks {
tokensForChain , err := s . tokenManager . GetTokens ( network . ChainID )
if err != nil {
tokensForChain = make ( [ ] * token . Token , 0 )
}
tokensForChain = append ( tokensForChain , s . tokenManager . ToToken ( network ) )
for _ , token := range tokensForChain {
if ! s . isTokenVisible ( token . Symbol ) {
continue
}
var dataSource DataSource
chainClient , err := chain . NewClient ( s . rpcClient , network . ChainID )
if err != nil {
return err
}
if token . IsNative ( ) {
dataSource = & chainClientSource { chainClient , token . Symbol }
} else {
dataSource = & tokenChainClientSource {
chainClientSource : chainClientSource {
chainClient : chainClient ,
currency : token . Symbol ,
} ,
TokenManager : s . tokenManager ,
NetworkManager : s . networkManager ,
}
}
for currentInterval := int ( BalanceHistoryAllTime ) ; currentInterval >= int ( BalanceHistory7Days ) ; currentInterval -- {
select {
case <- ctx . Done ( ) :
return errors . New ( "context cancelled" )
default :
}
err = s . balance . update ( ctx , dataSource , common . Address ( address ) , TimeInterval ( currentInterval ) )
if err != nil {
log . Warn ( "Error updating balance history" , "chainID" , dataSource . ChainID ( ) , "currency" , dataSource . Currency ( ) , "address" , address . String ( ) , "interval" , currentInterval , "err" , err )
}
}
}
}
s . triggerEvent ( EventBalanceHistoryUpdateFinished , address , "" )
}
return nil
}