340 lines
9.3 KiB
Go
340 lines
9.3 KiB
Go
package transfer
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math/big"
|
|
"sync"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/status-im/status-go/rpc/chain"
|
|
"github.com/status-im/status-go/services/wallet/async"
|
|
"github.com/status-im/status-go/services/wallet/balance"
|
|
"github.com/status-im/status-go/services/wallet/token"
|
|
"github.com/status-im/status-go/services/wallet/walletevent"
|
|
"github.com/status-im/status-go/transactions"
|
|
)
|
|
|
|
const (
	// ReactorNotStarted is the message of the error returned by
	// Reactor.getTransfersByAddress when no fetch strategy has been set yet.
	ReactorNotStarted string = "reactor not started"

	// NonArchivalNodeBlockChunkSize is how many blocks below the first known
	// block the on-demand strategy starts scanning from after the node
	// reported a non-archival error.
	NonArchivalNodeBlockChunkSize = 100
	// DefaultNodeBlockChunkSize is not referenced in this part of the file;
	// presumably the chunk size used against archival nodes — TODO confirm.
	DefaultNodeBlockChunkSize = 100000
)
|
|
|
|
// errAlreadyRunning is returned by OnDemandFetchStrategy.start when the
// strategy already has an active command group.
var errAlreadyRunning = errors.New("already running")
|
|
|
|
// FetchStrategyType identifies which HistoryFetcher implementation is in use.
type FetchStrategyType int32

const (
	// OnDemandFetchStrategyType is the kind reported by OnDemandFetchStrategy.
	OnDemandFetchStrategyType FetchStrategyType = iota
	// SequentialFetchStrategyType is presumably the kind reported by the
	// sequential strategy, which is defined outside this part of the file.
	SequentialFetchStrategyType
)
|
|
|
|
// HeaderReader interface for reading headers using block number or hash.
|
|
type HeaderReader interface {
|
|
HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
|
|
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
|
|
}
|
|
|
|
type HistoryFetcher interface {
|
|
start() error
|
|
stop()
|
|
kind() FetchStrategyType
|
|
|
|
getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
|
|
limit int64, fetchMore bool) ([]Transfer, error)
|
|
}
|
|
|
|
func NewOnDemandFetchStrategy(
|
|
db *Database,
|
|
blockDAO *BlockDAO,
|
|
feed *event.Feed,
|
|
transactionManager *TransactionManager,
|
|
pendingTxManager *transactions.PendingTxTracker,
|
|
tokenManager *token.Manager,
|
|
chainClients map[uint64]chain.ClientInterface,
|
|
accounts []common.Address,
|
|
balanceCacher balance.Cacher,
|
|
) *OnDemandFetchStrategy {
|
|
strategy := &OnDemandFetchStrategy{
|
|
db: db,
|
|
blockDAO: blockDAO,
|
|
feed: feed,
|
|
balanceCacher: balanceCacher,
|
|
transactionManager: transactionManager,
|
|
pendingTxManager: pendingTxManager,
|
|
tokenManager: tokenManager,
|
|
chainClients: chainClients,
|
|
accounts: accounts,
|
|
}
|
|
|
|
return strategy
|
|
}
|
|
|
|
type OnDemandFetchStrategy struct {
|
|
db *Database
|
|
blockDAO *BlockDAO
|
|
feed *event.Feed
|
|
mu sync.Mutex
|
|
group *async.Group
|
|
balanceCacher balance.Cacher
|
|
transactionManager *TransactionManager
|
|
pendingTxManager *transactions.PendingTxTracker
|
|
tokenManager *token.Manager
|
|
chainClients map[uint64]chain.ClientInterface
|
|
accounts []common.Address
|
|
}
|
|
|
|
func (s *OnDemandFetchStrategy) newControlCommand(chainClient chain.ClientInterface, accounts []common.Address) *controlCommand {
|
|
signer := types.LatestSignerForChainID(chainClient.ToBigInt())
|
|
ctl := &controlCommand{
|
|
db: s.db,
|
|
chainClient: chainClient,
|
|
accounts: accounts,
|
|
blockDAO: s.blockDAO,
|
|
eth: ÐDownloader{
|
|
chainClient: chainClient,
|
|
accounts: accounts,
|
|
signer: signer,
|
|
db: s.db,
|
|
},
|
|
erc20: NewERC20TransfersDownloader(chainClient, accounts, signer),
|
|
feed: s.feed,
|
|
errorsCount: 0,
|
|
transactionManager: s.transactionManager,
|
|
pendingTxManager: s.pendingTxManager,
|
|
tokenManager: s.tokenManager,
|
|
balanceCacher: s.balanceCacher,
|
|
}
|
|
|
|
return ctl
|
|
}
|
|
|
|
func (s *OnDemandFetchStrategy) start() error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if s.group != nil {
|
|
return errAlreadyRunning
|
|
}
|
|
s.group = async.NewGroup(context.Background())
|
|
|
|
for _, chainClient := range s.chainClients {
|
|
ctl := s.newControlCommand(chainClient, s.accounts)
|
|
s.group.Add(ctl.Command())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop stops reactor loop and waits till it exits.
|
|
func (s *OnDemandFetchStrategy) stop() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.group == nil {
|
|
return
|
|
}
|
|
s.group.Stop()
|
|
s.group.Wait()
|
|
s.group = nil
|
|
}
|
|
|
|
func (s *OnDemandFetchStrategy) kind() FetchStrategyType {
|
|
return OnDemandFetchStrategyType
|
|
}
|
|
|
|
func (s *OnDemandFetchStrategy) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
|
|
limit int64, fetchMore bool) ([]Transfer, error) {
|
|
|
|
log.Info("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address, "fetchMore", fetchMore,
|
|
"chainID", chainID, "toBlock", toBlock, "limit", limit)
|
|
|
|
rst, err := s.db.GetTransfersByAddress(chainID, address, toBlock, limit)
|
|
if err != nil {
|
|
log.Error("[WalletAPI:: GetTransfersByAddress] can't fetch transfers", "err", err)
|
|
return nil, err
|
|
}
|
|
|
|
transfersCount := int64(len(rst))
|
|
|
|
if fetchMore && limit > transfersCount {
|
|
|
|
block, err := s.blockDAO.GetFirstKnownBlock(chainID, address)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// if zero block was already checked there is nothing to find more
|
|
if block == nil || big.NewInt(0).Cmp(block) == 0 {
|
|
log.Info("[WalletAPI:: GetTransfersByAddress] ZERO block is found for", "address", address, "chaindID", chainID)
|
|
return rst, nil
|
|
}
|
|
|
|
chainClient, err := getChainClientByID(s.chainClients, chainID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
from, err := findFirstRange(ctx, address, block, chainClient)
|
|
if err != nil {
|
|
if nonArchivalNodeError(err) {
|
|
if s.feed != nil {
|
|
s.feed.Send(walletevent.Event{
|
|
Type: EventNonArchivalNodeDetected,
|
|
})
|
|
}
|
|
if block.Cmp(big.NewInt(NonArchivalNodeBlockChunkSize)) >= 0 {
|
|
from = big.NewInt(0).Sub(block, big.NewInt(NonArchivalNodeBlockChunkSize))
|
|
} else {
|
|
from = big.NewInt(0)
|
|
}
|
|
} else {
|
|
log.Error("first range error", "error", err)
|
|
return nil, err
|
|
}
|
|
}
|
|
fromByAddress := map[common.Address]*Block{address: {
|
|
Number: from,
|
|
}}
|
|
toByAddress := map[common.Address]*big.Int{address: block}
|
|
|
|
blocksCommand := &findAndCheckBlockRangeCommand{
|
|
accounts: []common.Address{address},
|
|
db: s.db,
|
|
chainClient: chainClient,
|
|
balanceCacher: s.balanceCacher,
|
|
feed: s.feed,
|
|
fromByAddress: fromByAddress,
|
|
toByAddress: toByAddress,
|
|
}
|
|
|
|
if err = blocksCommand.Command()(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
s.balanceCacher.Clear()
|
|
|
|
blocks, err := s.blockDAO.GetBlocksToLoadByAddress(chainID, address, numberOfBlocksCheckedPerIteration)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
log.Info("checking blocks again", "blocks", len(blocks))
|
|
if len(blocks) > 0 {
|
|
txCommand := &loadTransfersCommand{
|
|
accounts: []common.Address{address},
|
|
db: s.db,
|
|
blockDAO: s.blockDAO,
|
|
chainClient: chainClient,
|
|
transactionManager: s.transactionManager,
|
|
blocksLimit: numberOfBlocksCheckedPerIteration,
|
|
tokenManager: s.tokenManager,
|
|
}
|
|
|
|
err = txCommand.Command()(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rst, err = s.db.GetTransfersByAddress(chainID, address, toBlock, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
return rst, nil
|
|
}
|
|
|
|
// Reactor listens to new blocks and stores transfers into the database.
|
|
type Reactor struct {
|
|
db *Database
|
|
blockDAO *BlockDAO
|
|
feed *event.Feed
|
|
transactionManager *TransactionManager
|
|
pendingTxManager *transactions.PendingTxTracker
|
|
tokenManager *token.Manager
|
|
strategy HistoryFetcher
|
|
balanceCacher balance.Cacher
|
|
}
|
|
|
|
func NewReactor(db *Database, blockDAO *BlockDAO, feed *event.Feed, tm *TransactionManager,
|
|
pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager,
|
|
balanceCacher balance.Cacher) *Reactor {
|
|
return &Reactor{
|
|
db: db,
|
|
blockDAO: blockDAO,
|
|
feed: feed,
|
|
transactionManager: tm,
|
|
pendingTxManager: pendingTxManager,
|
|
tokenManager: tokenManager,
|
|
balanceCacher: balanceCacher,
|
|
}
|
|
}
|
|
|
|
// Start runs reactor loop in background.
|
|
func (r *Reactor) start(chainClients map[uint64]chain.ClientInterface, accounts []common.Address,
|
|
loadAllTransfers bool) error {
|
|
|
|
r.strategy = r.createFetchStrategy(chainClients, accounts, loadAllTransfers)
|
|
return r.strategy.start()
|
|
}
|
|
|
|
// Stop stops reactor loop and waits till it exits.
|
|
func (r *Reactor) stop() {
|
|
if r.strategy != nil {
|
|
r.strategy.stop()
|
|
}
|
|
}
|
|
|
|
func (r *Reactor) restart(chainClients map[uint64]chain.ClientInterface, accounts []common.Address,
|
|
loadAllTransfers bool) error {
|
|
|
|
r.stop()
|
|
return r.start(chainClients, accounts, loadAllTransfers)
|
|
}
|
|
|
|
func (r *Reactor) createFetchStrategy(chainClients map[uint64]chain.ClientInterface,
|
|
accounts []common.Address, loadAllTransfers bool) HistoryFetcher {
|
|
|
|
if loadAllTransfers {
|
|
return NewSequentialFetchStrategy(
|
|
r.db,
|
|
r.blockDAO,
|
|
r.feed,
|
|
r.transactionManager,
|
|
r.pendingTxManager,
|
|
r.tokenManager,
|
|
chainClients,
|
|
accounts,
|
|
r.balanceCacher,
|
|
)
|
|
}
|
|
|
|
return NewOnDemandFetchStrategy(r.db, r.blockDAO, r.feed, r.transactionManager, r.pendingTxManager, r.tokenManager, chainClients, accounts, r.balanceCacher)
|
|
}
|
|
|
|
func (r *Reactor) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
|
|
limit int64, fetchMore bool) ([]Transfer, error) {
|
|
|
|
if r.strategy != nil {
|
|
return r.strategy.getTransfersByAddress(ctx, chainID, address, toBlock, limit, fetchMore)
|
|
}
|
|
|
|
return nil, errors.New(ReactorNotStarted)
|
|
}
|
|
|
|
func getChainClientByID(clients map[uint64]chain.ClientInterface, id uint64) (chain.ClientInterface, error) {
|
|
for _, client := range clients {
|
|
if client.NetworkID() == id {
|
|
return client, nil
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("chain client not found with id=%d", id)
|
|
}
|