mirror of
https://github.com/status-im/status-go.git
synced 2025-01-10 14:47:06 +00:00
24bf9aada5
parameter to share between transfers and balance history in an upcoming commit. Had to refactor its interface for that reason.
340 lines
9.3 KiB
Go
340 lines
9.3 KiB
Go
package transfer
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math/big"
|
|
"sync"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/status-im/status-go/rpc/chain"
|
|
"github.com/status-im/status-go/services/wallet/async"
|
|
"github.com/status-im/status-go/services/wallet/balance"
|
|
"github.com/status-im/status-go/services/wallet/token"
|
|
"github.com/status-im/status-go/services/wallet/walletevent"
|
|
"github.com/status-im/status-go/transactions"
|
|
)
|
|
|
|
const (
|
|
ReactorNotStarted string = "reactor not started"
|
|
|
|
NonArchivalNodeBlockChunkSize = 100
|
|
DefaultNodeBlockChunkSize = 100000
|
|
)
|
|
|
|
var errAlreadyRunning = errors.New("already running")
|
|
|
|
type FetchStrategyType int32
|
|
|
|
const (
|
|
OnDemandFetchStrategyType FetchStrategyType = iota
|
|
SequentialFetchStrategyType
|
|
)
|
|
|
|
// HeaderReader interface for reading headers using block number or hash.
|
|
type HeaderReader interface {
|
|
HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
|
|
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
|
|
}
|
|
|
|
type HistoryFetcher interface {
|
|
start() error
|
|
stop()
|
|
kind() FetchStrategyType
|
|
|
|
getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
|
|
limit int64, fetchMore bool) ([]Transfer, error)
|
|
}
|
|
|
|
func NewOnDemandFetchStrategy(
|
|
db *Database,
|
|
blockDAO *BlockDAO,
|
|
feed *event.Feed,
|
|
transactionManager *TransactionManager,
|
|
pendingTxManager *transactions.PendingTxTracker,
|
|
tokenManager *token.Manager,
|
|
chainClients map[uint64]*chain.ClientWithFallback,
|
|
accounts []common.Address,
|
|
balanceCacher balance.Cacher,
|
|
) *OnDemandFetchStrategy {
|
|
strategy := &OnDemandFetchStrategy{
|
|
db: db,
|
|
blockDAO: blockDAO,
|
|
feed: feed,
|
|
balanceCacher: balanceCacher,
|
|
transactionManager: transactionManager,
|
|
pendingTxManager: pendingTxManager,
|
|
tokenManager: tokenManager,
|
|
chainClients: chainClients,
|
|
accounts: accounts,
|
|
}
|
|
|
|
return strategy
|
|
}
|
|
|
|
type OnDemandFetchStrategy struct {
|
|
db *Database
|
|
blockDAO *BlockDAO
|
|
feed *event.Feed
|
|
mu sync.Mutex
|
|
group *async.Group
|
|
balanceCacher balance.Cacher
|
|
transactionManager *TransactionManager
|
|
pendingTxManager *transactions.PendingTxTracker
|
|
tokenManager *token.Manager
|
|
chainClients map[uint64]*chain.ClientWithFallback
|
|
accounts []common.Address
|
|
}
|
|
|
|
func (s *OnDemandFetchStrategy) newControlCommand(chainClient *chain.ClientWithFallback, accounts []common.Address) *controlCommand {
|
|
signer := types.LatestSignerForChainID(chainClient.ToBigInt())
|
|
ctl := &controlCommand{
|
|
db: s.db,
|
|
chainClient: chainClient,
|
|
accounts: accounts,
|
|
blockDAO: s.blockDAO,
|
|
eth: ÐDownloader{
|
|
chainClient: chainClient,
|
|
accounts: accounts,
|
|
signer: signer,
|
|
db: s.db,
|
|
},
|
|
erc20: NewERC20TransfersDownloader(chainClient, accounts, signer),
|
|
feed: s.feed,
|
|
errorsCount: 0,
|
|
transactionManager: s.transactionManager,
|
|
pendingTxManager: s.pendingTxManager,
|
|
tokenManager: s.tokenManager,
|
|
balanceCacher: s.balanceCacher,
|
|
}
|
|
|
|
return ctl
|
|
}
|
|
|
|
func (s *OnDemandFetchStrategy) start() error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if s.group != nil {
|
|
return errAlreadyRunning
|
|
}
|
|
s.group = async.NewGroup(context.Background())
|
|
|
|
for _, chainClient := range s.chainClients {
|
|
ctl := s.newControlCommand(chainClient, s.accounts)
|
|
s.group.Add(ctl.Command())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop stops reactor loop and waits till it exits.
|
|
func (s *OnDemandFetchStrategy) stop() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.group == nil {
|
|
return
|
|
}
|
|
s.group.Stop()
|
|
s.group.Wait()
|
|
s.group = nil
|
|
}
|
|
|
|
func (s *OnDemandFetchStrategy) kind() FetchStrategyType {
|
|
return OnDemandFetchStrategyType
|
|
}
|
|
|
|
func (s *OnDemandFetchStrategy) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
|
|
limit int64, fetchMore bool) ([]Transfer, error) {
|
|
|
|
log.Info("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address, "fetchMore", fetchMore,
|
|
"chainID", chainID, "toBlock", toBlock, "limit", limit)
|
|
|
|
rst, err := s.db.GetTransfersByAddress(chainID, address, toBlock, limit)
|
|
if err != nil {
|
|
log.Error("[WalletAPI:: GetTransfersByAddress] can't fetch transfers", "err", err)
|
|
return nil, err
|
|
}
|
|
|
|
transfersCount := int64(len(rst))
|
|
|
|
if fetchMore && limit > transfersCount {
|
|
|
|
block, err := s.blockDAO.GetFirstKnownBlock(chainID, address)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// if zero block was already checked there is nothing to find more
|
|
if block == nil || big.NewInt(0).Cmp(block) == 0 {
|
|
log.Info("[WalletAPI:: GetTransfersByAddress] ZERO block is found for", "address", address, "chaindID", chainID)
|
|
return rst, nil
|
|
}
|
|
|
|
chainClient, err := getChainClientByID(s.chainClients, chainID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
from, err := findFirstRange(ctx, address, block, chainClient)
|
|
if err != nil {
|
|
if nonArchivalNodeError(err) {
|
|
if s.feed != nil {
|
|
s.feed.Send(walletevent.Event{
|
|
Type: EventNonArchivalNodeDetected,
|
|
})
|
|
}
|
|
if block.Cmp(big.NewInt(NonArchivalNodeBlockChunkSize)) >= 0 {
|
|
from = big.NewInt(0).Sub(block, big.NewInt(NonArchivalNodeBlockChunkSize))
|
|
} else {
|
|
from = big.NewInt(0)
|
|
}
|
|
} else {
|
|
log.Error("first range error", "error", err)
|
|
return nil, err
|
|
}
|
|
}
|
|
fromByAddress := map[common.Address]*Block{address: {
|
|
Number: from,
|
|
}}
|
|
toByAddress := map[common.Address]*big.Int{address: block}
|
|
|
|
blocksCommand := &findAndCheckBlockRangeCommand{
|
|
accounts: []common.Address{address},
|
|
db: s.db,
|
|
chainClient: chainClient,
|
|
balanceCacher: s.balanceCacher,
|
|
feed: s.feed,
|
|
fromByAddress: fromByAddress,
|
|
toByAddress: toByAddress,
|
|
}
|
|
|
|
if err = blocksCommand.Command()(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
s.balanceCacher.Clear()
|
|
|
|
blocks, err := s.blockDAO.GetBlocksToLoadByAddress(chainID, address, numberOfBlocksCheckedPerIteration)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
log.Info("checking blocks again", "blocks", len(blocks))
|
|
if len(blocks) > 0 {
|
|
txCommand := &loadTransfersCommand{
|
|
accounts: []common.Address{address},
|
|
db: s.db,
|
|
blockDAO: s.blockDAO,
|
|
chainClient: chainClient,
|
|
transactionManager: s.transactionManager,
|
|
blocksLimit: numberOfBlocksCheckedPerIteration,
|
|
tokenManager: s.tokenManager,
|
|
}
|
|
|
|
err = txCommand.Command()(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rst, err = s.db.GetTransfersByAddress(chainID, address, toBlock, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
return rst, nil
|
|
}
|
|
|
|
// Reactor listens to new blocks and stores transfers into the database.
|
|
type Reactor struct {
|
|
db *Database
|
|
blockDAO *BlockDAO
|
|
feed *event.Feed
|
|
transactionManager *TransactionManager
|
|
pendingTxManager *transactions.PendingTxTracker
|
|
tokenManager *token.Manager
|
|
strategy HistoryFetcher
|
|
balanceCacher balance.Cacher
|
|
}
|
|
|
|
func NewReactor(db *Database, blockDAO *BlockDAO, feed *event.Feed, tm *TransactionManager,
|
|
pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager,
|
|
balanceCacher balance.Cacher) *Reactor {
|
|
return &Reactor{
|
|
db: db,
|
|
blockDAO: blockDAO,
|
|
feed: feed,
|
|
transactionManager: tm,
|
|
pendingTxManager: pendingTxManager,
|
|
tokenManager: tokenManager,
|
|
balanceCacher: balanceCacher,
|
|
}
|
|
}
|
|
|
|
// Start runs reactor loop in background.
|
|
func (r *Reactor) start(chainClients map[uint64]*chain.ClientWithFallback, accounts []common.Address,
|
|
loadAllTransfers bool) error {
|
|
|
|
r.strategy = r.createFetchStrategy(chainClients, accounts, loadAllTransfers)
|
|
return r.strategy.start()
|
|
}
|
|
|
|
// Stop stops reactor loop and waits till it exits.
|
|
func (r *Reactor) stop() {
|
|
if r.strategy != nil {
|
|
r.strategy.stop()
|
|
}
|
|
}
|
|
|
|
func (r *Reactor) restart(chainClients map[uint64]*chain.ClientWithFallback, accounts []common.Address,
|
|
loadAllTransfers bool) error {
|
|
|
|
r.stop()
|
|
return r.start(chainClients, accounts, loadAllTransfers)
|
|
}
|
|
|
|
func (r *Reactor) createFetchStrategy(chainClients map[uint64]*chain.ClientWithFallback,
|
|
accounts []common.Address, loadAllTransfers bool) HistoryFetcher {
|
|
|
|
if loadAllTransfers {
|
|
return NewSequentialFetchStrategy(
|
|
r.db,
|
|
r.blockDAO,
|
|
r.feed,
|
|
r.transactionManager,
|
|
r.pendingTxManager,
|
|
r.tokenManager,
|
|
chainClients,
|
|
accounts,
|
|
r.balanceCacher,
|
|
)
|
|
}
|
|
|
|
return NewOnDemandFetchStrategy(r.db, r.blockDAO, r.feed, r.transactionManager, r.pendingTxManager, r.tokenManager, chainClients, accounts, r.balanceCacher)
|
|
}
|
|
|
|
func (r *Reactor) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
|
|
limit int64, fetchMore bool) ([]Transfer, error) {
|
|
|
|
if r.strategy != nil {
|
|
return r.strategy.getTransfersByAddress(ctx, chainID, address, toBlock, limit, fetchMore)
|
|
}
|
|
|
|
return nil, errors.New(ReactorNotStarted)
|
|
}
|
|
|
|
func getChainClientByID(clients map[uint64]*chain.ClientWithFallback, id uint64) (*chain.ClientWithFallback, error) {
|
|
for _, client := range clients {
|
|
if client.ChainID == id {
|
|
return client, nil
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("chain client not found with id=%d", id)
|
|
}
|