From 4996a763cd8749596df3b2550c9507d4235e38c8 Mon Sep 17 00:00:00 2001 From: Stefan Date: Tue, 20 Dec 2022 21:00:40 +0200 Subject: [PATCH] feat: fetch balance history for ECR20 tokens Changes - Add token.Manager.GetTokenBalanceAt to fetch balance of a specific block number of ECR20. - Add tokenChainClientSource concrete implementation of chainClientSource to fetch balance of ECR20 tokens. - Chose the correct chainClientSource implementation based on the token is native property. Updates: #8862 --- services/wallet/api.go | 12 +- services/wallet/token/token.go | 12 ++ services/wallet/transfer/balance_history.go | 163 +++++++++++++----- .../wallet/transfer/balance_history_test.go | 8 +- services/wallet/transfer/controller.go | 2 +- 5 files changed, 140 insertions(+), 57 deletions(-) diff --git a/services/wallet/api.go b/services/wallet/api.go index 92e4039ba..6bd7b73a3 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -37,11 +37,6 @@ func (api *API) StartWallet(ctx context.Context) error { return api.reader.Start() } -func (api *API) StartBalanceHistory(ctx context.Context) error { - api.s.transferController.StartBalanceHistory(api.s.rpcClient.NetworkManager, api.s.tokenManager) - return nil -} - func (api *API) GetWalletToken(ctx context.Context, addresses []common.Address) (map[common.Address][]Token, error) { return api.reader.GetWalletToken(ctx, addresses) } @@ -124,10 +119,15 @@ func (api *API) GetTokensBalancesForChainIDs(ctx context.Context, chainIDs []uin return api.s.tokenManager.GetBalances(ctx, clients, accounts, addresses) } +func (api *API) StartBalanceHistory(ctx context.Context) error { + api.s.transferController.StartBalanceHistory(api.s.rpcClient.NetworkManager, api.s.tokenManager) + return nil +} + // GetBalanceHistory retrieves native tokens only // TODO: extended to support token balance history func (api *API) GetBalanceHistory(ctx context.Context, chainID uint64, address common.Address, currency string, timeInterval transfer.BalanceHistoryTimeInterval) ([]*transfer.BalanceState, error) { - return api.s.transferController.GetBalanceHistoryAndInterruptUpdate(ctx, chainID, address, currency, timeInterval) + return api.s.transferController.GetBalanceHistoryAndInterruptUpdate(ctx, &transfer.BalanceHistoryRequirements{api.s.rpcClient.NetworkManager, api.s.tokenManager}, &transfer.BhIdentity{chainID, address, currency}, timeInterval) } func (api *API) GetTokens(ctx context.Context, chainID uint64) ([]*token.Token, error) { diff --git a/services/wallet/token/token.go b/services/wallet/token/token.go index 6ea8b0d27..a0ea558ff 100644 --- a/services/wallet/token/token.go +++ b/services/wallet/token/token.go @@ -359,6 +359,18 @@ func (tm *Manager) GetTokenBalance(ctx context.Context, client *chain.Client, ac }, account) } +func (tm *Manager) GetTokenBalanceAt(ctx context.Context, client *chain.Client, account common.Address, token common.Address, blockNumber *big.Int) (*big.Int, error) { + caller, err := ierc20.NewIERC20Caller(token, client) + if err != nil { + return nil, err + } + + return caller.BalanceOf(&bind.CallOpts{ + Context: ctx, + BlockNumber: blockNumber, + }, account) +} + func (tm *Manager) GetChainBalance(ctx context.Context, client *chain.Client, account common.Address) (*big.Int, error) { return client.BalanceAt(ctx, account, nil) } diff --git a/services/wallet/transfer/balance_history.go b/services/wallet/transfer/balance_history.go index 46e423b72..5f258565b 100644 --- a/services/wallet/transfer/balance_history.go +++ b/services/wallet/transfer/balance_history.go @@ -190,7 +190,7 @@ const ( ) func (bh *BalanceHistory) getOrFetchCalibrationData(ctx context.Context, chainClient BlockInfoSource) ([]*blockInfo, error) { - dbData, err := bh.getDBBalanceEntriesTimeSortedAsc(&bhIdentity{chainClient.ChainID(), calibrationAddress(), calibrationCurrency}, nil, calibrationBitset, 1000) + dbData, err := bh.getDBBalanceEntriesTimeSortedAsc(&BhIdentity{chainClient.ChainID(), calibrationAddress(), calibrationCurrency}, nil, calibrationBitset, 1000) if err != nil { return nil, err } @@ -345,7 +345,7 @@ func computeBlockInfoForTimestamp(timestamp int64, startCalibIdx int, calib []*b // getDBBalanceEntriesByTimeIntervalAndSortedDesc returns nil if no entries are found func (bh *BalanceHistory) mostRecentBalanceEntry(ctx context.Context, chainClient BlockInfoSource, address common.Address) (*BalanceState, error) { - outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedDesc(&bhIdentity{chainClient.ChainID(), address, chainClient.Currency()}, nil, &bhFilter{minAllRangeTimestamp, maxAllRangeTimestamp, expandFlag(int(FilterIncludeAll))}, 1) + outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedDesc(&BhIdentity{chainClient.ChainID(), address, chainClient.Currency()}, nil, &bhFilter{minAllRangeTimestamp, maxAllRangeTimestamp, expandFlag(int(FilterIncludeAll))}, 1) if err != nil { return nil, err } @@ -396,7 +396,7 @@ func (bh *BalanceHistory) getBalanceHistoryFromBlocksSource(ctx context.Context, currentBlockTimestamp = calib[0].timestamp } - outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedAsc(&bhIdentity{chainClient.ChainID(), address, chainClient.Currency()}, nil, &bhFilter{currentBlockTimestamp, maxAllRangeTimestamp, expandFlag(int(bitsetFilter))}, 1000) + outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedAsc(&BhIdentity{chainClient.ChainID(), address, chainClient.Currency()}, nil, &bhFilter{currentBlockTimestamp, maxAllRangeTimestamp, expandFlag(int(bitsetFilter))}, 1000) if err != nil { return nil, err } @@ -464,7 +464,7 @@ func (bh *BalanceHistory) getBalanceHistoryFromBlocksSource(ctx context.Context, return points, nil } -// Concrete implementation of BlockInfoSource interface +// Native token implementation of BlockInfoSource interface type chainClientSource struct { chainClient *chain.Client currency string @@ -490,14 +490,47 @@ func (src *chainClientSource) TimeNow() int64 { return time.Now().UTC().Unix() } -func (bh *BalanceHistory) StartBalanceHistory(rpcClient *rpc.Client, networkManager *network.Manager, tokenManager *token.Manager) { +type tokenChainClientSource struct { + chainClientSource + TokenManager *token.Manager + NetworkManager *network.Manager +} + +func (src *tokenChainClientSource) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { + network := src.NetworkManager.Find(src.chainClient.ChainID) + if network == nil { + return nil, errors.New("network not found") + } + token := src.TokenManager.FindToken(network, src.currency) + if token == nil { + return nil, errors.New("token not found") + } + balance, err := src.TokenManager.GetTokenBalanceAt(ctx, src.chainClient, account, token.Address, blockNumber) + if err != nil { + if err.Error() == "no contract code at given address" { + // Ignore requests before contract deployment + balance = big.NewInt(0) + err = nil + } else { + return nil, err + } + } + return balance, err +} + +type BalanceHistoryRequirements struct { + NetworkManager *network.Manager + TokenManager *token.Manager +} + +func (bh *BalanceHistory) StartBalanceHistory(req *BalanceHistoryRequirements, rpcClient *rpc.Client) { go func() { - bh.updateBalanceHistoryRunTask(rpcClient, networkManager, tokenManager) + bh.updateBalanceHistoryRunTask(rpcClient, req.NetworkManager, req.TokenManager) timer := time.NewTimer(balanceHistoryUpdateInterval) for range timer.C { timer.Reset(balanceHistoryUpdateInterval) - bh.updateBalanceHistoryRunTask(rpcClient, networkManager, tokenManager) + bh.updateBalanceHistoryRunTask(rpcClient, req.NetworkManager, req.TokenManager) } }() } @@ -514,10 +547,16 @@ func (bh *BalanceHistory) updateBalanceHistoryRunTask(rpcClient *rpc.Client, net var err error for retryCount < 3 { err = bh.UpdateBalanceHistoryForAllEnabledNetworks(ctx, rpcClient, networkManager, tokenManager) + // If done or context was cancelled, we don't want to retry if err == nil || ctx.Err() != nil { return } retryCount++ + select { + case <-ctx.Done(): //context cancelled + return + case <-time.After(time.Duration(retryCount*5) * time.Minute): + } } }, updateBalanceHistoryPriority) } @@ -553,38 +592,76 @@ func (bh *BalanceHistory) UpdateBalanceHistoryForAllEnabledNetworks(ctx context. for chainID, tokens := range tokens { for _, token := range tokens { var dataSource BlockInfoSource + chainClient, err := chain.NewClient(rpcClient, chainID) + if err != nil { + return err + } if token.IsNative() { - chainClient, err := chain.NewClient(rpcClient, chainID) - if err != nil { - return err - } dataSource = &chainClientSource{chainClient, token.Symbol} + } else { + dataSource = &tokenChainClientSource{ + chainClientSource: chainClientSource{ + chainClient: chainClient, + currency: token.Symbol, + }, + TokenManager: tokenManager, + NetworkManager: networkManager, + } + } - for _, address := range addresses { - for currentInterval := int(BalanceHistory7Hours); currentInterval <= int(BalanceHistoryAllTime); currentInterval++ { - // Check context for cancellation every fetch attempt - select { - case <-ctx.Done(): - return errors.New("context cancelled") - default: - } - _, err = bh.getBalanceHistoryFromBlocksSource(ctx, dataSource, common.Address(address), BalanceHistoryTimeInterval(currentInterval)) - if err != nil { - return err - } + for _, address := range addresses { + for currentInterval := int(BalanceHistory7Hours); currentInterval <= int(BalanceHistoryAllTime); currentInterval++ { + // Check context for cancellation every fetch attempt + select { + case <-ctx.Done(): + return errors.New("context cancelled") + default: + } + _, err = bh.getBalanceHistoryFromBlocksSource(ctx, dataSource, common.Address(address), BalanceHistoryTimeInterval(currentInterval)) + if err != nil { + return err } } - } // TODO: implement ERC20 token balance history + } } } return nil } -func (c *Controller) GetBalanceHistoryAndInterruptUpdate(ctx context.Context, chainID uint64, address common.Address, currency string, timeInterval BalanceHistoryTimeInterval) ([]*BalanceState, error) { +func (c *Controller) GetBalanceHistoryAndInterruptUpdate(ctx context.Context, req *BalanceHistoryRequirements, identity *BhIdentity, timeInterval BalanceHistoryTimeInterval) ([]*BalanceState, error) { resChannel := make(chan []*BalanceState) errChannel := make(chan error) c.balanceHistory.asyncQueue.RunTask(func(ctx context.Context) { - res, err := c.GetBalanceHistory(ctx, chainID, address, currency, timeInterval) + var dataSource BlockInfoSource + chainClient, err := chain.NewClient(c.rpcClient, identity.ChainID) + if err != nil { + errChannel <- err + return + } + + network := req.NetworkManager.Find(identity.ChainID) + if network == nil { + errChannel <- errors.New("network not found") + return + } + token := req.TokenManager.FindToken(network, identity.Currency) + if token == nil { + errChannel <- errors.New("token not found") + return + } + if token.IsNative() { + dataSource = &chainClientSource{chainClient, identity.Currency} + } else { + dataSource = &tokenChainClientSource{ + chainClientSource: chainClientSource{ + chainClient: chainClient, + currency: identity.Currency, + }, + TokenManager: req.TokenManager, + NetworkManager: req.NetworkManager, + } + } + res, err := c.GetBalanceHistory(ctx, dataSource, identity.Address, timeInterval) if err != nil { errChannel <- err return @@ -599,13 +676,7 @@ func (c *Controller) GetBalanceHistoryAndInterruptUpdate(ctx context.Context, ch } } -func (c *Controller) GetBalanceHistory(ctx context.Context, chainID uint64, address common.Address, currency string, timeInterval BalanceHistoryTimeInterval) ([]*BalanceState, error) { - chainClient, err := chain.NewClient(c.rpcClient, chainID) - if err != nil { - return nil, err - } - - dataSource := &chainClientSource{chainClient, currency} +func (c *Controller) GetBalanceHistory(ctx context.Context, dataSource BlockInfoSource, address common.Address, timeInterval BalanceHistoryTimeInterval) ([]*BalanceState, error) { entries, err := c.balanceHistory.getBalanceHistoryFromBlocksSource(ctx, dataSource, address, timeInterval) if err != nil { return nil, err @@ -654,10 +725,10 @@ func (bh *BalanceHistory) getBalanceEntryFromDB(chainID uint64, address common.A return } -type bhIdentity struct { - chainID uint64 - address common.Address - currency string +type BhIdentity struct { + ChainID uint64 + Address common.Address + Currency string } type bhFilter struct { @@ -672,7 +743,7 @@ type bhFilter struct { // minTimestamp and maxTimestamp interval filter the results by timestamp. // bitsetFilter filters the results by bitset. This way higher values can include lower values to simulate time interval levels // asc defines the order of the result by block number (which correlates also with time). If true, the result will be sorted by ascending, otherwise by descending timestamp -func (bh *BalanceHistory) getDBBalanceEntriesTimeSorted(identify *bhIdentity, startingAtBlock *big.Int, filter *bhFilter, maxEntries int, asc bool) ([]*balanceHistoryDBEntry, error) { +func (bh *BalanceHistory) getDBBalanceEntriesTimeSorted(identify *BhIdentity, startingAtBlock *big.Int, filter *bhFilter, maxEntries int, asc bool) ([]*balanceHistoryDBEntry, error) { // Start from the first block in case a specific one was not provided if startingAtBlock == nil { startingAtBlock = big.NewInt(0) @@ -684,7 +755,7 @@ func (bh *BalanceHistory) getDBBalanceEntriesTimeSorted(identify *bhIdentity, st } else { queryStr = "SELECT block, timestamp, balance FROM balance_history WHERE chain_id = ? AND address = ? AND currency = ? AND block >= ? AND timestamp BETWEEN ? AND ? AND (bitset & ?) > 0 ORDER BY block DESC LIMIT ?" } - rows, err := bh.db.Query(queryStr, identify.chainID, identify.address, identify.currency, (*bigint.SQLBigInt)(startingAtBlock), filter.minTimestamp, filter.maxTimestamp, filter.bitsetFilter, maxEntries) + rows, err := bh.db.Query(queryStr, identify.ChainID, identify.Address, identify.Currency, (*bigint.SQLBigInt)(startingAtBlock), filter.minTimestamp, filter.maxTimestamp, filter.bitsetFilter, maxEntries) if err != nil { return make([]*balanceHistoryDBEntry, 0), err } @@ -695,8 +766,8 @@ func (bh *BalanceHistory) getDBBalanceEntriesTimeSorted(identify *bhIdentity, st for rows.Next() && currentEntry < maxEntries { entry := &balanceHistoryDBEntry{ chainID: 0, - address: identify.address, - currency: identify.currency, + address: identify.Address, + currency: identify.Currency, block: new(big.Int), balance: new(big.Int), } @@ -704,25 +775,25 @@ func (bh *BalanceHistory) getDBBalanceEntriesTimeSorted(identify *bhIdentity, st if err != nil { return make([]*balanceHistoryDBEntry, 0), err } - entry.chainID = identify.chainID + entry.chainID = identify.ChainID result = append(result, entry) currentEntry++ } return result, nil } -func (bh *BalanceHistory) getDBBalanceEntriesByTimeIntervalAndSortedAsc(identify *bhIdentity, startingAtBlock *big.Int, filter *bhFilter, maxEntries int) ([]*balanceHistoryDBEntry, error) { +func (bh *BalanceHistory) getDBBalanceEntriesByTimeIntervalAndSortedAsc(identify *BhIdentity, startingAtBlock *big.Int, filter *bhFilter, maxEntries int) ([]*balanceHistoryDBEntry, error) { return bh.getDBBalanceEntriesTimeSorted(identify, startingAtBlock, filter, maxEntries, true) } -func (bh *BalanceHistory) getDBBalanceEntriesByTimeIntervalAndSortedDesc(identify *bhIdentity, startingAtBlock *big.Int, filter *bhFilter, maxEntries int) ([]*balanceHistoryDBEntry, error) { +func (bh *BalanceHistory) getDBBalanceEntriesByTimeIntervalAndSortedDesc(identify *BhIdentity, startingAtBlock *big.Int, filter *bhFilter, maxEntries int) ([]*balanceHistoryDBEntry, error) { return bh.getDBBalanceEntriesTimeSorted(identify, startingAtBlock, filter, maxEntries, false) } -func (bh *BalanceHistory) getDBBalanceEntriesTimeSortedAsc(identify *bhIdentity, startingAtBlock *big.Int, bitsetFilter int, maxEntries int) ([]*balanceHistoryDBEntry, error) { +func (bh *BalanceHistory) getDBBalanceEntriesTimeSortedAsc(identify *BhIdentity, startingAtBlock *big.Int, bitsetFilter int, maxEntries int) ([]*balanceHistoryDBEntry, error) { return bh.getDBBalanceEntriesTimeSorted(identify, startingAtBlock, &bhFilter{minAllRangeTimestamp, maxAllRangeTimestamp, bitsetFilter}, maxEntries, true) } -func (bh *BalanceHistory) getDBBalanceEntriesTimeSortedDesc(identify *bhIdentity, startingAtBlock *big.Int, bitsetFilter int, maxEntries int) ([]*balanceHistoryDBEntry, error) { +func (bh *BalanceHistory) getDBBalanceEntriesTimeSortedDesc(identify *BhIdentity, startingAtBlock *big.Int, bitsetFilter int, maxEntries int) ([]*balanceHistoryDBEntry, error) { return bh.getDBBalanceEntriesTimeSorted(identify, startingAtBlock, &bhFilter{minAllRangeTimestamp, maxAllRangeTimestamp, bitsetFilter}, maxEntries, false) } diff --git a/services/wallet/transfer/balance_history_test.go b/services/wallet/transfer/balance_history_test.go index 80868725a..6036f819f 100644 --- a/services/wallet/transfer/balance_history_test.go +++ b/services/wallet/transfer/balance_history_test.go @@ -890,7 +890,7 @@ func TestBalanceHistoryGetOldestDataPoint(t *testing.T) { require.NoError(t, err) } - outDataPoints, err := bh.getDBBalanceEntriesTimeSortedAsc(&bhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, 1, 1) + outDataPoints, err := bh.getDBBalanceEntriesTimeSortedAsc(&BhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, 1, 1) require.NoError(t, err) require.NotEqual(t, outDataPoints, nil) require.Equal(t, outDataPoints[0], testDataPoints[0]) @@ -906,7 +906,7 @@ func TestBalanceHistoryGetLatestDataPoint(t *testing.T) { require.NoError(t, err) } - outDataPoints, err := bh.getDBBalanceEntriesTimeSortedDesc(&bhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, 1, 1) + outDataPoints, err := bh.getDBBalanceEntriesTimeSortedDesc(&BhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, 1, 1) require.NoError(t, err) require.NotEqual(t, outDataPoints, nil) require.Equal(t, outDataPoints[0], testDataPoints[len(testDataPoints)-1]) @@ -923,7 +923,7 @@ func TestBalanceHistoryGetClosestDataPointToTimestamp(t *testing.T) { } itemToGetIndex := 2 - outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedAsc(&bhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, &bhFilter{testDataPoints[itemToGetIndex].timestamp, maxAllRangeTimestamp, 1}, 1) + outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedAsc(&BhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, &bhFilter{testDataPoints[itemToGetIndex].timestamp, maxAllRangeTimestamp, 1}, 1) require.NoError(t, err) require.NotEqual(t, outDataPoints, nil) require.Equal(t, len(outDataPoints), 1) @@ -942,7 +942,7 @@ func TestBalanceHistoryGetDataPointsInTimeRange(t *testing.T) { startIndex := 1 endIndex := 3 - outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedAsc(&bhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, &bhFilter{testDataPoints[startIndex].timestamp, testDataPoints[endIndex].timestamp, 1}, 100) + outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedAsc(&BhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, &bhFilter{testDataPoints[startIndex].timestamp, testDataPoints[endIndex].timestamp, 1}, 100) require.NoError(t, err) require.NotEqual(t, outDataPoints, nil) require.Equal(t, len(outDataPoints), endIndex-startIndex+1) diff --git a/services/wallet/transfer/controller.go b/services/wallet/transfer/controller.go index b13e56f8c..4d8c664d7 100644 --- a/services/wallet/transfer/controller.go +++ b/services/wallet/transfer/controller.go @@ -47,7 +47,7 @@ func (c *Controller) Start() { } func (c *Controller) StartBalanceHistory(networkManager *network.Manager, tokenManager *token.Manager) { - c.balanceHistory.StartBalanceHistory(c.rpcClient, networkManager, tokenManager) + c.balanceHistory.StartBalanceHistory(&BalanceHistoryRequirements{networkManager, tokenManager}, c.rpcClient) } func (c *Controller) Stop() {