mirror of
https://github.com/status-im/status-go.git
synced 2025-01-24 21:49:54 +00:00
a6d553c937
The "not found" and "no contract" expected error is caught and wrapped by ClientWithFallback. The fetching of balance history of next blocks is aborted which is not desired. Fix by not treating the error as a connection error in ClientWithFallback.
363 lines
12 KiB
Go
363 lines
12 KiB
Go
package history
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"math/big"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum"
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/common/hexutil"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
)
|
|
|
|
type Balance struct {
|
|
db *BalanceDB
|
|
}
|
|
|
|
const (
|
|
defaultChains = uint64(0)
|
|
aDay = time.Duration(24) * time.Hour
|
|
)
|
|
|
|
var averageBlockDurationForChain = map[uint64]time.Duration{
|
|
defaultChains: time.Duration(12000) * time.Millisecond,
|
|
10: time.Duration(400) * time.Millisecond, // Optimism
|
|
420: time.Duration(2000) * time.Millisecond, // Optimism Testnet
|
|
42161: time.Duration(300) * time.Millisecond, // Arbitrum
|
|
421611: time.Duration(1500) * time.Millisecond, // Arbitrum Testnet
|
|
}
|
|
|
|
// Must have a common divisor to share common blocks and increase the cache hit
|
|
const (
|
|
twiceADayStride time.Duration = time.Duration(12) * time.Hour
|
|
weekStride = 14 * twiceADayStride
|
|
monthsStride = 1 /*months*/ * 4 * weekStride
|
|
)
|
|
|
|
// bitsetFilters used to fetch relevant data points in one batch and to increase cache hit
|
|
const (
|
|
filterAllTime bitsetFilter = 1
|
|
filterWeekly bitsetFilter = 1 << 3
|
|
filterTwiceADay bitsetFilter = 1 << 5
|
|
)
|
|
|
|
type TimeInterval int
|
|
|
|
// Specific time intervals for which balance history can be fetched
|
|
const (
|
|
BalanceHistory7Days TimeInterval = iota + 1
|
|
BalanceHistory1Month
|
|
BalanceHistory6Months
|
|
BalanceHistory1Year
|
|
BalanceHistoryAllTime
|
|
)
|
|
|
|
var timeIntervalDuration = map[TimeInterval]time.Duration{
|
|
BalanceHistory7Days: time.Duration(7) * aDay,
|
|
BalanceHistory1Month: time.Duration(30) * aDay,
|
|
BalanceHistory6Months: time.Duration(6*30) * aDay,
|
|
BalanceHistory1Year: time.Duration(365) * aDay,
|
|
}
|
|
|
|
var timeIntervalToBitsetFilter = map[TimeInterval]bitsetFilter{
|
|
BalanceHistory7Days: filterTwiceADay,
|
|
BalanceHistory1Month: filterTwiceADay,
|
|
BalanceHistory6Months: filterWeekly,
|
|
BalanceHistory1Year: filterWeekly,
|
|
BalanceHistoryAllTime: filterAllTime,
|
|
}
|
|
|
|
var timeIntervalToStrideDuration = map[TimeInterval]time.Duration{
|
|
BalanceHistory7Days: twiceADayStride,
|
|
BalanceHistory1Month: twiceADayStride,
|
|
BalanceHistory6Months: weekStride,
|
|
BalanceHistory1Year: weekStride,
|
|
BalanceHistoryAllTime: monthsStride,
|
|
}
|
|
|
|
func strideBlockCount(timeInterval TimeInterval, chainID uint64) int {
|
|
blockDuration, found := averageBlockDurationForChain[chainID]
|
|
if !found {
|
|
blockDuration = averageBlockDurationForChain[defaultChains]
|
|
}
|
|
|
|
return int(timeIntervalToStrideDuration[timeInterval] / blockDuration)
|
|
}
|
|
|
|
func NewBalance(db *BalanceDB) *Balance {
|
|
return &Balance{
|
|
db: db,
|
|
}
|
|
}
|
|
|
|
// DataSource used as an abstraction to fetch required data from a specific blockchain
|
|
type DataSource interface {
|
|
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
|
|
BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error)
|
|
ChainID() uint64
|
|
Currency() string
|
|
TimeNow() int64
|
|
}
|
|
|
|
type DataPoint struct {
|
|
Balance *hexutil.Big
|
|
Timestamp uint64
|
|
BlockNumber *hexutil.Big
|
|
}
|
|
|
|
// fetchAndCache will process the last available block if blocNo is nil
|
|
// reuses previous fetched blocks timestamp to avoid fetching block headers again
|
|
func (b *Balance) fetchAndCache(ctx context.Context, source DataSource, address common.Address, blockNo *big.Int, bitset bitsetFilter) (*DataPoint, *big.Int, error) {
|
|
var outEntry *entry
|
|
var err error
|
|
if blockNo != nil {
|
|
cached, bitsetList, err := b.db.get(&assetIdentity{source.ChainID(), address, source.Currency()}, blockNo, 1, asc)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if len(cached) > 0 && cached[0].block.Cmp(blockNo) == 0 {
|
|
// found a match update bitset
|
|
err := b.db.updateBitset(&assetIdentity{source.ChainID(), address, source.Currency()}, blockNo, bitset|bitsetList[0])
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return &DataPoint{
|
|
Balance: (*hexutil.Big)(cached[0].balance),
|
|
Timestamp: uint64(cached[0].timestamp),
|
|
BlockNumber: (*hexutil.Big)(cached[0].block),
|
|
}, blockNo, nil
|
|
}
|
|
|
|
// otherwise try fetch any to get the timestamp info
|
|
outEntry, _, err = b.db.getFirst(source.ChainID(), blockNo)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
var timestamp int64
|
|
if outEntry != nil {
|
|
timestamp = outEntry.timestamp
|
|
} else {
|
|
header, err := source.HeaderByNumber(ctx, blockNo)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
blockNo = new(big.Int).Set(header.Number)
|
|
timestamp = int64(header.Time)
|
|
}
|
|
|
|
currentBalance, err := source.BalanceAt(ctx, address, blockNo)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
entry := entry{
|
|
chainID: source.ChainID(),
|
|
address: address,
|
|
tokenSymbol: source.Currency(),
|
|
block: new(big.Int).Set(blockNo),
|
|
balance: currentBalance,
|
|
timestamp: timestamp,
|
|
}
|
|
err = b.db.add(&entry, bitset)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
var dataPoint DataPoint
|
|
dataPoint.Balance = (*hexutil.Big)(currentBalance)
|
|
dataPoint.Timestamp = uint64(timestamp)
|
|
return &dataPoint, blockNo, nil
|
|
}
|
|
|
|
// update retrieves the balance history for a specified asset from the database initially
|
|
// and supplements any missing information from the blockchain to minimize the number of RPC calls.
|
|
// if context is cancelled it will return with error
|
|
func (b *Balance) update(ctx context.Context, source DataSource, address common.Address, timeInterval TimeInterval) error {
|
|
startTimestamp := int64(0)
|
|
fetchTimestamp := int64(0)
|
|
endTime := source.TimeNow()
|
|
if timeInterval != BalanceHistoryAllTime {
|
|
// Ensure we always get the complete range by fetching the next block also
|
|
startTimestamp = endTime - int64(timeIntervalDuration[timeInterval].Seconds())
|
|
fetchTimestamp = startTimestamp - int64(timeIntervalToStrideDuration[timeInterval].Seconds())
|
|
}
|
|
identity := &assetIdentity{source.ChainID(), address, source.Currency()}
|
|
firstCached, err := b.firstCachedStartingAt(identity, fetchTimestamp, timeInterval)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var oldestCached *big.Int
|
|
var oldestTimestamp int64
|
|
var newestCached *big.Int
|
|
if firstCached != nil {
|
|
oldestCached = new(big.Int).Set(firstCached.block)
|
|
oldestTimestamp = firstCached.timestamp
|
|
lastCached, err := b.lastCached(identity, timeInterval)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
newestCached = new(big.Int).Set(lastCached.block)
|
|
} else {
|
|
var fetchBlock *big.Int
|
|
lastEntry, _, err := b.db.getLastEntryForChain(source.ChainID())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if lastEntry != nil {
|
|
fetchBlock = new(big.Int).Set(lastEntry.block)
|
|
}
|
|
mostRecentDataPoint, mostRecentBlock, err := b.fetchAndCache(ctx, source, address, fetchBlock, timeIntervalToBitsetFilter[timeInterval])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
oldestCached = new(big.Int).Set(mostRecentBlock)
|
|
oldestTimestamp = int64(mostRecentDataPoint.Timestamp)
|
|
newestCached = new(big.Int).Set(mostRecentBlock)
|
|
}
|
|
|
|
if oldestTimestamp > startTimestamp {
|
|
err := b.fetchBackwardAndCache(ctx, source, address, oldestCached, startTimestamp, timeInterval)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Fetch forward if didn't update in a stride duration
|
|
err = b.fetchForwardAndCache(ctx, source, address, newestCached, timeInterval)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// get returns the balance history for the given address and time interval until endTimestamp
|
|
func (b *Balance) get(ctx context.Context, chainID uint64, currency string, address common.Address, endTimestamp int64, timeInterval TimeInterval) ([]*DataPoint, error) {
|
|
startTimestamp := int64(0)
|
|
fetchTimestamp := int64(0)
|
|
if timeInterval != BalanceHistoryAllTime {
|
|
// Ensure we always get the complete range by fetching the next block also
|
|
startTimestamp = endTimestamp - int64(timeIntervalDuration[timeInterval].Seconds())
|
|
fetchTimestamp = startTimestamp - int64(timeIntervalToStrideDuration[timeInterval].Seconds())
|
|
}
|
|
cached, _, err := b.db.filter(&assetIdentity{chainID, address, currency}, nil, &balanceFilter{fetchTimestamp, endTimestamp, expandFlag(timeIntervalToBitsetFilter[timeInterval])}, 800, asc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
points := make([]*DataPoint, 0, len(cached)+1)
|
|
for _, entry := range cached {
|
|
dataPoint := DataPoint{
|
|
Balance: (*hexutil.Big)(entry.balance),
|
|
Timestamp: uint64(entry.timestamp),
|
|
BlockNumber: (*hexutil.Big)(entry.block),
|
|
}
|
|
points = append(points, &dataPoint)
|
|
}
|
|
|
|
lastCached, _, err := b.db.get(&assetIdentity{chainID, address, currency}, nil, 1, desc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(lastCached) > 0 && len(cached) > 0 && lastCached[0].block.Cmp(cached[len(cached)-1].block) > 0 {
|
|
points = append(points, &DataPoint{
|
|
Balance: (*hexutil.Big)(lastCached[0].balance),
|
|
Timestamp: uint64(lastCached[0].timestamp),
|
|
BlockNumber: (*hexutil.Big)(lastCached[0].block),
|
|
})
|
|
}
|
|
|
|
return points, nil
|
|
}
|
|
|
|
// fetchBackwardAndCache fetches and adds to DB balance entries starting one stride before the endBlock and stops
|
|
// when reaching a block timestamp older than startTimestamp or genesis block
|
|
// relies on the approximation of a block length to match averageBlockDurationForChain for sampling the data
|
|
func (b *Balance) fetchBackwardAndCache(ctx context.Context, source DataSource, address common.Address, endBlock *big.Int, startTimestamp int64, timeInterval TimeInterval) error {
|
|
stride := strideBlockCount(timeInterval, source.ChainID())
|
|
nextBlock := new(big.Int).Set(endBlock)
|
|
for nextBlock.Cmp(big.NewInt(1)) > 0 {
|
|
if shouldCancel(ctx) {
|
|
return errors.New("context cancelled")
|
|
}
|
|
|
|
nextBlock.Sub(nextBlock, big.NewInt(int64(stride)))
|
|
if nextBlock.Cmp(big.NewInt(0)) <= 0 {
|
|
// we reached the genesis block which doesn't have a usable timestamp, fetch next
|
|
nextBlock.SetUint64(1)
|
|
}
|
|
|
|
dataPoint, _, err := b.fetchAndCache(ctx, source, address, nextBlock, timeIntervalToBitsetFilter[timeInterval])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Allow to go back one stride to match the requested interval
|
|
if int64(dataPoint.Timestamp) < startTimestamp {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// fetchForwardAndCache fetches and adds to DB balance entries starting one stride before the startBlock and stops
|
|
// when block not found
|
|
// relies on the approximation of a block length to match averageBlockDurationForChain
|
|
func (b *Balance) fetchForwardAndCache(ctx context.Context, source DataSource, address common.Address, startBlock *big.Int, timeInterval TimeInterval) error {
|
|
stride := strideBlockCount(timeInterval, source.ChainID())
|
|
nextBlock := new(big.Int).Set(startBlock)
|
|
for {
|
|
if shouldCancel(ctx) {
|
|
return errors.New("context cancelled")
|
|
}
|
|
|
|
nextBlock.Add(nextBlock, big.NewInt(int64(stride)))
|
|
_, _, err := b.fetchAndCache(ctx, source, address, nextBlock, timeIntervalToBitsetFilter[timeInterval])
|
|
if err != nil {
|
|
if err == ethereum.NotFound {
|
|
// We overshoot, stop and return what we have
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// firstCachedStartingAt returns first cached entry for the given identity and time interval starting at fetchTimestamp or nil if none found
|
|
func (b *Balance) firstCachedStartingAt(identity *assetIdentity, startTimestamp int64, timeInterval TimeInterval) (first *entry, err error) {
|
|
entries, _, err := b.db.filter(identity, nil, &balanceFilter{startTimestamp, maxAllRangeTimestamp, expandFlag(timeIntervalToBitsetFilter[timeInterval])}, 1, desc)
|
|
if err != nil {
|
|
return nil, err
|
|
} else if len(entries) == 0 {
|
|
return nil, nil
|
|
}
|
|
return entries[0], nil
|
|
}
|
|
|
|
// lastCached returns last cached entry for the given identity and time interval or nil if none found
|
|
func (b *Balance) lastCached(identity *assetIdentity, timeInterval TimeInterval) (first *entry, err error) {
|
|
entries, _, err := b.db.filter(identity, nil, &balanceFilter{minAllRangeTimestamp, maxAllRangeTimestamp, expandFlag(timeIntervalToBitsetFilter[timeInterval])}, 1, desc)
|
|
if err != nil {
|
|
return nil, err
|
|
} else if len(entries) == 0 {
|
|
return nil, nil
|
|
}
|
|
return entries[0], nil
|
|
}
|
|
|
|
// shouldCancel returns true if the context has been cancelled and task should be aborted
|
|
func shouldCancel(ctx context.Context) bool {
|
|
select {
|
|
case <-ctx.Done():
|
|
return true
|
|
default:
|
|
}
|
|
return false
|
|
}
|