feat: fetch balance history for ECR20 tokens

Changes

- Add token.Manager.GetTokenBalanceAt to fetch balance of a specific
block number of ECR20.
- Add tokenChainClientSource concrete implementation of chainClientSource
to fetch balance of ECR20 tokens.
- Chose the correct chainClientSource implementation based on the token
is native property.

Updates: #8862
This commit is contained in:
Stefan 2022-12-20 21:00:40 +02:00
parent 63efa6ae84
commit 4996a763cd
5 changed files with 140 additions and 57 deletions

View File

@ -37,11 +37,6 @@ func (api *API) StartWallet(ctx context.Context) error {
return api.reader.Start()
}
func (api *API) StartBalanceHistory(ctx context.Context) error {
api.s.transferController.StartBalanceHistory(api.s.rpcClient.NetworkManager, api.s.tokenManager)
return nil
}
func (api *API) GetWalletToken(ctx context.Context, addresses []common.Address) (map[common.Address][]Token, error) {
return api.reader.GetWalletToken(ctx, addresses)
}
@ -124,10 +119,15 @@ func (api *API) GetTokensBalancesForChainIDs(ctx context.Context, chainIDs []uin
return api.s.tokenManager.GetBalances(ctx, clients, accounts, addresses)
}
func (api *API) StartBalanceHistory(ctx context.Context) error {
api.s.transferController.StartBalanceHistory(api.s.rpcClient.NetworkManager, api.s.tokenManager)
return nil
}
// GetBalanceHistory retrieves native tokens only
// TODO: extended to support token balance history
func (api *API) GetBalanceHistory(ctx context.Context, chainID uint64, address common.Address, currency string, timeInterval transfer.BalanceHistoryTimeInterval) ([]*transfer.BalanceState, error) {
return api.s.transferController.GetBalanceHistoryAndInterruptUpdate(ctx, chainID, address, currency, timeInterval)
return api.s.transferController.GetBalanceHistoryAndInterruptUpdate(ctx, &transfer.BalanceHistoryRequirements{api.s.rpcClient.NetworkManager, api.s.tokenManager}, &transfer.BhIdentity{chainID, address, currency}, timeInterval)
}
func (api *API) GetTokens(ctx context.Context, chainID uint64) ([]*token.Token, error) {

View File

@ -359,6 +359,18 @@ func (tm *Manager) GetTokenBalance(ctx context.Context, client *chain.Client, ac
}, account)
}
func (tm *Manager) GetTokenBalanceAt(ctx context.Context, client *chain.Client, account common.Address, token common.Address, blockNumber *big.Int) (*big.Int, error) {
caller, err := ierc20.NewIERC20Caller(token, client)
if err != nil {
return nil, err
}
return caller.BalanceOf(&bind.CallOpts{
Context: ctx,
BlockNumber: blockNumber,
}, account)
}
func (tm *Manager) GetChainBalance(ctx context.Context, client *chain.Client, account common.Address) (*big.Int, error) {
return client.BalanceAt(ctx, account, nil)
}

View File

@ -190,7 +190,7 @@ const (
)
func (bh *BalanceHistory) getOrFetchCalibrationData(ctx context.Context, chainClient BlockInfoSource) ([]*blockInfo, error) {
dbData, err := bh.getDBBalanceEntriesTimeSortedAsc(&bhIdentity{chainClient.ChainID(), calibrationAddress(), calibrationCurrency}, nil, calibrationBitset, 1000)
dbData, err := bh.getDBBalanceEntriesTimeSortedAsc(&BhIdentity{chainClient.ChainID(), calibrationAddress(), calibrationCurrency}, nil, calibrationBitset, 1000)
if err != nil {
return nil, err
}
@ -345,7 +345,7 @@ func computeBlockInfoForTimestamp(timestamp int64, startCalibIdx int, calib []*b
// getDBBalanceEntriesByTimeIntervalAndSortedDesc returns nil if no entries are found
func (bh *BalanceHistory) mostRecentBalanceEntry(ctx context.Context, chainClient BlockInfoSource, address common.Address) (*BalanceState, error) {
outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedDesc(&bhIdentity{chainClient.ChainID(), address, chainClient.Currency()}, nil, &bhFilter{minAllRangeTimestamp, maxAllRangeTimestamp, expandFlag(int(FilterIncludeAll))}, 1)
outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedDesc(&BhIdentity{chainClient.ChainID(), address, chainClient.Currency()}, nil, &bhFilter{minAllRangeTimestamp, maxAllRangeTimestamp, expandFlag(int(FilterIncludeAll))}, 1)
if err != nil {
return nil, err
}
@ -396,7 +396,7 @@ func (bh *BalanceHistory) getBalanceHistoryFromBlocksSource(ctx context.Context,
currentBlockTimestamp = calib[0].timestamp
}
outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedAsc(&bhIdentity{chainClient.ChainID(), address, chainClient.Currency()}, nil, &bhFilter{currentBlockTimestamp, maxAllRangeTimestamp, expandFlag(int(bitsetFilter))}, 1000)
outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedAsc(&BhIdentity{chainClient.ChainID(), address, chainClient.Currency()}, nil, &bhFilter{currentBlockTimestamp, maxAllRangeTimestamp, expandFlag(int(bitsetFilter))}, 1000)
if err != nil {
return nil, err
}
@ -464,7 +464,7 @@ func (bh *BalanceHistory) getBalanceHistoryFromBlocksSource(ctx context.Context,
return points, nil
}
// Concrete implementation of BlockInfoSource interface
// Native token implementation of BlockInfoSource interface
type chainClientSource struct {
chainClient *chain.Client
currency string
@ -490,14 +490,47 @@ func (src *chainClientSource) TimeNow() int64 {
return time.Now().UTC().Unix()
}
func (bh *BalanceHistory) StartBalanceHistory(rpcClient *rpc.Client, networkManager *network.Manager, tokenManager *token.Manager) {
type tokenChainClientSource struct {
chainClientSource
TokenManager *token.Manager
NetworkManager *network.Manager
}
func (src *tokenChainClientSource) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
network := src.NetworkManager.Find(src.chainClient.ChainID)
if network == nil {
return nil, errors.New("network not found")
}
token := src.TokenManager.FindToken(network, src.currency)
if token == nil {
return nil, errors.New("token not found")
}
balance, err := src.TokenManager.GetTokenBalanceAt(ctx, src.chainClient, account, token.Address, blockNumber)
if err != nil {
if err.Error() == "no contract code at given address" {
// Ignore requests before contract deployment
balance = big.NewInt(0)
err = nil
} else {
return nil, err
}
}
return balance, err
}
type BalanceHistoryRequirements struct {
NetworkManager *network.Manager
TokenManager *token.Manager
}
func (bh *BalanceHistory) StartBalanceHistory(req *BalanceHistoryRequirements, rpcClient *rpc.Client) {
go func() {
bh.updateBalanceHistoryRunTask(rpcClient, networkManager, tokenManager)
bh.updateBalanceHistoryRunTask(rpcClient, req.NetworkManager, req.TokenManager)
timer := time.NewTimer(balanceHistoryUpdateInterval)
for range timer.C {
timer.Reset(balanceHistoryUpdateInterval)
bh.updateBalanceHistoryRunTask(rpcClient, networkManager, tokenManager)
bh.updateBalanceHistoryRunTask(rpcClient, req.NetworkManager, req.TokenManager)
}
}()
}
@ -514,10 +547,16 @@ func (bh *BalanceHistory) updateBalanceHistoryRunTask(rpcClient *rpc.Client, net
var err error
for retryCount < 3 {
err = bh.UpdateBalanceHistoryForAllEnabledNetworks(ctx, rpcClient, networkManager, tokenManager)
// If done or context was cancelled, we don't want to retry
if err == nil || ctx.Err() != nil {
return
}
retryCount++
select {
case <-ctx.Done(): //context cancelled
return
case <-time.After(time.Duration(retryCount*5) * time.Minute):
}
}
}, updateBalanceHistoryPriority)
}
@ -553,38 +592,76 @@ func (bh *BalanceHistory) UpdateBalanceHistoryForAllEnabledNetworks(ctx context.
for chainID, tokens := range tokens {
for _, token := range tokens {
var dataSource BlockInfoSource
chainClient, err := chain.NewClient(rpcClient, chainID)
if err != nil {
return err
}
if token.IsNative() {
chainClient, err := chain.NewClient(rpcClient, chainID)
if err != nil {
return err
}
dataSource = &chainClientSource{chainClient, token.Symbol}
} else {
dataSource = &tokenChainClientSource{
chainClientSource: chainClientSource{
chainClient: chainClient,
currency: token.Symbol,
},
TokenManager: tokenManager,
NetworkManager: networkManager,
}
}
for _, address := range addresses {
for currentInterval := int(BalanceHistory7Hours); currentInterval <= int(BalanceHistoryAllTime); currentInterval++ {
// Check context for cancellation every fetch attempt
select {
case <-ctx.Done():
return errors.New("context cancelled")
default:
}
_, err = bh.getBalanceHistoryFromBlocksSource(ctx, dataSource, common.Address(address), BalanceHistoryTimeInterval(currentInterval))
if err != nil {
return err
}
for _, address := range addresses {
for currentInterval := int(BalanceHistory7Hours); currentInterval <= int(BalanceHistoryAllTime); currentInterval++ {
// Check context for cancellation every fetch attempt
select {
case <-ctx.Done():
return errors.New("context cancelled")
default:
}
_, err = bh.getBalanceHistoryFromBlocksSource(ctx, dataSource, common.Address(address), BalanceHistoryTimeInterval(currentInterval))
if err != nil {
return err
}
}
} // TODO: implement ERC20 token balance history
}
}
}
return nil
}
func (c *Controller) GetBalanceHistoryAndInterruptUpdate(ctx context.Context, chainID uint64, address common.Address, currency string, timeInterval BalanceHistoryTimeInterval) ([]*BalanceState, error) {
func (c *Controller) GetBalanceHistoryAndInterruptUpdate(ctx context.Context, req *BalanceHistoryRequirements, identity *BhIdentity, timeInterval BalanceHistoryTimeInterval) ([]*BalanceState, error) {
resChannel := make(chan []*BalanceState)
errChannel := make(chan error)
c.balanceHistory.asyncQueue.RunTask(func(ctx context.Context) {
res, err := c.GetBalanceHistory(ctx, chainID, address, currency, timeInterval)
var dataSource BlockInfoSource
chainClient, err := chain.NewClient(c.rpcClient, identity.ChainID)
if err != nil {
errChannel <- err
return
}
network := req.NetworkManager.Find(identity.ChainID)
if network == nil {
errChannel <- errors.New("network not found")
return
}
token := req.TokenManager.FindToken(network, identity.Currency)
if token == nil {
errChannel <- errors.New("token not found")
return
}
if token.IsNative() {
dataSource = &chainClientSource{chainClient, identity.Currency}
} else {
dataSource = &tokenChainClientSource{
chainClientSource: chainClientSource{
chainClient: chainClient,
currency: identity.Currency,
},
TokenManager: req.TokenManager,
NetworkManager: req.NetworkManager,
}
}
res, err := c.GetBalanceHistory(ctx, dataSource, identity.Address, timeInterval)
if err != nil {
errChannel <- err
return
@ -599,13 +676,7 @@ func (c *Controller) GetBalanceHistoryAndInterruptUpdate(ctx context.Context, ch
}
}
func (c *Controller) GetBalanceHistory(ctx context.Context, chainID uint64, address common.Address, currency string, timeInterval BalanceHistoryTimeInterval) ([]*BalanceState, error) {
chainClient, err := chain.NewClient(c.rpcClient, chainID)
if err != nil {
return nil, err
}
dataSource := &chainClientSource{chainClient, currency}
func (c *Controller) GetBalanceHistory(ctx context.Context, dataSource BlockInfoSource, address common.Address, timeInterval BalanceHistoryTimeInterval) ([]*BalanceState, error) {
entries, err := c.balanceHistory.getBalanceHistoryFromBlocksSource(ctx, dataSource, address, timeInterval)
if err != nil {
return nil, err
@ -654,10 +725,10 @@ func (bh *BalanceHistory) getBalanceEntryFromDB(chainID uint64, address common.A
return
}
type bhIdentity struct {
chainID uint64
address common.Address
currency string
type BhIdentity struct {
ChainID uint64
Address common.Address
Currency string
}
type bhFilter struct {
@ -672,7 +743,7 @@ type bhFilter struct {
// minTimestamp and maxTimestamp interval filter the results by timestamp.
// bitsetFilter filters the results by bitset. This way higher values can include lower values to simulate time interval levels
// asc defines the order of the result by block number (which correlates also with time). If true, the result will be sorted by ascending, otherwise by descending timestamp
func (bh *BalanceHistory) getDBBalanceEntriesTimeSorted(identify *bhIdentity, startingAtBlock *big.Int, filter *bhFilter, maxEntries int, asc bool) ([]*balanceHistoryDBEntry, error) {
func (bh *BalanceHistory) getDBBalanceEntriesTimeSorted(identify *BhIdentity, startingAtBlock *big.Int, filter *bhFilter, maxEntries int, asc bool) ([]*balanceHistoryDBEntry, error) {
// Start from the first block in case a specific one was not provided
if startingAtBlock == nil {
startingAtBlock = big.NewInt(0)
@ -684,7 +755,7 @@ func (bh *BalanceHistory) getDBBalanceEntriesTimeSorted(identify *bhIdentity, st
} else {
queryStr = "SELECT block, timestamp, balance FROM balance_history WHERE chain_id = ? AND address = ? AND currency = ? AND block >= ? AND timestamp BETWEEN ? AND ? AND (bitset & ?) > 0 ORDER BY block DESC LIMIT ?"
}
rows, err := bh.db.Query(queryStr, identify.chainID, identify.address, identify.currency, (*bigint.SQLBigInt)(startingAtBlock), filter.minTimestamp, filter.maxTimestamp, filter.bitsetFilter, maxEntries)
rows, err := bh.db.Query(queryStr, identify.ChainID, identify.Address, identify.Currency, (*bigint.SQLBigInt)(startingAtBlock), filter.minTimestamp, filter.maxTimestamp, filter.bitsetFilter, maxEntries)
if err != nil {
return make([]*balanceHistoryDBEntry, 0), err
}
@ -695,8 +766,8 @@ func (bh *BalanceHistory) getDBBalanceEntriesTimeSorted(identify *bhIdentity, st
for rows.Next() && currentEntry < maxEntries {
entry := &balanceHistoryDBEntry{
chainID: 0,
address: identify.address,
currency: identify.currency,
address: identify.Address,
currency: identify.Currency,
block: new(big.Int),
balance: new(big.Int),
}
@ -704,25 +775,25 @@ func (bh *BalanceHistory) getDBBalanceEntriesTimeSorted(identify *bhIdentity, st
if err != nil {
return make([]*balanceHistoryDBEntry, 0), err
}
entry.chainID = identify.chainID
entry.chainID = identify.ChainID
result = append(result, entry)
currentEntry++
}
return result, nil
}
func (bh *BalanceHistory) getDBBalanceEntriesByTimeIntervalAndSortedAsc(identify *bhIdentity, startingAtBlock *big.Int, filter *bhFilter, maxEntries int) ([]*balanceHistoryDBEntry, error) {
func (bh *BalanceHistory) getDBBalanceEntriesByTimeIntervalAndSortedAsc(identify *BhIdentity, startingAtBlock *big.Int, filter *bhFilter, maxEntries int) ([]*balanceHistoryDBEntry, error) {
return bh.getDBBalanceEntriesTimeSorted(identify, startingAtBlock, filter, maxEntries, true)
}
func (bh *BalanceHistory) getDBBalanceEntriesByTimeIntervalAndSortedDesc(identify *bhIdentity, startingAtBlock *big.Int, filter *bhFilter, maxEntries int) ([]*balanceHistoryDBEntry, error) {
func (bh *BalanceHistory) getDBBalanceEntriesByTimeIntervalAndSortedDesc(identify *BhIdentity, startingAtBlock *big.Int, filter *bhFilter, maxEntries int) ([]*balanceHistoryDBEntry, error) {
return bh.getDBBalanceEntriesTimeSorted(identify, startingAtBlock, filter, maxEntries, false)
}
func (bh *BalanceHistory) getDBBalanceEntriesTimeSortedAsc(identify *bhIdentity, startingAtBlock *big.Int, bitsetFilter int, maxEntries int) ([]*balanceHistoryDBEntry, error) {
func (bh *BalanceHistory) getDBBalanceEntriesTimeSortedAsc(identify *BhIdentity, startingAtBlock *big.Int, bitsetFilter int, maxEntries int) ([]*balanceHistoryDBEntry, error) {
return bh.getDBBalanceEntriesTimeSorted(identify, startingAtBlock, &bhFilter{minAllRangeTimestamp, maxAllRangeTimestamp, bitsetFilter}, maxEntries, true)
}
func (bh *BalanceHistory) getDBBalanceEntriesTimeSortedDesc(identify *bhIdentity, startingAtBlock *big.Int, bitsetFilter int, maxEntries int) ([]*balanceHistoryDBEntry, error) {
func (bh *BalanceHistory) getDBBalanceEntriesTimeSortedDesc(identify *BhIdentity, startingAtBlock *big.Int, bitsetFilter int, maxEntries int) ([]*balanceHistoryDBEntry, error) {
return bh.getDBBalanceEntriesTimeSorted(identify, startingAtBlock, &bhFilter{minAllRangeTimestamp, maxAllRangeTimestamp, bitsetFilter}, maxEntries, false)
}

View File

@ -890,7 +890,7 @@ func TestBalanceHistoryGetOldestDataPoint(t *testing.T) {
require.NoError(t, err)
}
outDataPoints, err := bh.getDBBalanceEntriesTimeSortedAsc(&bhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, 1, 1)
outDataPoints, err := bh.getDBBalanceEntriesTimeSortedAsc(&BhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, 1, 1)
require.NoError(t, err)
require.NotEqual(t, outDataPoints, nil)
require.Equal(t, outDataPoints[0], testDataPoints[0])
@ -906,7 +906,7 @@ func TestBalanceHistoryGetLatestDataPoint(t *testing.T) {
require.NoError(t, err)
}
outDataPoints, err := bh.getDBBalanceEntriesTimeSortedDesc(&bhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, 1, 1)
outDataPoints, err := bh.getDBBalanceEntriesTimeSortedDesc(&BhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, 1, 1)
require.NoError(t, err)
require.NotEqual(t, outDataPoints, nil)
require.Equal(t, outDataPoints[0], testDataPoints[len(testDataPoints)-1])
@ -923,7 +923,7 @@ func TestBalanceHistoryGetClosestDataPointToTimestamp(t *testing.T) {
}
itemToGetIndex := 2
outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedAsc(&bhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, &bhFilter{testDataPoints[itemToGetIndex].timestamp, maxAllRangeTimestamp, 1}, 1)
outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedAsc(&BhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, &bhFilter{testDataPoints[itemToGetIndex].timestamp, maxAllRangeTimestamp, 1}, 1)
require.NoError(t, err)
require.NotEqual(t, outDataPoints, nil)
require.Equal(t, len(outDataPoints), 1)
@ -942,7 +942,7 @@ func TestBalanceHistoryGetDataPointsInTimeRange(t *testing.T) {
startIndex := 1
endIndex := 3
outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedAsc(&bhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, &bhFilter{testDataPoints[startIndex].timestamp, testDataPoints[endIndex].timestamp, 1}, 100)
outDataPoints, err := bh.getDBBalanceEntriesByTimeIntervalAndSortedAsc(&BhIdentity{testDataPoints[0].chainID, testDataPoints[0].address, testDataPoints[0].currency}, nil, &bhFilter{testDataPoints[startIndex].timestamp, testDataPoints[endIndex].timestamp, 1}, 100)
require.NoError(t, err)
require.NotEqual(t, outDataPoints, nil)
require.Equal(t, len(outDataPoints), endIndex-startIndex+1)

View File

@ -47,7 +47,7 @@ func (c *Controller) Start() {
}
func (c *Controller) StartBalanceHistory(networkManager *network.Manager, tokenManager *token.Manager) {
c.balanceHistory.StartBalanceHistory(c.rpcClient, networkManager, tokenManager)
c.balanceHistory.StartBalanceHistory(&BalanceHistoryRequirements{networkManager, tokenManager}, c.rpcClient)
}
func (c *Controller) Stop() {