status-go/services/wallet/transfer/reactor.go

339 lines
9.3 KiB
Go

package transfer
import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/transactions"
)
const (
ReactorNotStarted string = "reactor not started"
NonArchivalNodeBlockChunkSize = 100
DefaultNodeBlockChunkSize = 100000
)
var errAlreadyRunning = errors.New("already running")
type FetchStrategyType int32
const (
OnDemandFetchStrategyType FetchStrategyType = iota
SequentialFetchStrategyType
)
// HeaderReader interface for reading headers using block number or hash.
type HeaderReader interface {
HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
}
type HistoryFetcher interface {
start() error
stop()
kind() FetchStrategyType
getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
limit int64, fetchMore bool) ([]Transfer, error)
}
func NewOnDemandFetchStrategy(
db *Database,
blockDAO *BlockDAO,
feed *event.Feed,
transactionManager *TransactionManager,
pendingTxManager *transactions.PendingTxTracker,
tokenManager *token.Manager,
chainClients map[uint64]chain.ClientInterface,
accounts []common.Address,
balanceCacher balance.Cacher,
) *OnDemandFetchStrategy {
strategy := &OnDemandFetchStrategy{
db: db,
blockDAO: blockDAO,
feed: feed,
balanceCacher: balanceCacher,
transactionManager: transactionManager,
pendingTxManager: pendingTxManager,
tokenManager: tokenManager,
chainClients: chainClients,
accounts: accounts,
}
return strategy
}
type OnDemandFetchStrategy struct {
db *Database
blockDAO *BlockDAO
feed *event.Feed
mu sync.Mutex
group *async.Group
balanceCacher balance.Cacher
transactionManager *TransactionManager
pendingTxManager *transactions.PendingTxTracker
tokenManager *token.Manager
chainClients map[uint64]chain.ClientInterface
accounts []common.Address
}
func (s *OnDemandFetchStrategy) newControlCommand(chainClient chain.ClientInterface, accounts []common.Address) *controlCommand {
signer := types.LatestSignerForChainID(chainClient.ToBigInt())
ctl := &controlCommand{
db: s.db,
chainClient: chainClient,
accounts: accounts,
blockDAO: s.blockDAO,
eth: &ETHDownloader{
chainClient: chainClient,
accounts: accounts,
signer: signer,
db: s.db,
},
erc20: NewERC20TransfersDownloader(chainClient, accounts, signer, false),
feed: s.feed,
errorsCount: 0,
transactionManager: s.transactionManager,
pendingTxManager: s.pendingTxManager,
tokenManager: s.tokenManager,
balanceCacher: s.balanceCacher,
}
return ctl
}
func (s *OnDemandFetchStrategy) start() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.group != nil {
return errAlreadyRunning
}
s.group = async.NewGroup(context.Background())
for _, chainClient := range s.chainClients {
ctl := s.newControlCommand(chainClient, s.accounts)
s.group.Add(ctl.Command())
}
return nil
}
// Stop stops reactor loop and waits till it exits.
func (s *OnDemandFetchStrategy) stop() {
s.mu.Lock()
defer s.mu.Unlock()
if s.group == nil {
return
}
s.group.Stop()
s.group.Wait()
s.group = nil
}
func (s *OnDemandFetchStrategy) kind() FetchStrategyType {
return OnDemandFetchStrategyType
}
func (s *OnDemandFetchStrategy) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
limit int64, fetchMore bool) ([]Transfer, error) {
log.Info("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address, "fetchMore", fetchMore,
"chainID", chainID, "toBlock", toBlock, "limit", limit)
rst, err := s.db.GetTransfersByAddress(chainID, address, toBlock, limit)
if err != nil {
log.Error("[WalletAPI:: GetTransfersByAddress] can't fetch transfers", "err", err)
return nil, err
}
transfersCount := int64(len(rst))
if fetchMore && limit > transfersCount {
block, err := s.blockDAO.GetFirstKnownBlock(chainID, address)
if err != nil {
return nil, err
}
// if zero block was already checked there is nothing to find more
if block == nil || big.NewInt(0).Cmp(block) == 0 {
log.Info("[WalletAPI:: GetTransfersByAddress] ZERO block is found for", "address", address, "chaindID", chainID)
return rst, nil
}
chainClient, err := getChainClientByID(s.chainClients, chainID)
if err != nil {
return nil, err
}
from, err := findFirstRange(ctx, address, block, chainClient)
if err != nil {
if nonArchivalNodeError(err) {
if s.feed != nil {
s.feed.Send(walletevent.Event{
Type: EventNonArchivalNodeDetected,
})
}
if block.Cmp(big.NewInt(NonArchivalNodeBlockChunkSize)) >= 0 {
from = big.NewInt(0).Sub(block, big.NewInt(NonArchivalNodeBlockChunkSize))
} else {
from = big.NewInt(0)
}
} else {
log.Error("first range error", "error", err)
return nil, err
}
}
fromByAddress := map[common.Address]*Block{address: {
Number: from,
}}
toByAddress := map[common.Address]*big.Int{address: block}
blocksCommand := &findAndCheckBlockRangeCommand{
accounts: []common.Address{address},
db: s.db,
chainClient: chainClient,
balanceCacher: s.balanceCacher,
feed: s.feed,
fromByAddress: fromByAddress,
toByAddress: toByAddress,
}
if err = blocksCommand.Command()(ctx); err != nil {
return nil, err
}
blocks, err := s.blockDAO.GetBlocksToLoadByAddress(chainID, address, numberOfBlocksCheckedPerIteration)
if err != nil {
return nil, err
}
log.Info("checking blocks again", "blocks", len(blocks))
if len(blocks) > 0 {
txCommand := &loadTransfersCommand{
accounts: []common.Address{address},
db: s.db,
blockDAO: s.blockDAO,
chainClient: chainClient,
transactionManager: s.transactionManager,
blocksLimit: numberOfBlocksCheckedPerIteration,
tokenManager: s.tokenManager,
}
err = txCommand.Command()(ctx)
if err != nil {
return nil, err
}
rst, err = s.db.GetTransfersByAddress(chainID, address, toBlock, limit)
if err != nil {
return nil, err
}
}
}
return rst, nil
}
// Reactor listens to new blocks and stores transfers into the database.
type Reactor struct {
db *Database
blockDAO *BlockDAO
feed *event.Feed
transactionManager *TransactionManager
pendingTxManager *transactions.PendingTxTracker
tokenManager *token.Manager
strategy HistoryFetcher
balanceCacher balance.Cacher
}
func NewReactor(db *Database, blockDAO *BlockDAO, feed *event.Feed, tm *TransactionManager,
pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager,
balanceCacher balance.Cacher) *Reactor {
return &Reactor{
db: db,
blockDAO: blockDAO,
feed: feed,
transactionManager: tm,
pendingTxManager: pendingTxManager,
tokenManager: tokenManager,
balanceCacher: balanceCacher,
}
}
// Start runs reactor loop in background.
func (r *Reactor) start(chainClients map[uint64]chain.ClientInterface, accounts []common.Address,
loadAllTransfers bool) error {
r.strategy = r.createFetchStrategy(chainClients, accounts, loadAllTransfers)
return r.strategy.start()
}
// Stop stops reactor loop and waits till it exits.
func (r *Reactor) stop() {
if r.strategy != nil {
r.strategy.stop()
}
}
func (r *Reactor) restart(chainClients map[uint64]chain.ClientInterface, accounts []common.Address,
loadAllTransfers bool) error {
r.stop()
return r.start(chainClients, accounts, loadAllTransfers)
}
func (r *Reactor) createFetchStrategy(chainClients map[uint64]chain.ClientInterface,
accounts []common.Address, loadAllTransfers bool) HistoryFetcher {
if loadAllTransfers {
return NewSequentialFetchStrategy(
r.db,
r.blockDAO,
r.feed,
r.transactionManager,
r.pendingTxManager,
r.tokenManager,
chainClients,
accounts,
r.balanceCacher,
)
}
return NewOnDemandFetchStrategy(r.db, r.blockDAO, r.feed, r.transactionManager, r.pendingTxManager, r.tokenManager, chainClients, accounts, r.balanceCacher)
}
func (r *Reactor) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
limit int64, fetchMore bool) ([]Transfer, error) {
if r.strategy != nil {
return r.strategy.getTransfersByAddress(ctx, chainID, address, toBlock, limit, fetchMore)
}
return nil, errors.New(ReactorNotStarted)
}
func getChainClientByID(clients map[uint64]chain.ClientInterface, id uint64) (chain.ClientInterface, error) {
for _, client := range clients {
if client.NetworkID() == id {
return client, nil
}
}
return nil, fmt.Errorf("chain client not found with id=%d", id)
}