Stefan a6d553c937 fix(wallet) ClientWithFallback handling state errors required by BH
The expected "not found" and "no contract" errors are caught and wrapped
by ClientWithFallback. As a result, fetching the balance history for the
following blocks is aborted, which is not desired.

Fix by not treating the error as a connection error in
ClientWithFallback.
2023-05-02 12:07:06 +02:00

363 lines
12 KiB
Go

package history
import (
"context"
"errors"
"math/big"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
)
// Balance provides balance-history operations backed by a BalanceDB cache.
type Balance struct {
	db *BalanceDB // storage used to read and write cached balance entries
}
// Shared defaults for the block-duration lookup and the interval arithmetic.
const (
	// defaultChains is the averageBlockDurationForChain key used for chains
	// that have no dedicated entry.
	defaultChains uint64 = 0
	// aDay is the length of one day.
	aDay = 24 * time.Hour
)
// averageBlockDurationForChain maps a chain ID to the average block time
// assumed for that chain. The defaultChains entry is the fallback used for
// chains that are not listed here.
var averageBlockDurationForChain = map[uint64]time.Duration{
	defaultChains: 12 * time.Second,
	10:            400 * time.Millisecond,  // Optimism
	420:           2 * time.Second,         // Optimism Testnet
	42161:         300 * time.Millisecond,  // Arbitrum
	421611:        1500 * time.Millisecond, // Arbitrum Testnet
}
// Sampling strides. They must have a common divisor so that the different
// intervals share blocks and increase the cache hit rate.
const (
	twiceADayStride time.Duration = 12 * time.Hour
	weekStride                    = 14 * twiceADayStride
	monthsStride                  = 4 * weekStride // one month, counted as four weeks
)
// bitsetFilter flags tag cached entries with the sampling they were fetched for,
// so that the relevant data points can be fetched in one batch and shared
// between intervals to increase the cache hit rate.
const (
	filterAllTime   bitsetFilter = 1      // flag mapped to BalanceHistoryAllTime
	filterWeekly    bitsetFilter = 1 << 3 // flag mapped to the 6 months and 1 year intervals
	filterTwiceADay bitsetFilter = 1 << 5 // flag mapped to the 7 days and 1 month intervals
)
// TimeInterval identifies one of the predefined balance-history ranges.
type TimeInterval int

// Specific time intervals for which balance history can be fetched.
// Values start at 1, so the zero value of TimeInterval is not a valid interval.
const (
	BalanceHistory7Days TimeInterval = iota + 1
	BalanceHistory1Month
	BalanceHistory6Months
	BalanceHistory1Year
	BalanceHistoryAllTime
)
// timeIntervalDuration gives the length of every bounded TimeInterval.
// BalanceHistoryAllTime has no entry; callers special-case it before looking
// up a duration.
var timeIntervalDuration = map[TimeInterval]time.Duration{
	BalanceHistory7Days:   7 * aDay,
	BalanceHistory1Month:  30 * aDay,
	BalanceHistory6Months: 6 * 30 * aDay,
	BalanceHistory1Year:   365 * aDay,
}
// timeIntervalToBitsetFilter maps each TimeInterval to the bitset flag that is
// stored with the entries sampled for it and used when filtering cached entries.
var timeIntervalToBitsetFilter = map[TimeInterval]bitsetFilter{
	BalanceHistory7Days:   filterTwiceADay,
	BalanceHistory1Month:  filterTwiceADay,
	BalanceHistory6Months: filterWeekly,
	BalanceHistory1Year:   filterWeekly,
	BalanceHistoryAllTime: filterAllTime,
}
// timeIntervalToStrideDuration maps each TimeInterval to the time distance
// between two consecutive sampled data points. strideBlockCount converts it
// into a number of blocks for a given chain.
var timeIntervalToStrideDuration = map[TimeInterval]time.Duration{
	BalanceHistory7Days:   twiceADayStride,
	BalanceHistory1Month:  twiceADayStride,
	BalanceHistory6Months: weekStride,
	BalanceHistory1Year:   weekStride,
	BalanceHistoryAllTime: monthsStride,
}
// strideBlockCount estimates how many blocks make up one sampling stride of
// timeInterval on chainID. It divides the stride duration by the chain's
// average block duration, falling back to the defaultChains entry for chains
// without a dedicated one.
func strideBlockCount(timeInterval TimeInterval, chainID uint64) int {
	avgBlockTime, known := averageBlockDurationForChain[chainID]
	if !known {
		avgBlockTime = averageBlockDurationForChain[defaultChains]
	}
	strideDuration := timeIntervalToStrideDuration[timeInterval]
	return int(strideDuration / avgBlockTime)
}
// NewBalance creates a Balance that reads and writes its cache through db.
func NewBalance(db *BalanceDB) *Balance {
	balance := new(Balance)
	balance.db = db
	return balance
}
// DataSource used as an abstraction to fetch required data from a specific blockchain
type DataSource interface {
	// HeaderByNumber returns the header of the block with the given number.
	// fetchAndCache may pass nil, which it documents as "the last available block".
	HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
	// BalanceAt returns the balance of account at blockNumber.
	BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error)
	// ChainID returns the ID of the chain this source reads from.
	ChainID() uint64
	// Currency returns the currency identifier; it is stored as the token symbol of cached entries.
	Currency() string
	// TimeNow returns the current time. update does second-based arithmetic on it,
	// so it is presumably a Unix timestamp in seconds — confirm against implementations.
	TimeNow() int64
}
// DataPoint is a single sample of an account's balance history.
type DataPoint struct {
	Balance     *hexutil.Big // balance at the sampled block
	Timestamp   uint64       // timestamp of the sampled block (presumably Unix seconds — confirm)
	BlockNumber *hexutil.Big // number of the sampled block
}
// fetchAndCache returns the balance data point of address at blockNo, serving
// it from the DB cache when possible and otherwise fetching it from source and
// storing it tagged with bitset. It processes the last available block if
// blockNo is nil.
//
// An entry already cached for the same asset and block only gets its bitset
// extended with the requested flags. On a cache miss, the timestamp of any
// entry cached for the same chain and block is reused to avoid fetching the
// block header again.
//
// It returns the data point together with the number of the block that was
// actually processed. The returned data point always carries its block number.
func (b *Balance) fetchAndCache(ctx context.Context, source DataSource, address common.Address, blockNo *big.Int, bitset bitsetFilter) (*DataPoint, *big.Int, error) {
	identity := &assetIdentity{source.ChainID(), address, source.Currency()}

	// timestampSource is any cached entry of the same block that can provide the block timestamp.
	var timestampSource *entry
	if blockNo != nil {
		cached, bitsetList, err := b.db.get(identity, blockNo, 1, asc)
		if err != nil {
			return nil, nil, err
		}
		if len(cached) > 0 && cached[0].block.Cmp(blockNo) == 0 {
			// found a match, only extend its bitset with the requested flags
			if err := b.db.updateBitset(identity, blockNo, bitset|bitsetList[0]); err != nil {
				return nil, nil, err
			}
			return &DataPoint{
				Balance:     (*hexutil.Big)(cached[0].balance),
				Timestamp:   uint64(cached[0].timestamp),
				BlockNumber: (*hexutil.Big)(cached[0].block),
			}, blockNo, nil
		}

		// otherwise try to fetch any entry of this block to get the timestamp info
		timestampSource, _, err = b.db.getFirst(source.ChainID(), blockNo)
		if err != nil {
			return nil, nil, err
		}
	}

	var timestamp int64
	if timestampSource != nil {
		timestamp = timestampSource.timestamp
	} else {
		// A nil blockNo asks the source for its latest header; adopt that block number.
		header, err := source.HeaderByNumber(ctx, blockNo)
		if err != nil {
			return nil, nil, err
		}
		blockNo = new(big.Int).Set(header.Number)
		timestamp = int64(header.Time)
	}

	currentBalance, err := source.BalanceAt(ctx, address, blockNo)
	if err != nil {
		return nil, nil, err
	}

	newEntry := entry{
		chainID:     source.ChainID(),
		address:     address,
		tokenSymbol: source.Currency(),
		block:       new(big.Int).Set(blockNo),
		balance:     currentBalance,
		timestamp:   timestamp,
	}
	if err := b.db.add(&newEntry, bitset); err != nil {
		return nil, nil, err
	}

	return &DataPoint{
		Balance:   (*hexutil.Big)(currentBalance),
		Timestamp: uint64(timestamp),
		// Copy the block number: callers may keep mutating the *big.Int they passed in.
		BlockNumber: (*hexutil.Big)(new(big.Int).Set(blockNo)),
	}, blockNo, nil
}
// update brings the cached balance history of address up to date for
// timeInterval. It starts from what the database already holds and only
// supplements the missing data points from source, to minimize the number of
// RPC calls:
//   - without a usable cached entry, a single anchor point is fetched at the
//     last block cached for the chain, or at the latest block if there is none;
//   - the history is then extended backward to the start of the interval and
//     forward until source no longer finds the requested block.
//
// It returns an error if a DB or source call fails or if ctx is cancelled.
func (b *Balance) update(ctx context.Context, source DataSource, address common.Address, timeInterval TimeInterval) error {
	startTimestamp := int64(0)
	fetchTimestamp := int64(0)
	endTime := source.TimeNow()
	if timeInterval != BalanceHistoryAllTime {
		// Ensure we always get the complete range by fetching the next block also
		startTimestamp = endTime - int64(timeIntervalDuration[timeInterval].Seconds())
		fetchTimestamp = startTimestamp - int64(timeIntervalToStrideDuration[timeInterval].Seconds())
	}

	identity := &assetIdentity{source.ChainID(), address, source.Currency()}
	firstCached, err := b.firstCachedStartingAt(identity, fetchTimestamp, timeInterval)
	if err != nil {
		return err
	}

	var oldestCached *big.Int
	var oldestTimestamp int64
	var newestCached *big.Int
	if firstCached != nil {
		oldestCached = new(big.Int).Set(firstCached.block)
		oldestTimestamp = firstCached.timestamp
		lastCached, err := b.lastCached(identity, timeInterval)
		if err != nil {
			return err
		}
		if lastCached == nil {
			// lastCached reports "none found" as (nil, nil); fall back to the
			// entry we do have instead of dereferencing a nil pointer.
			lastCached = firstCached
		}
		newestCached = new(big.Int).Set(lastCached.block)
	} else {
		// Nothing cached for this interval: fetch one anchor point, preferring a
		// block already known for the chain so that its timestamp can be reused.
		var fetchBlock *big.Int
		lastEntry, _, err := b.db.getLastEntryForChain(source.ChainID())
		if err != nil {
			return err
		}
		if lastEntry != nil {
			fetchBlock = new(big.Int).Set(lastEntry.block)
		}
		mostRecentDataPoint, mostRecentBlock, err := b.fetchAndCache(ctx, source, address, fetchBlock, timeIntervalToBitsetFilter[timeInterval])
		if err != nil {
			return err
		}
		oldestCached = new(big.Int).Set(mostRecentBlock)
		oldestTimestamp = int64(mostRecentDataPoint.Timestamp)
		newestCached = new(big.Int).Set(mostRecentBlock)
	}

	if oldestTimestamp > startTimestamp {
		if err := b.fetchBackwardAndCache(ctx, source, address, oldestCached, startTimestamp, timeInterval); err != nil {
			return err
		}
	}

	// Fetch forward if didn't update in a stride duration
	return b.fetchForwardAndCache(ctx, source, address, newestCached, timeInterval)
}
// get returns the cached balance history of address for currency on chainID,
// covering timeInterval up to endTimestamp, in ascending order. Only the
// database is consulted. If the newest cached entry of the asset lies beyond
// the last point of the range, it is appended as the final data point.
func (b *Balance) get(ctx context.Context, chainID uint64, currency string, address common.Address, endTimestamp int64, timeInterval TimeInterval) ([]*DataPoint, error) {
	var fetchTimestamp int64
	if timeInterval != BalanceHistoryAllTime {
		// Widen the range by one stride so the complete interval is always covered.
		startTimestamp := endTimestamp - int64(timeIntervalDuration[timeInterval].Seconds())
		fetchTimestamp = startTimestamp - int64(timeIntervalToStrideDuration[timeInterval].Seconds())
	}

	identity := &assetIdentity{chainID, address, currency}
	rangeFilter := &balanceFilter{fetchTimestamp, endTimestamp, expandFlag(timeIntervalToBitsetFilter[timeInterval])}
	inRange, _, err := b.db.filter(identity, nil, rangeFilter, 800, asc)
	if err != nil {
		return nil, err
	}

	// One spare slot for the newest entry that may be appended below.
	points := make([]*DataPoint, 0, len(inRange)+1)
	for _, cachedEntry := range inRange {
		points = append(points, &DataPoint{
			Balance:     (*hexutil.Big)(cachedEntry.balance),
			Timestamp:   uint64(cachedEntry.timestamp),
			BlockNumber: (*hexutil.Big)(cachedEntry.block),
		})
	}

	newest, _, err := b.db.get(identity, nil, 1, desc)
	if err != nil {
		return nil, err
	}
	if len(newest) == 0 || len(inRange) == 0 {
		return points, nil
	}

	if latest := newest[0]; latest.block.Cmp(inRange[len(inRange)-1].block) > 0 {
		points = append(points, &DataPoint{
			Balance:     (*hexutil.Big)(latest.balance),
			Timestamp:   uint64(latest.timestamp),
			BlockNumber: (*hexutil.Big)(latest.block),
		})
	}
	return points, nil
}
// fetchBackwardAndCache walks back from endBlock one stride at a time, fetching
// and caching a balance entry at every step. It stops once a fetched block is
// older than startTimestamp or once block 1 has been processed.
// The stride is an approximation based on averageBlockDurationForChain.
func (b *Balance) fetchBackwardAndCache(ctx context.Context, source DataSource, address common.Address, endBlock *big.Int, startTimestamp int64, timeInterval TimeInterval) error {
	step := big.NewInt(int64(strideBlockCount(timeInterval, source.ChainID())))
	firstBlock := big.NewInt(1)

	cursor := new(big.Int).Set(endBlock)
	for cursor.Cmp(firstBlock) > 0 {
		if shouldCancel(ctx) {
			return errors.New("context cancelled")
		}

		cursor.Sub(cursor, step)
		if cursor.Sign() <= 0 {
			// we went past the genesis block, which doesn't have a usable timestamp; use block 1 instead
			cursor.SetUint64(1)
		}

		point, _, err := b.fetchAndCache(ctx, source, address, cursor, timeIntervalToBitsetFilter[timeInterval])
		if err != nil {
			return err
		}

		// The first point older than the interval start is kept, then we stop.
		if int64(point.Timestamp) < startTimestamp {
			break
		}
	}
	return nil
}
// fetchForwardAndCache fetches and adds to DB balance entries starting one
// stride after startBlock and keeps stepping forward until source reports that
// the requested block is not found, which means we overshot the chain head.
// The stride is an approximation based on averageBlockDurationForChain.
//
// It returns nil once the chain head is passed, and an error if fetching fails
// for any other reason, if ctx is cancelled, or if no positive stride can be
// derived for timeInterval.
func (b *Balance) fetchForwardAndCache(ctx context.Context, source DataSource, address common.Address, startBlock *big.Int, timeInterval TimeInterval) error {
	stride := strideBlockCount(timeInterval, source.ChainID())
	if stride <= 0 {
		// A non-positive stride would never advance past startBlock and loop forever.
		return errors.New("invalid stride for time interval")
	}
	step := big.NewInt(int64(stride))

	nextBlock := new(big.Int).Set(startBlock)
	for {
		if shouldCancel(ctx) {
			return errors.New("context cancelled")
		}

		nextBlock.Add(nextBlock, step)
		_, _, err := b.fetchAndCache(ctx, source, address, nextBlock, timeIntervalToBitsetFilter[timeInterval])
		if errors.Is(err, ethereum.NotFound) {
			// We overshoot, stop and return what we have. errors.Is also
			// recognizes a NotFound that was wrapped by the client.
			return nil
		}
		if err != nil {
			return err
		}
	}
}
// firstCachedStartingAt returns the first (oldest) cached entry for the given
// identity and time interval whose timestamp is at or after startTimestamp, or
// nil if none is found.
func (b *Balance) firstCachedStartingAt(identity *assetIdentity, startTimestamp int64, timeInterval TimeInterval) (first *entry, err error) {
	// Ascending order with a limit of one yields the oldest entry of the range;
	// descending order would return the newest one instead.
	entries, _, err := b.db.filter(identity, nil, &balanceFilter{startTimestamp, maxAllRangeTimestamp, expandFlag(timeIntervalToBitsetFilter[timeInterval])}, 1, asc)
	if err != nil {
		return nil, err
	}
	if len(entries) == 0 {
		return nil, nil
	}
	return entries[0], nil
}
// lastCached returns the most recent cached entry for the given identity and
// time interval over the whole timestamp range, or nil if none is found.
func (b *Balance) lastCached(identity *assetIdentity, timeInterval TimeInterval) (first *entry, err error) {
	wholeRange := &balanceFilter{minAllRangeTimestamp, maxAllRangeTimestamp, expandFlag(timeIntervalToBitsetFilter[timeInterval])}
	entries, _, err := b.db.filter(identity, nil, wholeRange, 1, desc)
	switch {
	case err != nil:
		return nil, err
	case len(entries) == 0:
		return nil, nil
	default:
		return entries[0], nil
	}
}
// shouldCancel reports, without blocking, whether ctx is already done and the
// running task should therefore be aborted.
func shouldCancel(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}