feat(wallet): remove old OnDemand transfer fetch strategy as it not

functional and not maintained any more and we will work on improvement
of SequentialFetchStrategy
This commit is contained in:
Ivan Belyakov 2023-11-15 15:30:56 +01:00 committed by IvanBelyakoff
parent dd5e45d81c
commit f4fc0ea324
6 changed files with 31 additions and 709 deletions

View File

@ -532,8 +532,6 @@ type WalletConfig struct {
AlchemyAPIKeys map[uint64]string `json:"AlchemyAPIKeys"` AlchemyAPIKeys map[uint64]string `json:"AlchemyAPIKeys"`
InfuraAPIKey string `json:"InfuraAPIKey"` InfuraAPIKey string `json:"InfuraAPIKey"`
InfuraAPIKeySecret string `json:"InfuraAPIKeySecret"` InfuraAPIKeySecret string `json:"InfuraAPIKeySecret"`
// LoadAllTransfers should be false to reduce network traffic and harddrive space consumption when loading tranfers
LoadAllTransfers bool `json:"LoadAllTransfers"`
} }
// LocalNotificationsConfig extra configuration for localnotifications.Service. // LocalNotificationsConfig extra configuration for localnotifications.Service.

View File

@ -99,7 +99,7 @@ func NewService(
savedAddressesManager := &SavedAddressesManager{db: db} savedAddressesManager := &SavedAddressesManager{db: db}
transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB, pendingTxManager, feed) transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB, pendingTxManager, feed)
transferController := transfer.NewTransferController(db, accountsDB, rpcClient, accountFeed, feed, transactionManager, pendingTxManager, transferController := transfer.NewTransferController(db, accountsDB, rpcClient, accountFeed, feed, transactionManager, pendingTxManager,
tokenManager, balanceCacher, config.WalletConfig.LoadAllTransfers) tokenManager, balanceCacher)
cryptoCompare := cryptocompare.NewClient() cryptoCompare := cryptocompare.NewClient()
coingecko := coingecko.NewClient() coingecko := coingecko.NewClient()
marketManager := market.NewManager(cryptoCompare, coingecko, feed) marketManager := market.NewManager(cryptoCompare, coingecko, feed)

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"database/sql" "database/sql"
"math/big" "math/big"
"strings"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -43,7 +42,6 @@ const (
var ( var (
// This will work only for binance testnet as mainnet doesn't support // This will work only for binance testnet as mainnet doesn't support
// archival request. // archival request.
binanceChainMaxInitialRange = big.NewInt(500000)
binanceChainErc20BatchSize = big.NewInt(5000) binanceChainErc20BatchSize = big.NewInt(5000)
goerliErc20BatchSize = big.NewInt(100000) goerliErc20BatchSize = big.NewInt(100000)
goerliErc20ArbitrumBatchSize = big.NewInt(10000) goerliErc20ArbitrumBatchSize = big.NewInt(10000)
@ -176,192 +174,6 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {
return nil return nil
} }
// controlCommand implements following procedure (following parts are executed sequeantially):
// - verifies that the last header that was synced is still in the canonical chain
// - runs fast indexing for each account separately
// - starts listening to new blocks and watches for reorgs
type controlCommand struct {
accounts []common.Address
db *Database
blockDAO *BlockDAO
eth *ETHDownloader
erc20 *ERC20TransfersDownloader
chainClient chain.ClientInterface
feed *event.Feed
errorsCount int
nonArchivalRPCNode bool
transactionManager *TransactionManager
pendingTxManager *transactions.PendingTxTracker
tokenManager *token.Manager
balanceCacher balance.Cacher
}
func (c *controlCommand) LoadTransfers(ctx context.Context, limit int) error {
return loadTransfers(ctx, c.accounts, c.blockDAO, c.db, c.chainClient, limit, make(map[common.Address][]*big.Int),
c.transactionManager, c.pendingTxManager, c.tokenManager, c.feed)
}
func (c *controlCommand) Run(parent context.Context) error {
log.Debug("start control command")
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
head, err := c.chainClient.HeaderByNumber(ctx, nil)
cancel()
if err != nil {
if c.NewError(err) {
return nil
}
return err
}
if c.feed != nil {
c.feed.Send(walletevent.Event{
Type: EventFetchingRecentHistory,
Accounts: c.accounts,
})
}
log.Debug("current head is", "block number", head.Number)
// Get last known block for each account
lastKnownEthBlocks, accountsWithoutHistory, err := c.blockDAO.GetLastKnownBlockByAddresses(c.chainClient.NetworkID(), c.accounts)
if err != nil {
log.Error("failed to load last head from database", "error", err)
if c.NewError(err) {
return nil
}
return err
}
// For accounts without history, find the block where 20 < headNonce - nonce < 25 (blocks have between 20-25 transactions)
fromMap := map[common.Address]*big.Int{}
if !c.nonArchivalRPCNode {
fromMap, err = findFirstRanges(parent, accountsWithoutHistory, head.Number, c.chainClient)
if err != nil {
if c.NewError(err) {
return nil
}
return err
}
}
// Set "fromByAddress" from the information we have
target := head.Number
fromByAddress := map[common.Address]*Block{}
toByAddress := map[common.Address]*big.Int{}
for _, address := range c.accounts {
from, ok := lastKnownEthBlocks[address]
if !ok {
from = &Block{Number: fromMap[address]}
}
if c.nonArchivalRPCNode {
from = &Block{Number: big.NewInt(0).Sub(target, big.NewInt(100))}
}
fromByAddress[address] = from
toByAddress[address] = target
}
cmnd := &findAndCheckBlockRangeCommand{
accounts: c.accounts,
db: c.db,
blockDAO: c.blockDAO,
chainClient: c.chainClient,
balanceCacher: c.balanceCacher,
feed: c.feed,
fromByAddress: fromByAddress,
toByAddress: toByAddress,
}
err = cmnd.Command()(parent)
if err != nil {
if c.NewError(err) {
return nil
}
return err
}
if cmnd.error != nil {
if c.NewError(cmnd.error) {
return nil
}
return cmnd.error
}
err = c.LoadTransfers(parent, numberOfBlocksCheckedPerIteration)
if err != nil {
if c.NewError(err) {
return nil
}
return err
}
if c.feed != nil {
events := map[common.Address]walletevent.Event{}
for _, address := range c.accounts {
event := walletevent.Event{
Type: EventNewTransfers,
Accounts: []common.Address{address},
ChainID: c.chainClient.NetworkID(),
}
for _, header := range cmnd.foundHeaders[address] {
if event.BlockNumber == nil || header.Number.Cmp(event.BlockNumber) == 1 {
event.BlockNumber = header.Number
}
}
if event.BlockNumber != nil {
events[address] = event
}
}
for _, event := range events {
c.feed.Send(event)
}
c.feed.Send(walletevent.Event{
Type: EventRecentHistoryReady,
Accounts: c.accounts,
BlockNumber: target,
})
}
log.Debug("end control command")
return err
}
func nonArchivalNodeError(err error) bool {
return strings.Contains(err.Error(), "missing trie node") ||
strings.Contains(err.Error(), "project ID does not have access to archive state")
}
func (c *controlCommand) NewError(err error) bool {
c.errorsCount++
log.Error("controlCommand error", "chainID", c.chainClient.NetworkID(), "error", err, "counter", c.errorsCount)
if nonArchivalNodeError(err) {
log.Info("Non archival node detected", "chainID", c.chainClient.NetworkID())
c.nonArchivalRPCNode = true
c.feed.Send(walletevent.Event{
Type: EventNonArchivalNodeDetected,
})
}
if c.errorsCount >= 3 {
c.feed.Send(walletevent.Event{
Type: EventFetchingHistoryError,
Message: err.Error(),
})
return true
}
return false
}
func (c *controlCommand) Command() async.Command {
return async.FiniteCommand{
Interval: 5 * time.Second,
Runable: c.Run,
}.Run
}
type transfersCommand struct { type transfersCommand struct {
db *Database db *Database
blockDAO *BlockDAO blockDAO *BlockDAO
@ -678,172 +490,6 @@ func (c *loadTransfersCommand) Run(parent context.Context) (err error) {
return return
} }
type findAndCheckBlockRangeCommand struct {
accounts []common.Address
db *Database
blockDAO *BlockDAO
chainClient chain.ClientInterface
balanceCacher balance.Cacher
feed *event.Feed
fromByAddress map[common.Address]*Block
toByAddress map[common.Address]*big.Int
foundHeaders map[common.Address][]*DBHeader
noLimit bool
error error
}
func (c *findAndCheckBlockRangeCommand) Command() async.Command {
return async.FiniteCommand{
Interval: 5 * time.Second,
Runable: c.Run,
}.Run
}
func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) error {
log.Debug("start findAndCHeckBlockRangeCommand")
newFromByAddress, ethHeadersByAddress, err := c.fastIndex(parent, c.balanceCacher, c.fromByAddress, c.toByAddress)
if err != nil {
c.error = err
// return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers. But if we return error, we will get stuck in inifinite loop.
return nil
}
if c.noLimit {
newFromByAddress = map[common.Address]*big.Int{}
for _, address := range c.accounts {
newFromByAddress[address] = c.fromByAddress[address].Number
}
}
erc20HeadersByAddress, err := c.fastIndexErc20(parent, newFromByAddress, c.toByAddress)
if err != nil {
return err
}
foundHeaders := map[common.Address][]*DBHeader{}
for _, address := range c.accounts {
ethHeaders := ethHeadersByAddress[address]
erc20Headers := erc20HeadersByAddress[address]
allHeaders := append(ethHeaders, erc20Headers...)
log.Debug("allHeaders found for account", "address", address, "allHeaders.len", len(allHeaders))
// Ensure only 1 DBHeader per block hash.
uniqHeaders := []*DBHeader{}
if len(allHeaders) > 0 {
uniqHeaders = uniqueHeaderPerBlockHash(allHeaders)
}
// Ensure only 1 PreloadedTransaction per transaction hash during block discovery.
// Full list of SubTransactions will be obtained from the receipt logs
// at a later stage.
for _, header := range uniqHeaders {
header.PreloadedTransactions = uniquePreloadedTransactionPerTxHash(header.PreloadedTransactions)
}
foundHeaders[address] = uniqHeaders
lastBlockNumber := c.toByAddress[address]
log.Debug("saving headers", "len", len(uniqHeaders), "lastBlockNumber", lastBlockNumber,
"balance", c.balanceCacher.Cache().GetBalance(address, c.chainClient.NetworkID(), lastBlockNumber),
"nonce", c.balanceCacher.Cache().GetNonce(address, c.chainClient.NetworkID(), lastBlockNumber))
to := &Block{
Number: lastBlockNumber,
Balance: c.balanceCacher.Cache().GetBalance(address, c.chainClient.NetworkID(), lastBlockNumber),
Nonce: c.balanceCacher.Cache().GetNonce(address, c.chainClient.NetworkID(), lastBlockNumber),
}
log.Debug("uniqHeaders found for account", "address", address, "uniqHeaders.len", len(uniqHeaders))
err = c.db.ProcessBlocks(c.chainClient.NetworkID(), address, newFromByAddress[address], to, uniqHeaders)
if err != nil {
return err
}
}
c.foundHeaders = foundHeaders
log.Debug("end findAndCheckBlockRangeCommand")
return nil
}
// run fast indexing for every accont up to canonical chain head minus safety depth.
// every account will run it from last synced header.
func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCacher balance.Cacher,
fromByAddress map[common.Address]*Block, toByAddress map[common.Address]*big.Int) (map[common.Address]*big.Int,
map[common.Address][]*DBHeader, error) {
log.Debug("fast indexer started")
start := time.Now()
group := async.NewGroup(ctx)
commands := make([]*ethHistoricalCommand, len(c.accounts))
for i, address := range c.accounts {
eth := &ethHistoricalCommand{
chainClient: c.chainClient,
balanceCacher: bCacher,
address: address,
feed: c.feed,
from: fromByAddress[address],
to: toByAddress[address],
noLimit: c.noLimit,
threadLimit: NoThreadLimit,
}
commands[i] = eth
group.Add(eth.Command())
}
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
case <-group.WaitAsync():
resultingFromByAddress := map[common.Address]*big.Int{}
headers := map[common.Address][]*DBHeader{}
for _, command := range commands {
if command.error != nil {
return nil, nil, command.error
}
resultingFromByAddress[command.address] = command.resultingFrom
headers[command.address] = command.foundHeaders
}
log.Debug("fast indexer finished", "in", time.Since(start))
return resultingFromByAddress, headers, nil
}
}
// run fast indexing for every accont up to canonical chain head minus safety depth.
// every account will run it from last synced header.
func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, fromByAddress map[common.Address]*big.Int, toByAddress map[common.Address]*big.Int) (map[common.Address][]*DBHeader, error) {
log.Debug("fast indexer Erc20 started")
start := time.Now()
group := async.NewGroup(ctx)
commands := make([]*erc20HistoricalCommand, len(c.accounts))
for i, address := range c.accounts {
erc20 := &erc20HistoricalCommand{
erc20: NewERC20TransfersDownloader(c.chainClient, []common.Address{address}, types.LatestSignerForChainID(c.chainClient.ToBigInt()), false),
chainClient: c.chainClient,
feed: c.feed,
address: address,
from: fromByAddress[address],
to: toByAddress[address],
foundHeaders: []*DBHeader{},
}
commands[i] = erc20
group.Add(erc20.Command())
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-group.WaitAsync():
headers := map[common.Address][]*DBHeader{}
for _, command := range commands {
headers[command.address] = command.foundHeaders
}
log.Debug("fast indexer Erc20 finished", "in", time.Since(start))
return headers, nil
}
}
func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *BlockDAO, db *Database, func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *BlockDAO, db *Database,
chainClient chain.ClientInterface, blocksLimitPerAccount int, blocksByAddress map[common.Address][]*big.Int, chainClient chain.ClientInterface, blocksLimitPerAccount int, blocksByAddress map[common.Address][]*big.Int,
transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker,
@ -888,90 +534,6 @@ func isBinanceChain(chainID uint64) bool {
return chainID == binancChainID || chainID == binanceTestChainID return chainID == binancChainID || chainID == binanceTestChainID
} }
func getLowestFrom(chainID uint64, to *big.Int) *big.Int {
from := big.NewInt(0)
if isBinanceChain(chainID) && big.NewInt(0).Sub(to, from).Cmp(binanceChainMaxInitialRange) == 1 {
from = big.NewInt(0).Sub(to, binanceChainMaxInitialRange)
}
return from
}
// Finds the latest range up to initialTo where the number of transactions is between 20 and 25
func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client chain.ClientInterface) (*big.Int, error) {
log.Debug("findFirstRange", "account", account, "initialTo", initialTo, "client", client)
from := getLowestFrom(client.NetworkID(), initialTo)
to := initialTo
goal := uint64(20)
if from.Cmp(to) == 0 {
return to, nil
}
firstNonce, err := client.NonceAt(c, account, to) // this is the latest nonce actually
log.Debug("find range with 20 <= len(tx) <= 25", "account", account, "firstNonce", firstNonce, "from", from, "to", to)
if err != nil {
return nil, err
}
if firstNonce <= goal {
return from, nil
}
nonceDiff := firstNonce
iterations := 0
for iterations < 50 {
iterations = iterations + 1
if nonceDiff > goal {
// from = (from + to) / 2
from = from.Add(from, to)
from = from.Div(from, big.NewInt(2))
} else {
// from = from - (to - from) / 2
// to = from
diff := big.NewInt(0).Sub(to, from)
diff.Div(diff, big.NewInt(2))
to = big.NewInt(from.Int64())
from.Sub(from, diff)
}
fromNonce, err := client.NonceAt(c, account, from)
if err != nil {
return nil, err
}
nonceDiff = firstNonce - fromNonce
log.Debug("next nonce", "from", from, "n", fromNonce, "diff", firstNonce-fromNonce)
if goal <= nonceDiff && nonceDiff <= (goal+5) {
log.Debug("range found", "account", account, "from", from, "to", to)
return from, nil
}
}
log.Debug("range found", "account", account, "from", from, "to", to)
return from, nil
}
// Finds the latest ranges up to initialTo where the number of transactions is between 20 and 25
func findFirstRanges(c context.Context, accounts []common.Address, initialTo *big.Int, client chain.ClientInterface) (map[common.Address]*big.Int, error) {
res := map[common.Address]*big.Int{}
for _, address := range accounts {
from, err := findFirstRange(c, address, initialTo, client)
if err != nil {
return nil, err
}
res[address] = from
}
return res, nil
}
// Ensure 1 DBHeader per Block Hash // Ensure 1 DBHeader per Block Hash
func uniqueHeaderPerBlockHash(allHeaders []*DBHeader) []*DBHeader { func uniqueHeaderPerBlockHash(allHeaders []*DBHeader) []*DBHeader {
uniqHeadersByHash := map[common.Hash]*DBHeader{} uniqHeadersByHash := map[common.Hash]*DBHeader{}
@ -995,20 +557,6 @@ func uniqueHeaderPerBlockHash(allHeaders []*DBHeader) []*DBHeader {
return uniqHeaders return uniqHeaders
} }
// Ensure 1 PreloadedTransaction per Transaction Hash
func uniquePreloadedTransactionPerTxHash(allTransactions []*PreloadedTransaction) []*PreloadedTransaction {
uniqTransactionsByTransactionHash := map[common.Hash]*PreloadedTransaction{}
for _, transaction := range allTransactions {
uniqTransactionsByTransactionHash[transaction.Log.TxHash] = transaction
}
uniqTransactions := []*PreloadedTransaction{}
for _, transaction := range uniqTransactionsByTransactionHash {
uniqTransactions = append(uniqTransactions, transaction)
}
return uniqTransactions
}
// Organize subTransactions by Transaction Hash // Organize subTransactions by Transaction Hash
func subTransactionListToTransactionsByTxHash(subTransactions []Transfer) map[common.Hash]Transaction { func subTransactionListToTransactionsByTxHash(subTransactions []Transfer) map[common.Hash]Transaction {
rst := map[common.Hash]Transaction{} rst := map[common.Hash]Transaction{}

View File

@ -34,12 +34,11 @@ type Controller struct {
pendingTxManager *transactions.PendingTxTracker pendingTxManager *transactions.PendingTxTracker
tokenManager *token.Manager tokenManager *token.Manager
balanceCacher balance.Cacher balanceCacher balance.Cacher
loadAllTransfers bool
} }
func NewTransferController(db *sql.DB, accountsDB *statusaccounts.Database, rpcClient *rpc.Client, accountFeed *event.Feed, transferFeed *event.Feed, func NewTransferController(db *sql.DB, accountsDB *statusaccounts.Database, rpcClient *rpc.Client, accountFeed *event.Feed, transferFeed *event.Feed,
transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager,
balanceCacher balance.Cacher, loadAllTransfers bool) *Controller { balanceCacher balance.Cacher) *Controller {
blockDAO := &BlockDAO{db} blockDAO := &BlockDAO{db}
return &Controller{ return &Controller{
@ -53,7 +52,6 @@ func NewTransferController(db *sql.DB, accountsDB *statusaccounts.Database, rpcC
pendingTxManager: pendingTxManager, pendingTxManager: pendingTxManager,
tokenManager: tokenManager, tokenManager: tokenManager,
balanceCacher: balanceCacher, balanceCacher: balanceCacher,
loadAllTransfers: loadAllTransfers,
} }
} }
@ -93,7 +91,7 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add
} }
if c.reactor != nil { if c.reactor != nil {
err := c.reactor.restart(chainClients, accounts, c.loadAllTransfers) err := c.reactor.restart(chainClients, accounts)
if err != nil { if err != nil {
return err return err
} }
@ -114,13 +112,13 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add
c.reactor = NewReactor(c.db, c.blockDAO, c.TransferFeed, c.transactionManager, c.reactor = NewReactor(c.db, c.blockDAO, c.TransferFeed, c.transactionManager,
c.pendingTxManager, c.tokenManager, c.balanceCacher, omitHistory) c.pendingTxManager, c.tokenManager, c.balanceCacher, omitHistory)
err = c.reactor.start(chainClients, accounts, c.loadAllTransfers) err = c.reactor.start(chainClients, accounts)
if err != nil { if err != nil {
return err return err
} }
c.group.Add(func(ctx context.Context) error { c.group.Add(func(ctx context.Context) error {
return watchAccountsChanges(ctx, c.accountFeed, c.reactor, chainClients, accounts, c.loadAllTransfers) return watchAccountsChanges(ctx, c.accountFeed, c.reactor, chainClients, accounts)
}) })
} }
return nil return nil
@ -129,7 +127,7 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add
// watchAccountsChanges subscribes to a feed and watches for changes in accounts list. If there are new or removed accounts // watchAccountsChanges subscribes to a feed and watches for changes in accounts list. If there are new or removed accounts
// reactor will be restarted. // reactor will be restarted.
func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor *Reactor, func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor *Reactor,
chainClients map[uint64]chain.ClientInterface, initial []common.Address, loadAllTransfers bool) error { chainClients map[uint64]chain.ClientInterface, initial []common.Address) error {
ch := make(chan accountsevent.Event, 1) // it may block if the rate of updates will be significantly higher ch := make(chan accountsevent.Event, 1) // it may block if the rate of updates will be significantly higher
sub := accountFeed.Subscribe(ch) sub := accountFeed.Subscribe(ch)
@ -167,7 +165,7 @@ func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor
listenList := mapToList(listen) listenList := mapToList(listen)
log.Debug("list of accounts was changed from a previous version. reactor will be restarted", "new", listenList) log.Debug("list of accounts was changed from a previous version. reactor will be restarted", "new", listenList)
err := reactor.restart(chainClients, listenList, loadAllTransfers) err := reactor.restart(chainClients, listenList)
if err != nil { if err != nil {
log.Error("failed to restart reactor with new accounts", "error", err) log.Error("failed to restart reactor with new accounts", "error", err)
} }
@ -225,7 +223,7 @@ func (c *Controller) LoadTransferByHash(ctx context.Context, rpcClient *rpc.Clie
func (c *Controller) GetTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int, func (c *Controller) GetTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
limit int64, fetchMore bool) ([]View, error) { limit int64, fetchMore bool) ([]View, error) {
rst, err := c.reactor.getTransfersByAddress(ctx, chainID, address, toBlock, limit, fetchMore) rst, err := c.reactor.getTransfersByAddress(ctx, chainID, address, toBlock, limit)
if err != nil { if err != nil {
log.Error("[WalletAPI:: GetTransfersByAddress] can't fetch transfers", "err", err) log.Error("[WalletAPI:: GetTransfersByAddress] can't fetch transfers", "err", err)
return nil, err return nil, err

View File

@ -3,19 +3,14 @@ package transfer
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"math/big" "math/big"
"sync"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance" "github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/transactions" "github.com/status-im/status-go/transactions"
) )
@ -31,8 +26,7 @@ var errAlreadyRunning = errors.New("already running")
type FetchStrategyType int32 type FetchStrategyType int32
const ( const (
OnDemandFetchStrategyType FetchStrategyType = iota SequentialFetchStrategyType FetchStrategyType = iota
SequentialFetchStrategyType
) )
// HeaderReader interface for reading headers using block number or hash. // HeaderReader interface for reading headers using block number or hash.
@ -47,206 +41,7 @@ type HistoryFetcher interface {
kind() FetchStrategyType kind() FetchStrategyType
getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int, getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
limit int64, fetchMore bool) ([]Transfer, error) limit int64) ([]Transfer, error)
}
func NewOnDemandFetchStrategy(
db *Database,
blockDAO *BlockDAO,
feed *event.Feed,
transactionManager *TransactionManager,
pendingTxManager *transactions.PendingTxTracker,
tokenManager *token.Manager,
chainClients map[uint64]chain.ClientInterface,
accounts []common.Address,
balanceCacher balance.Cacher,
) *OnDemandFetchStrategy {
strategy := &OnDemandFetchStrategy{
db: db,
blockDAO: blockDAO,
feed: feed,
balanceCacher: balanceCacher,
transactionManager: transactionManager,
pendingTxManager: pendingTxManager,
tokenManager: tokenManager,
chainClients: chainClients,
accounts: accounts,
}
return strategy
}
type OnDemandFetchStrategy struct {
db *Database
blockDAO *BlockDAO
feed *event.Feed
mu sync.Mutex
group *async.Group
balanceCacher balance.Cacher
transactionManager *TransactionManager
pendingTxManager *transactions.PendingTxTracker
tokenManager *token.Manager
chainClients map[uint64]chain.ClientInterface
accounts []common.Address
}
func (s *OnDemandFetchStrategy) newControlCommand(chainClient chain.ClientInterface, accounts []common.Address) *controlCommand {
signer := types.LatestSignerForChainID(chainClient.ToBigInt())
ctl := &controlCommand{
db: s.db,
chainClient: chainClient,
accounts: accounts,
blockDAO: s.blockDAO,
eth: &ETHDownloader{
chainClient: chainClient,
accounts: accounts,
signer: signer,
db: s.db,
},
erc20: NewERC20TransfersDownloader(chainClient, accounts, signer, false),
feed: s.feed,
errorsCount: 0,
transactionManager: s.transactionManager,
pendingTxManager: s.pendingTxManager,
tokenManager: s.tokenManager,
balanceCacher: s.balanceCacher,
}
return ctl
}
func (s *OnDemandFetchStrategy) start() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.group != nil {
return errAlreadyRunning
}
s.group = async.NewGroup(context.Background())
for _, chainClient := range s.chainClients {
ctl := s.newControlCommand(chainClient, s.accounts)
s.group.Add(ctl.Command())
}
return nil
}
// Stop stops reactor loop and waits till it exits.
func (s *OnDemandFetchStrategy) stop() {
s.mu.Lock()
defer s.mu.Unlock()
if s.group == nil {
return
}
s.group.Stop()
s.group.Wait()
s.group = nil
}
func (s *OnDemandFetchStrategy) kind() FetchStrategyType {
return OnDemandFetchStrategyType
}
func (s *OnDemandFetchStrategy) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
limit int64, fetchMore bool) ([]Transfer, error) {
log.Info("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address, "fetchMore", fetchMore,
"chainID", chainID, "toBlock", toBlock, "limit", limit)
rst, err := s.db.GetTransfersByAddress(chainID, address, toBlock, limit)
if err != nil {
log.Error("[WalletAPI:: GetTransfersByAddress] can't fetch transfers", "err", err)
return nil, err
}
transfersCount := int64(len(rst))
if fetchMore && limit > transfersCount {
block, err := s.blockDAO.GetFirstKnownBlock(chainID, address)
if err != nil {
return nil, err
}
// if zero block was already checked there is nothing to find more
if block == nil || big.NewInt(0).Cmp(block) == 0 {
log.Info("[WalletAPI:: GetTransfersByAddress] ZERO block is found for", "address", address, "chaindID", chainID)
return rst, nil
}
chainClient, err := getChainClientByID(s.chainClients, chainID)
if err != nil {
return nil, err
}
from, err := findFirstRange(ctx, address, block, chainClient)
if err != nil {
if nonArchivalNodeError(err) {
if s.feed != nil {
s.feed.Send(walletevent.Event{
Type: EventNonArchivalNodeDetected,
})
}
if block.Cmp(big.NewInt(NonArchivalNodeBlockChunkSize)) >= 0 {
from = big.NewInt(0).Sub(block, big.NewInt(NonArchivalNodeBlockChunkSize))
} else {
from = big.NewInt(0)
}
} else {
log.Error("first range error", "error", err)
return nil, err
}
}
fromByAddress := map[common.Address]*Block{address: {
Number: from,
}}
toByAddress := map[common.Address]*big.Int{address: block}
blocksCommand := &findAndCheckBlockRangeCommand{
accounts: []common.Address{address},
db: s.db,
chainClient: chainClient,
balanceCacher: s.balanceCacher,
feed: s.feed,
fromByAddress: fromByAddress,
toByAddress: toByAddress,
}
if err = blocksCommand.Command()(ctx); err != nil {
return nil, err
}
blocks, err := s.blockDAO.GetBlocksToLoadByAddress(chainID, address, numberOfBlocksCheckedPerIteration)
if err != nil {
return nil, err
}
log.Info("checking blocks again", "blocks", len(blocks))
if len(blocks) > 0 {
txCommand := &loadTransfersCommand{
accounts: []common.Address{address},
db: s.db,
blockDAO: s.blockDAO,
chainClient: chainClient,
transactionManager: s.transactionManager,
blocksLimit: numberOfBlocksCheckedPerIteration,
tokenManager: s.tokenManager,
}
err = txCommand.Command()(ctx)
if err != nil {
return nil, err
}
rst, err = s.db.GetTransfersByAddress(chainID, address, toBlock, limit)
if err != nil {
return nil, err
}
}
}
return rst, nil
} }
// Reactor listens to new blocks and stores transfers into the database. // Reactor listens to new blocks and stores transfers into the database.
@ -278,10 +73,9 @@ func NewReactor(db *Database, blockDAO *BlockDAO, feed *event.Feed, tm *Transact
} }
// Start runs reactor loop in background. // Start runs reactor loop in background.
func (r *Reactor) start(chainClients map[uint64]chain.ClientInterface, accounts []common.Address, func (r *Reactor) start(chainClients map[uint64]chain.ClientInterface, accounts []common.Address) error {
loadAllTransfers bool) error {
r.strategy = r.createFetchStrategy(chainClients, accounts, loadAllTransfers) r.strategy = r.createFetchStrategy(chainClients, accounts)
return r.strategy.start() return r.strategy.start()
} }
@ -292,50 +86,35 @@ func (r *Reactor) stop() {
} }
} }
func (r *Reactor) restart(chainClients map[uint64]chain.ClientInterface, accounts []common.Address, func (r *Reactor) restart(chainClients map[uint64]chain.ClientInterface, accounts []common.Address) error {
loadAllTransfers bool) error {
r.stop() r.stop()
return r.start(chainClients, accounts, loadAllTransfers) return r.start(chainClients, accounts)
} }
func (r *Reactor) createFetchStrategy(chainClients map[uint64]chain.ClientInterface, func (r *Reactor) createFetchStrategy(chainClients map[uint64]chain.ClientInterface,
accounts []common.Address, loadAllTransfers bool) HistoryFetcher { accounts []common.Address) HistoryFetcher {
if loadAllTransfers { return NewSequentialFetchStrategy(
return NewSequentialFetchStrategy( r.db,
r.db, r.blockDAO,
r.blockDAO, r.feed,
r.feed, r.transactionManager,
r.transactionManager, r.pendingTxManager,
r.pendingTxManager, r.tokenManager,
r.tokenManager, chainClients,
chainClients, accounts,
accounts, r.balanceCacher,
r.balanceCacher, r.omitHistory,
r.omitHistory, )
)
}
return NewOnDemandFetchStrategy(r.db, r.blockDAO, r.feed, r.transactionManager, r.pendingTxManager, r.tokenManager, chainClients, accounts, r.balanceCacher)
} }
func (r *Reactor) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int, func (r *Reactor) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
limit int64, fetchMore bool) ([]Transfer, error) { limit int64) ([]Transfer, error) {
if r.strategy != nil { if r.strategy != nil {
return r.strategy.getTransfersByAddress(ctx, chainID, address, toBlock, limit, fetchMore) return r.strategy.getTransfersByAddress(ctx, chainID, address, toBlock, limit)
} }
return nil, errors.New(ReactorNotStarted) return nil, errors.New(ReactorNotStarted)
} }
func getChainClientByID(clients map[uint64]chain.ClientInterface, id uint64) (chain.ClientInterface, error) {
for _, client := range clients {
if client.NetworkID() == id {
return client, nil
}
}
return nil, fmt.Errorf("chain client not found with id=%d", id)
}

View File

@ -103,11 +103,10 @@ func (s *SequentialFetchStrategy) kind() FetchStrategyType {
return SequentialFetchStrategyType return SequentialFetchStrategyType
} }
// TODO: remove fetchMore parameter from here and interface, it is used by OnDemandFetchStrategy only
func (s *SequentialFetchStrategy) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int, func (s *SequentialFetchStrategy) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
limit int64, fetchMore bool) ([]Transfer, error) { limit int64) ([]Transfer, error) {
log.Info("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address, "fetchMore", fetchMore, log.Debug("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address,
"chainID", chainID, "toBlock", toBlock, "limit", limit) "chainID", chainID, "toBlock", toBlock, "limit", limit)
rst, err := s.db.GetTransfersByAddress(chainID, address, toBlock, limit) rst, err := s.db.GetTransfersByAddress(chainID, address, toBlock, limit)