diff --git a/params/config.go b/params/config.go index f696d82d1..d1b0c7eed 100644 --- a/params/config.go +++ b/params/config.go @@ -532,8 +532,6 @@ type WalletConfig struct { AlchemyAPIKeys map[uint64]string `json:"AlchemyAPIKeys"` InfuraAPIKey string `json:"InfuraAPIKey"` InfuraAPIKeySecret string `json:"InfuraAPIKeySecret"` - // LoadAllTransfers should be false to reduce network traffic and harddrive space consumption when loading tranfers - LoadAllTransfers bool `json:"LoadAllTransfers"` } // LocalNotificationsConfig extra configuration for localnotifications.Service. diff --git a/services/wallet/service.go b/services/wallet/service.go index af98c17e2..8960ceac1 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -99,7 +99,7 @@ func NewService( savedAddressesManager := &SavedAddressesManager{db: db} transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB, pendingTxManager, feed) transferController := transfer.NewTransferController(db, accountsDB, rpcClient, accountFeed, feed, transactionManager, pendingTxManager, - tokenManager, balanceCacher, config.WalletConfig.LoadAllTransfers) + tokenManager, balanceCacher) cryptoCompare := cryptocompare.NewClient() coingecko := coingecko.NewClient() marketManager := market.NewManager(cryptoCompare, coingecko, feed) diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index c29b31366..cbf5951e6 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "math/big" - "strings" "time" "github.com/ethereum/go-ethereum/common" @@ -43,7 +42,6 @@ const ( var ( // This will work only for binance testnet as mainnet doesn't support // archival request. - binanceChainMaxInitialRange = big.NewInt(500000) binanceChainErc20BatchSize = big.NewInt(5000) goerliErc20BatchSize = big.NewInt(100000) goerliErc20ArbitrumBatchSize = big.NewInt(10000) @@ -176,192 +174,6 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { return nil } -// controlCommand implements following procedure (following parts are executed sequeantially): -// - verifies that the last header that was synced is still in the canonical chain -// - runs fast indexing for each account separately -// - starts listening to new blocks and watches for reorgs -type controlCommand struct { - accounts []common.Address - db *Database - blockDAO *BlockDAO - eth *ETHDownloader - erc20 *ERC20TransfersDownloader - chainClient chain.ClientInterface - feed *event.Feed - errorsCount int - nonArchivalRPCNode bool - transactionManager *TransactionManager - pendingTxManager *transactions.PendingTxTracker - tokenManager *token.Manager - balanceCacher balance.Cacher -} - -func (c *controlCommand) LoadTransfers(ctx context.Context, limit int) error { - return loadTransfers(ctx, c.accounts, c.blockDAO, c.db, c.chainClient, limit, make(map[common.Address][]*big.Int), - c.transactionManager, c.pendingTxManager, c.tokenManager, c.feed) -} - -func (c *controlCommand) Run(parent context.Context) error { - log.Debug("start control command") - - ctx, cancel := context.WithTimeout(parent, 3*time.Second) - head, err := c.chainClient.HeaderByNumber(ctx, nil) - cancel() - if err != nil { - if c.NewError(err) { - return nil - } - return err - } - - if c.feed != nil { - c.feed.Send(walletevent.Event{ - Type: EventFetchingRecentHistory, - Accounts: c.accounts, - }) - } - - log.Debug("current head is", "block number", head.Number) - - // Get last known block for each account - lastKnownEthBlocks, accountsWithoutHistory, err := c.blockDAO.GetLastKnownBlockByAddresses(c.chainClient.NetworkID(), c.accounts) - if err != nil { - log.Error("failed to load last head from database", "error", err) - if c.NewError(err) { - return nil - } - return err - } - - // For accounts without history, find the block where 20 < headNonce - nonce < 25 (blocks have between 20-25 transactions) - fromMap := map[common.Address]*big.Int{} - - if !c.nonArchivalRPCNode { - fromMap, err = findFirstRanges(parent, accountsWithoutHistory, head.Number, c.chainClient) - if err != nil { - if c.NewError(err) { - return nil - } - return err - } - } - - // Set "fromByAddress" from the information we have - target := head.Number - fromByAddress := map[common.Address]*Block{} - toByAddress := map[common.Address]*big.Int{} - - for _, address := range c.accounts { - from, ok := lastKnownEthBlocks[address] - if !ok { - from = &Block{Number: fromMap[address]} - } - if c.nonArchivalRPCNode { - from = &Block{Number: big.NewInt(0).Sub(target, big.NewInt(100))} - } - - fromByAddress[address] = from - toByAddress[address] = target - } - - cmnd := &findAndCheckBlockRangeCommand{ - accounts: c.accounts, - db: c.db, - blockDAO: c.blockDAO, - chainClient: c.chainClient, - balanceCacher: c.balanceCacher, - feed: c.feed, - fromByAddress: fromByAddress, - toByAddress: toByAddress, - } - - err = cmnd.Command()(parent) - if err != nil { - if c.NewError(err) { - return nil - } - return err - } - - if cmnd.error != nil { - if c.NewError(cmnd.error) { - return nil - } - return cmnd.error - } - - err = c.LoadTransfers(parent, numberOfBlocksCheckedPerIteration) - if err != nil { - if c.NewError(err) { - return nil - } - return err - } - - if c.feed != nil { - events := map[common.Address]walletevent.Event{} - for _, address := range c.accounts { - event := walletevent.Event{ - Type: EventNewTransfers, - Accounts: []common.Address{address}, - ChainID: c.chainClient.NetworkID(), - } - for _, header := range cmnd.foundHeaders[address] { - if event.BlockNumber == nil || header.Number.Cmp(event.BlockNumber) == 1 { - event.BlockNumber = header.Number - } - } - if event.BlockNumber != nil { - events[address] = event - } - } - - for _, event := range events { - c.feed.Send(event) - } - - c.feed.Send(walletevent.Event{ - Type: EventRecentHistoryReady, - Accounts: c.accounts, - BlockNumber: target, - }) - } - log.Debug("end control command") - return err -} - -func nonArchivalNodeError(err error) bool { - return strings.Contains(err.Error(), "missing trie node") || - strings.Contains(err.Error(), "project ID does not have access to archive state") -} - -func (c *controlCommand) NewError(err error) bool { - c.errorsCount++ - log.Error("controlCommand error", "chainID", c.chainClient.NetworkID(), "error", err, "counter", c.errorsCount) - if nonArchivalNodeError(err) { - log.Info("Non archival node detected", "chainID", c.chainClient.NetworkID()) - c.nonArchivalRPCNode = true - c.feed.Send(walletevent.Event{ - Type: EventNonArchivalNodeDetected, - }) - } - if c.errorsCount >= 3 { - c.feed.Send(walletevent.Event{ - Type: EventFetchingHistoryError, - Message: err.Error(), - }) - return true - } - return false -} - -func (c *controlCommand) Command() async.Command { - return async.FiniteCommand{ - Interval: 5 * time.Second, - Runable: c.Run, - }.Run -} - type transfersCommand struct { db *Database blockDAO *BlockDAO @@ -678,172 +490,6 @@ func (c *loadTransfersCommand) Run(parent context.Context) (err error) { return } -type findAndCheckBlockRangeCommand struct { - accounts []common.Address - db *Database - blockDAO *BlockDAO - chainClient chain.ClientInterface - balanceCacher balance.Cacher - feed *event.Feed - fromByAddress map[common.Address]*Block - toByAddress map[common.Address]*big.Int - foundHeaders map[common.Address][]*DBHeader - noLimit bool - error error -} - -func (c *findAndCheckBlockRangeCommand) Command() async.Command { - return async.FiniteCommand{ - Interval: 5 * time.Second, - Runable: c.Run, - }.Run -} - -func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) error { - log.Debug("start findAndCHeckBlockRangeCommand") - - newFromByAddress, ethHeadersByAddress, err := c.fastIndex(parent, c.balanceCacher, c.fromByAddress, c.toByAddress) - if err != nil { - c.error = err - // return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers. But if we return error, we will get stuck in inifinite loop. - return nil - } - if c.noLimit { - newFromByAddress = map[common.Address]*big.Int{} - for _, address := range c.accounts { - newFromByAddress[address] = c.fromByAddress[address].Number - } - } - erc20HeadersByAddress, err := c.fastIndexErc20(parent, newFromByAddress, c.toByAddress) - if err != nil { - return err - } - - foundHeaders := map[common.Address][]*DBHeader{} - for _, address := range c.accounts { - ethHeaders := ethHeadersByAddress[address] - erc20Headers := erc20HeadersByAddress[address] - allHeaders := append(ethHeaders, erc20Headers...) - - log.Debug("allHeaders found for account", "address", address, "allHeaders.len", len(allHeaders)) - - // Ensure only 1 DBHeader per block hash. - uniqHeaders := []*DBHeader{} - if len(allHeaders) > 0 { - uniqHeaders = uniqueHeaderPerBlockHash(allHeaders) - } - - // Ensure only 1 PreloadedTransaction per transaction hash during block discovery. - // Full list of SubTransactions will be obtained from the receipt logs - // at a later stage. - for _, header := range uniqHeaders { - header.PreloadedTransactions = uniquePreloadedTransactionPerTxHash(header.PreloadedTransactions) - } - - foundHeaders[address] = uniqHeaders - - lastBlockNumber := c.toByAddress[address] - log.Debug("saving headers", "len", len(uniqHeaders), "lastBlockNumber", lastBlockNumber, - "balance", c.balanceCacher.Cache().GetBalance(address, c.chainClient.NetworkID(), lastBlockNumber), - "nonce", c.balanceCacher.Cache().GetNonce(address, c.chainClient.NetworkID(), lastBlockNumber)) - - to := &Block{ - Number: lastBlockNumber, - Balance: c.balanceCacher.Cache().GetBalance(address, c.chainClient.NetworkID(), lastBlockNumber), - Nonce: c.balanceCacher.Cache().GetNonce(address, c.chainClient.NetworkID(), lastBlockNumber), - } - log.Debug("uniqHeaders found for account", "address", address, "uniqHeaders.len", len(uniqHeaders)) - err = c.db.ProcessBlocks(c.chainClient.NetworkID(), address, newFromByAddress[address], to, uniqHeaders) - if err != nil { - return err - } - } - - c.foundHeaders = foundHeaders - - log.Debug("end findAndCheckBlockRangeCommand") - return nil -} - -// run fast indexing for every accont up to canonical chain head minus safety depth. -// every account will run it from last synced header. -func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCacher balance.Cacher, - fromByAddress map[common.Address]*Block, toByAddress map[common.Address]*big.Int) (map[common.Address]*big.Int, - map[common.Address][]*DBHeader, error) { - - log.Debug("fast indexer started") - - start := time.Now() - group := async.NewGroup(ctx) - - commands := make([]*ethHistoricalCommand, len(c.accounts)) - for i, address := range c.accounts { - eth := ðHistoricalCommand{ - chainClient: c.chainClient, - balanceCacher: bCacher, - address: address, - feed: c.feed, - from: fromByAddress[address], - to: toByAddress[address], - noLimit: c.noLimit, - threadLimit: NoThreadLimit, - } - commands[i] = eth - group.Add(eth.Command()) - } - select { - case <-ctx.Done(): - return nil, nil, ctx.Err() - case <-group.WaitAsync(): - resultingFromByAddress := map[common.Address]*big.Int{} - headers := map[common.Address][]*DBHeader{} - for _, command := range commands { - if command.error != nil { - return nil, nil, command.error - } - resultingFromByAddress[command.address] = command.resultingFrom - headers[command.address] = command.foundHeaders - } - log.Debug("fast indexer finished", "in", time.Since(start)) - return resultingFromByAddress, headers, nil - } -} - -// run fast indexing for every accont up to canonical chain head minus safety depth. -// every account will run it from last synced header. -func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, fromByAddress map[common.Address]*big.Int, toByAddress map[common.Address]*big.Int) (map[common.Address][]*DBHeader, error) { - log.Debug("fast indexer Erc20 started") - - start := time.Now() - group := async.NewGroup(ctx) - - commands := make([]*erc20HistoricalCommand, len(c.accounts)) - for i, address := range c.accounts { - erc20 := &erc20HistoricalCommand{ - erc20: NewERC20TransfersDownloader(c.chainClient, []common.Address{address}, types.LatestSignerForChainID(c.chainClient.ToBigInt()), false), - chainClient: c.chainClient, - feed: c.feed, - address: address, - from: fromByAddress[address], - to: toByAddress[address], - foundHeaders: []*DBHeader{}, - } - commands[i] = erc20 - group.Add(erc20.Command()) - } - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-group.WaitAsync(): - headers := map[common.Address][]*DBHeader{} - for _, command := range commands { - headers[command.address] = command.foundHeaders - } - log.Debug("fast indexer Erc20 finished", "in", time.Since(start)) - return headers, nil - } -} - func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *BlockDAO, db *Database, chainClient chain.ClientInterface, blocksLimitPerAccount int, blocksByAddress map[common.Address][]*big.Int, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, @@ -888,90 +534,6 @@ func isBinanceChain(chainID uint64) bool { return chainID == binancChainID || chainID == binanceTestChainID } -func getLowestFrom(chainID uint64, to *big.Int) *big.Int { - from := big.NewInt(0) - if isBinanceChain(chainID) && big.NewInt(0).Sub(to, from).Cmp(binanceChainMaxInitialRange) == 1 { - from = big.NewInt(0).Sub(to, binanceChainMaxInitialRange) - } - - return from -} - -// Finds the latest range up to initialTo where the number of transactions is between 20 and 25 -func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client chain.ClientInterface) (*big.Int, error) { - log.Debug("findFirstRange", "account", account, "initialTo", initialTo, "client", client) - - from := getLowestFrom(client.NetworkID(), initialTo) - to := initialTo - goal := uint64(20) - - if from.Cmp(to) == 0 { - return to, nil - } - - firstNonce, err := client.NonceAt(c, account, to) // this is the latest nonce actually - log.Debug("find range with 20 <= len(tx) <= 25", "account", account, "firstNonce", firstNonce, "from", from, "to", to) - - if err != nil { - return nil, err - } - - if firstNonce <= goal { - return from, nil - } - - nonceDiff := firstNonce - iterations := 0 - for iterations < 50 { - iterations = iterations + 1 - - if nonceDiff > goal { - // from = (from + to) / 2 - from = from.Add(from, to) - from = from.Div(from, big.NewInt(2)) - } else { - // from = from - (to - from) / 2 - // to = from - diff := big.NewInt(0).Sub(to, from) - diff.Div(diff, big.NewInt(2)) - to = big.NewInt(from.Int64()) - from.Sub(from, diff) - } - fromNonce, err := client.NonceAt(c, account, from) - if err != nil { - return nil, err - } - nonceDiff = firstNonce - fromNonce - - log.Debug("next nonce", "from", from, "n", fromNonce, "diff", firstNonce-fromNonce) - - if goal <= nonceDiff && nonceDiff <= (goal+5) { - log.Debug("range found", "account", account, "from", from, "to", to) - return from, nil - } - } - - log.Debug("range found", "account", account, "from", from, "to", to) - - return from, nil -} - -// Finds the latest ranges up to initialTo where the number of transactions is between 20 and 25 -func findFirstRanges(c context.Context, accounts []common.Address, initialTo *big.Int, client chain.ClientInterface) (map[common.Address]*big.Int, error) { - res := map[common.Address]*big.Int{} - - for _, address := range accounts { - from, err := findFirstRange(c, address, initialTo, client) - if err != nil { - return nil, err - } - - res[address] = from - } - - return res, nil -} - // Ensure 1 DBHeader per Block Hash func uniqueHeaderPerBlockHash(allHeaders []*DBHeader) []*DBHeader { uniqHeadersByHash := map[common.Hash]*DBHeader{} @@ -995,20 +557,6 @@ func uniqueHeaderPerBlockHash(allHeaders []*DBHeader) []*DBHeader { return uniqHeaders } -// Ensure 1 PreloadedTransaction per Transaction Hash -func uniquePreloadedTransactionPerTxHash(allTransactions []*PreloadedTransaction) []*PreloadedTransaction { - uniqTransactionsByTransactionHash := map[common.Hash]*PreloadedTransaction{} - for _, transaction := range allTransactions { - uniqTransactionsByTransactionHash[transaction.Log.TxHash] = transaction - } - uniqTransactions := []*PreloadedTransaction{} - for _, transaction := range uniqTransactionsByTransactionHash { - uniqTransactions = append(uniqTransactions, transaction) - } - - return uniqTransactions -} - // Organize subTransactions by Transaction Hash func subTransactionListToTransactionsByTxHash(subTransactions []Transfer) map[common.Hash]Transaction { rst := map[common.Hash]Transaction{} diff --git a/services/wallet/transfer/controller.go b/services/wallet/transfer/controller.go index a9f29c938..8c146847e 100644 --- a/services/wallet/transfer/controller.go +++ b/services/wallet/transfer/controller.go @@ -34,12 +34,11 @@ type Controller struct { pendingTxManager *transactions.PendingTxTracker tokenManager *token.Manager balanceCacher balance.Cacher - loadAllTransfers bool } func NewTransferController(db *sql.DB, accountsDB *statusaccounts.Database, rpcClient *rpc.Client, accountFeed *event.Feed, transferFeed *event.Feed, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager, - balanceCacher balance.Cacher, loadAllTransfers bool) *Controller { + balanceCacher balance.Cacher) *Controller { blockDAO := &BlockDAO{db} return &Controller{ @@ -53,7 +52,6 @@ func NewTransferController(db *sql.DB, accountsDB *statusaccounts.Database, rpcC pendingTxManager: pendingTxManager, tokenManager: tokenManager, balanceCacher: balanceCacher, - loadAllTransfers: loadAllTransfers, } } @@ -93,7 +91,7 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add } if c.reactor != nil { - err := c.reactor.restart(chainClients, accounts, c.loadAllTransfers) + err := c.reactor.restart(chainClients, accounts) if err != nil { return err } @@ -114,13 +112,13 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add c.reactor = NewReactor(c.db, c.blockDAO, c.TransferFeed, c.transactionManager, c.pendingTxManager, c.tokenManager, c.balanceCacher, omitHistory) - err = c.reactor.start(chainClients, accounts, c.loadAllTransfers) + err = c.reactor.start(chainClients, accounts) if err != nil { return err } c.group.Add(func(ctx context.Context) error { - return watchAccountsChanges(ctx, c.accountFeed, c.reactor, chainClients, accounts, c.loadAllTransfers) + return watchAccountsChanges(ctx, c.accountFeed, c.reactor, chainClients, accounts) }) } return nil @@ -129,7 +127,7 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add // watchAccountsChanges subscribes to a feed and watches for changes in accounts list. If there are new or removed accounts // reactor will be restarted. func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor *Reactor, - chainClients map[uint64]chain.ClientInterface, initial []common.Address, loadAllTransfers bool) error { + chainClients map[uint64]chain.ClientInterface, initial []common.Address) error { ch := make(chan accountsevent.Event, 1) // it may block if the rate of updates will be significantly higher sub := accountFeed.Subscribe(ch) @@ -167,7 +165,7 @@ func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor listenList := mapToList(listen) log.Debug("list of accounts was changed from a previous version. reactor will be restarted", "new", listenList) - err := reactor.restart(chainClients, listenList, loadAllTransfers) + err := reactor.restart(chainClients, listenList) if err != nil { log.Error("failed to restart reactor with new accounts", "error", err) } @@ -225,7 +223,7 @@ func (c *Controller) LoadTransferByHash(ctx context.Context, rpcClient *rpc.Clie func (c *Controller) GetTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int, limit int64, fetchMore bool) ([]View, error) { - rst, err := c.reactor.getTransfersByAddress(ctx, chainID, address, toBlock, limit, fetchMore) + rst, err := c.reactor.getTransfersByAddress(ctx, chainID, address, toBlock, limit) if err != nil { log.Error("[WalletAPI:: GetTransfersByAddress] can't fetch transfers", "err", err) return nil, err diff --git a/services/wallet/transfer/reactor.go b/services/wallet/transfer/reactor.go index 6d8e4543b..fdcd639ca 100644 --- a/services/wallet/transfer/reactor.go +++ b/services/wallet/transfer/reactor.go @@ -3,19 +3,14 @@ package transfer import ( "context" "errors" - "fmt" "math/big" - "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/rpc/chain" - "github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/balance" "github.com/status-im/status-go/services/wallet/token" - "github.com/status-im/status-go/services/wallet/walletevent" "github.com/status-im/status-go/transactions" ) @@ -31,8 +26,7 @@ var errAlreadyRunning = errors.New("already running") type FetchStrategyType int32 const ( - OnDemandFetchStrategyType FetchStrategyType = iota - SequentialFetchStrategyType + SequentialFetchStrategyType FetchStrategyType = iota ) // HeaderReader interface for reading headers using block number or hash. @@ -47,206 +41,7 @@ type HistoryFetcher interface { kind() FetchStrategyType getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int, - limit int64, fetchMore bool) ([]Transfer, error) -} - -func NewOnDemandFetchStrategy( - db *Database, - blockDAO *BlockDAO, - feed *event.Feed, - transactionManager *TransactionManager, - pendingTxManager *transactions.PendingTxTracker, - tokenManager *token.Manager, - chainClients map[uint64]chain.ClientInterface, - accounts []common.Address, - balanceCacher balance.Cacher, -) *OnDemandFetchStrategy { - strategy := &OnDemandFetchStrategy{ - db: db, - blockDAO: blockDAO, - feed: feed, - balanceCacher: balanceCacher, - transactionManager: transactionManager, - pendingTxManager: pendingTxManager, - tokenManager: tokenManager, - chainClients: chainClients, - accounts: accounts, - } - - return strategy -} - -type OnDemandFetchStrategy struct { - db *Database - blockDAO *BlockDAO - feed *event.Feed - mu sync.Mutex - group *async.Group - balanceCacher balance.Cacher - transactionManager *TransactionManager - pendingTxManager *transactions.PendingTxTracker - tokenManager *token.Manager - chainClients map[uint64]chain.ClientInterface - accounts []common.Address -} - -func (s *OnDemandFetchStrategy) newControlCommand(chainClient chain.ClientInterface, accounts []common.Address) *controlCommand { - signer := types.LatestSignerForChainID(chainClient.ToBigInt()) - ctl := &controlCommand{ - db: s.db, - chainClient: chainClient, - accounts: accounts, - blockDAO: s.blockDAO, - eth: ÐDownloader{ - chainClient: chainClient, - accounts: accounts, - signer: signer, - db: s.db, - }, - erc20: NewERC20TransfersDownloader(chainClient, accounts, signer, false), - feed: s.feed, - errorsCount: 0, - transactionManager: s.transactionManager, - pendingTxManager: s.pendingTxManager, - tokenManager: s.tokenManager, - balanceCacher: s.balanceCacher, - } - - return ctl -} - -func (s *OnDemandFetchStrategy) start() error { - s.mu.Lock() - defer s.mu.Unlock() - - if s.group != nil { - return errAlreadyRunning - } - s.group = async.NewGroup(context.Background()) - - for _, chainClient := range s.chainClients { - ctl := s.newControlCommand(chainClient, s.accounts) - s.group.Add(ctl.Command()) - } - - return nil -} - -// Stop stops reactor loop and waits till it exits. -func (s *OnDemandFetchStrategy) stop() { - s.mu.Lock() - defer s.mu.Unlock() - if s.group == nil { - return - } - s.group.Stop() - s.group.Wait() - s.group = nil -} - -func (s *OnDemandFetchStrategy) kind() FetchStrategyType { - return OnDemandFetchStrategyType -} - -func (s *OnDemandFetchStrategy) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int, - limit int64, fetchMore bool) ([]Transfer, error) { - - log.Info("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address, "fetchMore", fetchMore, - "chainID", chainID, "toBlock", toBlock, "limit", limit) - - rst, err := s.db.GetTransfersByAddress(chainID, address, toBlock, limit) - if err != nil { - log.Error("[WalletAPI:: GetTransfersByAddress] can't fetch transfers", "err", err) - return nil, err - } - - transfersCount := int64(len(rst)) - - if fetchMore && limit > transfersCount { - - block, err := s.blockDAO.GetFirstKnownBlock(chainID, address) - if err != nil { - return nil, err - } - - // if zero block was already checked there is nothing to find more - if block == nil || big.NewInt(0).Cmp(block) == 0 { - log.Info("[WalletAPI:: GetTransfersByAddress] ZERO block is found for", "address", address, "chaindID", chainID) - return rst, nil - } - - chainClient, err := getChainClientByID(s.chainClients, chainID) - if err != nil { - return nil, err - } - - from, err := findFirstRange(ctx, address, block, chainClient) - if err != nil { - if nonArchivalNodeError(err) { - if s.feed != nil { - s.feed.Send(walletevent.Event{ - Type: EventNonArchivalNodeDetected, - }) - } - if block.Cmp(big.NewInt(NonArchivalNodeBlockChunkSize)) >= 0 { - from = big.NewInt(0).Sub(block, big.NewInt(NonArchivalNodeBlockChunkSize)) - } else { - from = big.NewInt(0) - } - } else { - log.Error("first range error", "error", err) - return nil, err - } - } - fromByAddress := map[common.Address]*Block{address: { - Number: from, - }} - toByAddress := map[common.Address]*big.Int{address: block} - - blocksCommand := &findAndCheckBlockRangeCommand{ - accounts: []common.Address{address}, - db: s.db, - chainClient: chainClient, - balanceCacher: s.balanceCacher, - feed: s.feed, - fromByAddress: fromByAddress, - toByAddress: toByAddress, - } - - if err = blocksCommand.Command()(ctx); err != nil { - return nil, err - } - - blocks, err := s.blockDAO.GetBlocksToLoadByAddress(chainID, address, numberOfBlocksCheckedPerIteration) - if err != nil { - return nil, err - } - - log.Info("checking blocks again", "blocks", len(blocks)) - if len(blocks) > 0 { - txCommand := &loadTransfersCommand{ - accounts: []common.Address{address}, - db: s.db, - blockDAO: s.blockDAO, - chainClient: chainClient, - transactionManager: s.transactionManager, - blocksLimit: numberOfBlocksCheckedPerIteration, - tokenManager: s.tokenManager, - } - - err = txCommand.Command()(ctx) - if err != nil { - return nil, err - } - - rst, err = s.db.GetTransfersByAddress(chainID, address, toBlock, limit) - if err != nil { - return nil, err - } - } - } - - return rst, nil + limit int64) ([]Transfer, error) } // Reactor listens to new blocks and stores transfers into the database. @@ -278,10 +73,9 @@ func NewReactor(db *Database, blockDAO *BlockDAO, feed *event.Feed, tm *Transact } // Start runs reactor loop in background. -func (r *Reactor) start(chainClients map[uint64]chain.ClientInterface, accounts []common.Address, - loadAllTransfers bool) error { +func (r *Reactor) start(chainClients map[uint64]chain.ClientInterface, accounts []common.Address) error { - r.strategy = r.createFetchStrategy(chainClients, accounts, loadAllTransfers) + r.strategy = r.createFetchStrategy(chainClients, accounts) return r.strategy.start() } @@ -292,50 +86,35 @@ func (r *Reactor) stop() { } } -func (r *Reactor) restart(chainClients map[uint64]chain.ClientInterface, accounts []common.Address, - loadAllTransfers bool) error { +func (r *Reactor) restart(chainClients map[uint64]chain.ClientInterface, accounts []common.Address) error { r.stop() - return r.start(chainClients, accounts, loadAllTransfers) + return r.start(chainClients, accounts) } func (r *Reactor) createFetchStrategy(chainClients map[uint64]chain.ClientInterface, - accounts []common.Address, loadAllTransfers bool) HistoryFetcher { + accounts []common.Address) HistoryFetcher { - if loadAllTransfers { - return NewSequentialFetchStrategy( - r.db, - r.blockDAO, - r.feed, - r.transactionManager, - r.pendingTxManager, - r.tokenManager, - chainClients, - accounts, - r.balanceCacher, - r.omitHistory, - ) - } - - return NewOnDemandFetchStrategy(r.db, r.blockDAO, r.feed, r.transactionManager, r.pendingTxManager, r.tokenManager, chainClients, accounts, r.balanceCacher) + return NewSequentialFetchStrategy( + r.db, + r.blockDAO, + r.feed, + r.transactionManager, + r.pendingTxManager, + r.tokenManager, + chainClients, + accounts, + r.balanceCacher, + r.omitHistory, + ) } func (r *Reactor) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int, - limit int64, fetchMore bool) ([]Transfer, error) { + limit int64) ([]Transfer, error) { if r.strategy != nil { - return r.strategy.getTransfersByAddress(ctx, chainID, address, toBlock, limit, fetchMore) + return r.strategy.getTransfersByAddress(ctx, chainID, address, toBlock, limit) } return nil, errors.New(ReactorNotStarted) } - -func getChainClientByID(clients map[uint64]chain.ClientInterface, id uint64) (chain.ClientInterface, error) { - for _, client := range clients { - if client.NetworkID() == id { - return client, nil - } - } - - return nil, fmt.Errorf("chain client not found with id=%d", id) -} diff --git a/services/wallet/transfer/sequential_fetch_strategy.go b/services/wallet/transfer/sequential_fetch_strategy.go index 4c80bfa0c..954cfcdcd 100644 --- a/services/wallet/transfer/sequential_fetch_strategy.go +++ b/services/wallet/transfer/sequential_fetch_strategy.go @@ -103,11 +103,10 @@ func (s *SequentialFetchStrategy) kind() FetchStrategyType { return SequentialFetchStrategyType } -// TODO: remove fetchMore parameter from here and interface, it is used by OnDemandFetchStrategy only func (s *SequentialFetchStrategy) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int, - limit int64, fetchMore bool) ([]Transfer, error) { + limit int64) ([]Transfer, error) { - log.Info("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address, "fetchMore", fetchMore, + log.Debug("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address, "chainID", chainID, "toBlock", toBlock, "limit", limit) rst, err := s.db.GetTransfersByAddress(chainID, address, toBlock, limit)