Stefan 601484af3e feat: Retrieve balance history for native token
Add functionality to sample and retrieve balance history and cache
it in memory for the current transfer controller.

The end of the balance history is snapped at twice per day to
avoid having to query the blockchain again for each fetching within
12 hours interval

The functionality will be extended with DB caching, API call batching,
"smarter" cache hitting and syncing between devices

Updates: #7662
2022-11-07 22:39:21 +01:00

423 lines
12 KiB
Go

package transfer
import (
"context"
"database/sql"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/chain"
)
type Controller struct {
db *Database
rpcClient *rpc.Client
signals *SignalsTransmitter
block *Block
reactor *Reactor
accountFeed *event.Feed
TransferFeed *event.Feed
group *async.Group
balanceCache *balanceCache
}
func NewTransferController(db *sql.DB, rpcClient *rpc.Client, accountFeed *event.Feed) *Controller {
transferFeed := &event.Feed{}
signals := &SignalsTransmitter{
publisher: transferFeed,
}
block := &Block{db}
return &Controller{
db: NewDB(db),
block: block,
rpcClient: rpcClient,
signals: signals,
accountFeed: accountFeed,
TransferFeed: transferFeed,
}
}
func (c *Controller) Start() error {
c.group = async.NewGroup(context.Background())
return c.signals.Start()
}
func (c *Controller) Stop() {
c.signals.Stop()
if c.reactor != nil {
c.reactor.stop()
}
if c.group != nil {
c.group.Stop()
c.group.Wait()
c.group = nil
}
}
func (c *Controller) SetInitialBlocksRange(chainIDs []uint64) error {
chainClients, err := chain.NewClients(c.rpcClient, chainIDs)
if err != nil {
return err
}
for _, chainClient := range chainClients {
err := c.block.setInitialBlocksRange(chainClient)
if err != nil {
return err
}
}
return nil
}
func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Address) error {
if len(accounts) == 0 {
log.Info("no accounts provided")
return nil
}
if len(chainIDs) == 0 {
log.Info("no chain provided")
return nil
}
err := c.block.mergeBlocksRanges(chainIDs, accounts)
if err != nil {
return err
}
chainClients, err := chain.NewClients(c.rpcClient, chainIDs)
if err != nil {
return err
}
if c.reactor != nil {
err := c.reactor.restart(chainClients, accounts)
if err != nil {
return err
}
}
c.reactor = &Reactor{
db: c.db,
feed: c.TransferFeed,
block: c.block,
}
err = c.reactor.start(chainClients, accounts)
if err != nil {
return err
}
c.group.Add(func(ctx context.Context) error {
return watchAccountsChanges(ctx, c.accountFeed, c.reactor, chainClients, accounts)
})
return nil
}
// watchAccountsChanges subsribes to a feed and watches for changes in accounts list. If there are new or removed accounts
// reactor will be restarted.
func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor *Reactor, chainClients []*chain.Client, initial []common.Address) error {
accounts := make(chan []*accounts.Account, 1) // it may block if the rate of updates will be significantly higher
sub := accountFeed.Subscribe(accounts)
defer sub.Unsubscribe()
listen := make(map[common.Address]struct{}, len(initial))
for _, address := range initial {
listen[address] = struct{}{}
}
for {
select {
case <-ctx.Done():
return nil
case err := <-sub.Err():
if err != nil {
log.Error("accounts watcher subscription failed", "error", err)
}
case n := <-accounts:
log.Debug("wallet received updated list of accounts", "accounts", n)
restart := false
for _, acc := range n {
_, exist := listen[common.Address(acc.Address)]
if !exist {
listen[common.Address(acc.Address)] = struct{}{}
restart = true
}
}
if !restart {
continue
}
listenList := mapToList(listen)
log.Debug("list of accounts was changed from a previous version. reactor will be restarted", "new", listenList)
err := reactor.restart(chainClients, listenList)
if err != nil {
log.Error("failed to restart reactor with new accounts", "error", err)
}
}
}
}
func mapToList(m map[common.Address]struct{}) []common.Address {
rst := make([]common.Address, 0, len(m))
for address := range m {
rst = append(rst, address)
}
return rst
}
func (c *Controller) LoadTransferByHash(ctx context.Context, rpcClient *rpc.Client, address common.Address, hash common.Hash) error {
chainClient, err := chain.NewClient(rpcClient, rpcClient.UpstreamChainID)
if err != nil {
return err
}
signer := types.NewLondonSigner(chainClient.ToBigInt())
transfer, err := getTransferByHash(ctx, chainClient, signer, address, hash)
if err != nil {
return err
}
transfers := []Transfer{*transfer}
err = c.db.InsertBlock(rpcClient.UpstreamChainID, address, transfer.BlockNumber, transfer.BlockHash)
if err != nil {
return err
}
blocks := []*big.Int{transfer.BlockNumber}
err = c.db.SaveTranfers(rpcClient.UpstreamChainID, address, transfers, blocks)
if err != nil {
return err
}
return nil
}
func (c *Controller) GetTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int, limit int64, fetchMore bool) ([]View, error) {
log.Debug("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address)
rst, err := c.db.GetTransfersByAddress(chainID, address, toBlock, limit)
if err != nil {
log.Error("[WalletAPI:: GetTransfersByAddress] can't fetch transfers", "err", err)
return nil, err
}
transfersCount := int64(len(rst))
chainClient, err := chain.NewClient(c.rpcClient, chainID)
if err != nil {
return nil, err
}
if fetchMore && limit > transfersCount {
block, err := c.block.GetFirstKnownBlock(chainID, address)
if err != nil {
return nil, err
}
// if zero block was already checked there is nothing to find more
if block == nil || big.NewInt(0).Cmp(block) == 0 {
return castToTransferViews(rst), nil
}
from, err := findFirstRange(ctx, address, block, chainClient)
if err != nil {
if nonArchivalNodeError(err) {
c.TransferFeed.Send(Event{
Type: EventNonArchivalNodeDetected,
})
from = big.NewInt(0).Sub(block, big.NewInt(100))
} else {
log.Error("first range error", "error", err)
return nil, err
}
}
fromByAddress := map[common.Address]*LastKnownBlock{address: {
Number: from,
}}
toByAddress := map[common.Address]*big.Int{address: block}
if c.balanceCache == nil {
c.balanceCache = newBalanceCache()
}
blocksCommand := &findAndCheckBlockRangeCommand{
accounts: []common.Address{address},
db: c.db,
chainClient: chainClient,
balanceCache: c.balanceCache,
feed: c.TransferFeed,
fromByAddress: fromByAddress,
toByAddress: toByAddress,
}
if err = blocksCommand.Command()(ctx); err != nil {
return nil, err
}
blocks, err := c.block.GetBlocksByAddress(chainID, address, numberOfBlocksCheckedPerIteration)
if err != nil {
return nil, err
}
log.Info("checking blocks again", "blocks", len(blocks))
if len(blocks) > 0 {
txCommand := &loadTransfersCommand{
accounts: []common.Address{address},
db: c.db,
block: c.block,
chainClient: chainClient,
}
err = txCommand.Command()(ctx)
if err != nil {
return nil, err
}
rst, err = c.db.GetTransfersByAddress(chainID, address, toBlock, limit)
if err != nil {
return nil, err
}
}
}
return castToTransferViews(rst), nil
}
func (c *Controller) GetCachedBalances(ctx context.Context, chainID uint64, addresses []common.Address) ([]LastKnownBlockView, error) {
result, error := c.block.getLastKnownBalances(chainID, addresses)
if error != nil {
return nil, error
}
return blocksToViews(result), nil
}
type BalanceState struct {
Value *hexutil.Big `json:"value"`
Timestamp uint64 `json:"time"`
}
type BalanceHistoryTimeInterval int
const (
BalanceHistory7Hours BalanceHistoryTimeInterval = iota + 1
BalanceHistory1Month
BalanceHistory6Months
BalanceHistory1Year
BalanceHistoryAllTime
)
var balanceHistoryTimeIntervalToHoursPerStep = map[BalanceHistoryTimeInterval]int64{
BalanceHistory7Hours: 2,
BalanceHistory1Month: 12,
BalanceHistory6Months: (24 * 7) / 2,
BalanceHistory1Year: 24 * 7,
}
var balanceHistoryTimeIntervalToSampleNo = map[BalanceHistoryTimeInterval]int64{
BalanceHistory7Hours: 84,
BalanceHistory1Month: 60,
BalanceHistory6Months: 52,
BalanceHistory1Year: 52,
BalanceHistoryAllTime: 50,
}
// GetBalanceHistory expect a time precision of +/- average block time (~12s)
// implementation relies that a block has constant time length to save block header requests
func (c *Controller) GetBalanceHistory(ctx context.Context, chainID uint64, address common.Address, timeInterval BalanceHistoryTimeInterval) ([]BalanceState, error) {
chainClient, err := chain.NewClient(c.rpcClient, chainID)
if err != nil {
return nil, err
}
if c.balanceCache == nil {
c.balanceCache = newBalanceCache()
}
if c.balanceCache.history == nil {
c.balanceCache.history = new(balanceHistoryCache)
}
currentTimestamp := time.Now().Unix()
lastBlockNo := big.NewInt(0)
var lastBlockTimestamp int64
if (currentTimestamp - c.balanceCache.history.lastBlockTimestamp) >= (12 * 60 * 60) {
lastBlock, err := chainClient.BlockByNumber(ctx, nil)
if err != nil {
return nil, err
}
lastBlockNo.Set(lastBlock.Number())
lastBlockTimestamp = int64(lastBlock.Time())
c.balanceCache.history.lastBlockNo = big.NewInt(0).Set(lastBlockNo)
c.balanceCache.history.lastBlockTimestamp = lastBlockTimestamp
} else {
lastBlockNo.Set(c.balanceCache.history.lastBlockNo)
lastBlockTimestamp = c.balanceCache.history.lastBlockTimestamp
}
initialBlock, err := chainClient.BlockByNumber(ctx, big.NewInt(1))
if err != nil {
return nil, err
}
initialBlockNo := big.NewInt(0).Set(initialBlock.Number())
initialBlockTimestamp := int64(initialBlock.Time())
allTimeBlockCount := big.NewInt(0).Sub(lastBlockNo, initialBlockNo)
allTimeInterval := lastBlockTimestamp - initialBlockTimestamp
// Expected to be around 12
blockDuration := float64(allTimeInterval) / float64(allTimeBlockCount.Int64())
lastBlockTime := time.Unix(lastBlockTimestamp, 0)
// Snap to the beginning of the day or half day which is the closest to the last block
hour := 0
if lastBlockTime.Hour() >= 12 {
hour = 12
}
lastTime := time.Date(lastBlockTime.Year(), lastBlockTime.Month(), lastBlockTime.Day(), hour, 0, 0, 0, lastBlockTime.Location())
endBlockTimestamp := lastTime.Unix()
blockGaps := big.NewInt(int64(float64(lastBlockTimestamp-endBlockTimestamp) / blockDuration))
endBlockNo := big.NewInt(0).Sub(lastBlockNo, blockGaps)
totalBlockCount, startTimestamp := int64(0), int64(0)
if timeInterval == BalanceHistoryAllTime {
startTimestamp = initialBlockTimestamp
totalBlockCount = endBlockNo.Int64()
} else {
secondsToNow := balanceHistoryTimeIntervalToHoursPerStep[timeInterval] * 3600 * (balanceHistoryTimeIntervalToSampleNo[timeInterval])
startTimestamp = endBlockTimestamp - secondsToNow
totalBlockCount = int64(float64(secondsToNow) / blockDuration)
}
blocksInStep := totalBlockCount / (balanceHistoryTimeIntervalToSampleNo[timeInterval])
stepDuration := int64(float64(blocksInStep) * blockDuration)
points := make([]BalanceState, 0)
nextBlockNumber := big.NewInt(0).Set(endBlockNo)
nextTimestamp := endBlockTimestamp
for nextTimestamp >= startTimestamp && nextBlockNumber.Cmp(initialBlockNo) >= 0 && nextBlockNumber.Cmp(big.NewInt(0)) > 0 {
newBlockNo := big.NewInt(0).Set(nextBlockNumber)
currentBalance, err := c.balanceCache.BalanceAt(ctx, chainClient, address, newBlockNo)
if err != nil {
return nil, err
}
var currentBalanceState BalanceState
currentBalanceState.Value = (*hexutil.Big)(currentBalance)
currentBalanceState.Timestamp = uint64(nextTimestamp)
points = append([]BalanceState{currentBalanceState}, points...)
// decrease block number and timestamp
nextTimestamp -= stepDuration
nextBlockNumber.Sub(nextBlockNumber, big.NewInt(blocksInStep))
}
return points, nil
}