package transfer import ( "context" "math/big" "time" "github.com/pkg/errors" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/walletevent" ) const ( allBlocksLoaded = "all blocks loaded" ) // TODO NewFindBlocksCommand type findBlocksCommand struct { account common.Address db *Database blockDAO *BlockRangeSequentialDAO chainClient *chain.ClientWithFallback balanceCache *balanceCache feed *event.Feed noLimit bool error error resFromBlock *Block startBlockNumber *big.Int transactionManager *TransactionManager } func (c *findBlocksCommand) Command() async.Command { return async.FiniteCommand{ Interval: 5 * time.Second, Runable: c.Run, }.Run } func (c *findBlocksCommand) Run(parent context.Context) (err error) { log.Info("start findBlocksCommand", "account", c.account, "chain", c.chainClient.ChainID, "noLimit", c.noLimit) rangeSize := big.NewInt(DefaultNodeBlockChunkSize) to, err := c.loadFirstKnownBlockNumber() log.Info("findBlocksCommand", "firstKnownBlockNumber", to, "error", err) if err != nil { if err.Error() != allBlocksLoaded { c.error = err } return nil // We break the loop if we fetched all the blocks } var head *types.Header = nil if to == nil { ctx, cancel := context.WithTimeout(parent, 3*time.Second) head, err = c.chainClient.HeaderByNumber(ctx, nil) cancel() if err != nil { c.error = err log.Error("findBlocksCommand failed to get head block", "error", err) return nil } log.Info("current head is", "chain", c.chainClient.ChainID, "block number", head.Number) to = new(big.Int).Set(head.Number) // deep copy } else { to.Sub(to, big.NewInt(1)) } var from = big.NewInt(0) if to.Cmp(rangeSize) > 0 { from.Sub(to, rangeSize) } for { headers, _ := c.checkRange(parent, from, to) if c.error != nil { log.Error("findBlocksCommand checkRange", "error", c.error) break } // 'to' is set to 'head' if 'last' block not found in DB if head != nil && to.Cmp(head.Number) == 0 { log.Info("upsert blockrange", "head", head.Number, "to", to, "chain", c.chainClient.ChainID, "account", c.account) err = c.blockDAO.upsertRange(c.chainClient.ChainID, c.account, c.startBlockNumber, c.resFromBlock.Number, to) if err != nil { c.error = err log.Error("findBlocksCommand upsertRange", "error", err) break } } log.Info("findBlocksCommand.Run()", "headers len", len(headers), "resFromBlock", c.resFromBlock.Number) err = c.blockDAO.updateFirstBlock(c.chainClient.ChainID, c.account, c.resFromBlock.Number) if err != nil { c.error = err log.Error("findBlocksCommand failed to update first block", "error", err) break } if c.startBlockNumber.Cmp(big.NewInt(0)) > 0 { err = c.blockDAO.updateStartBlock(c.chainClient.ChainID, c.account, c.startBlockNumber) if err != nil { c.error = err log.Error("findBlocksCommand failed to update start block", "error", err) break } } // Assign new range to.Sub(from, big.NewInt(1)) // it won't hit the cache, but we wont load the transfers twice if to.Cmp(rangeSize) > 0 { from.Sub(to, rangeSize) } else { from = big.NewInt(0) } if to.Cmp(big.NewInt(0)) <= 0 || (c.startBlockNumber != nil && c.startBlockNumber.Cmp(big.NewInt(0)) > 0 && to.Cmp(c.startBlockNumber) <= 0) { log.Info("Start block has been found, stop execution", "startBlock", c.startBlockNumber, "to", to) break } } log.Info("end findBlocksCommand", "account", c.account, "chain", c.chainClient.ChainID, "noLimit", c.noLimit) return nil } func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to *big.Int) ( foundHeaders []*DBHeader, err error) { fromBlock := &Block{Number: from} newFromBlock, ethHeaders, startBlock, err := c.fastIndex(parent, c.balanceCache, fromBlock, to) if err != nil { log.Info("findBlocksCommand checkRange fastIndex", "err", err) c.error = err // return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers return nil, nil } log.Info("findBlocksCommand checkRange", "startBlock", startBlock, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit) // TODO There should be transfers when either when we have found headers // or when newFromBlock is different from fromBlock, but if I check for // ERC20 transfers only when there are ETH transfers, I will miss ERC20 transfers // if len(ethHeaders) > 0 || newFromBlock.Number.Cmp(fromBlock.Number) != 0 { // there is transaction history for this account erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to) if err != nil { log.Info("findBlocksCommand checkRange fastIndexErc20", "err", err) c.error = err // return err return nil, nil } allHeaders := append(ethHeaders, erc20Headers...) if len(allHeaders) > 0 { uniqHeadersByHash := map[common.Hash]*DBHeader{} for _, header := range allHeaders { uniqHeader, ok := uniqHeadersByHash[header.Hash] if ok { if len(header.Erc20Transfers) > 0 { uniqHeader.Erc20Transfers = append(uniqHeader.Erc20Transfers, header.Erc20Transfers...) } uniqHeadersByHash[header.Hash] = uniqHeader } else { uniqHeadersByHash[header.Hash] = header } } uniqHeaders := []*DBHeader{} for _, header := range uniqHeadersByHash { uniqHeaders = append(uniqHeaders, header) } foundHeaders = uniqHeaders log.Info("saving headers", "len", len(uniqHeaders), "lastBlockNumber", to, "balance", c.balanceCache.ReadCachedBalance(c.account, to), "nonce", c.balanceCache.ReadCachedNonce(c.account, to)) err = c.db.SaveBlocks(c.chainClient.ChainID, c.account, uniqHeaders) if err != nil { c.error = err // return err return nil, nil } } // } c.resFromBlock = newFromBlock c.startBlockNumber = startBlock log.Info("end findBlocksCommand checkRange", "c.startBlock", c.startBlockNumber, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "c.resFromBlock", c.resFromBlock.Number) return } func (c *findBlocksCommand) loadFirstKnownBlockNumber() (*big.Int, error) { blockInfo, err := c.blockDAO.getBlockRange(c.chainClient.ChainID, c.account) if err != nil { log.Error("failed to load block ranges from database", "chain", c.chainClient.ChainID, "account", c.account, "error", err) return nil, err } if blockInfo != nil { log.Info("blockInfo for", "address", c.account, "chain", c.chainClient.ChainID, "Start", blockInfo.Start, "FirstKnown", blockInfo.FirstKnown, "LastKnown", blockInfo.LastKnown) // Check if we have fetched all blocks for this account if blockInfo.FirstKnown != nil && blockInfo.Start != nil && blockInfo.Start.Cmp(blockInfo.FirstKnown) >= 0 { log.Info("all blocks fetched", "chain", c.chainClient.ChainID, "account", c.account) return blockInfo.FirstKnown, errors.New(allBlocksLoaded) } return blockInfo.FirstKnown, nil } log.Info("no blockInfo for", "address", c.account, "chain", c.chainClient.ChainID) return nil, nil } // run fast indexing for every accont up to canonical chain head minus safety depth. // every account will run it from last synced header. func (c *findBlocksCommand) fastIndex(ctx context.Context, bCache *balanceCache, fromBlock *Block, toBlockNumber *big.Int) (resultingFrom *Block, headers []*DBHeader, startBlock *big.Int, err error) { log.Info("fast index started", "accounts", c.account, "from", fromBlock.Number, "to", toBlockNumber) start := time.Now() group := async.NewGroup(ctx) command := ðHistoricalCommand{ chainClient: c.chainClient, balanceCache: bCache, address: c.account, eth: ÐDownloader{ chainClient: c.chainClient, accounts: []common.Address{c.account}, signer: types.NewLondonSigner(c.chainClient.ToBigInt()), db: c.db, }, feed: c.feed, from: fromBlock, to: toBlockNumber, noLimit: c.noLimit, threadLimit: SequentialThreadLimit, } group.Add(command.Command()) select { case <-ctx.Done(): err = ctx.Err() log.Info("fast indexer ctx Done", "error", err) return case <-group.WaitAsync(): if command.error != nil { err = command.error return } resultingFrom = &Block{Number: command.resultingFrom} headers = command.foundHeaders startBlock = command.startBlock log.Info("fast indexer finished", "in", time.Since(start), "startBlock", command.startBlock, "resultingFrom", resultingFrom.Number, "headers", len(headers)) return } } // run fast indexing for every accont up to canonical chain head minus safety depth. // every account will run it from last synced header. func (c *findBlocksCommand) fastIndexErc20(ctx context.Context, fromBlockNumber *big.Int, toBlockNumber *big.Int) ([]*DBHeader, error) { start := time.Now() group := async.NewGroup(ctx) erc20 := &erc20HistoricalCommand{ erc20: NewERC20TransfersDownloader(c.chainClient, []common.Address{c.account}, types.NewLondonSigner(c.chainClient.ToBigInt())), chainClient: c.chainClient, feed: c.feed, address: c.account, from: fromBlockNumber, to: toBlockNumber, foundHeaders: []*DBHeader{}, } group.Add(erc20.Command()) select { case <-ctx.Done(): return nil, ctx.Err() case <-group.WaitAsync(): headers := erc20.foundHeaders log.Info("fast indexer Erc20 finished", "in", time.Since(start), "headers", len(headers)) return headers, nil } } // TODO Think on how to reuse loadTransfersCommand, as it shares many members and some methods // but does not need to return the transfers but only save them to DB, as there can be too many of them // and the logic of `loadTransfersLoop` is different from `loadTransfers“ type loadAllTransfersCommand struct { accounts []common.Address db *Database blockDAO *BlockDAO chainClient *chain.ClientWithFallback blocksByAddress map[common.Address][]*big.Int transactionManager *TransactionManager blocksLimit int } func (c *loadAllTransfersCommand) Command() async.Command { return async.FiniteCommand{ Interval: 5 * time.Second, Runable: c.Run, }.Run } func (c *loadAllTransfersCommand) Run(parent context.Context) error { return loadTransfersLoop(parent, c.accounts, c.blockDAO, c.db, c.chainClient, c.blocksLimit, c.blocksByAddress, c.transactionManager) } type loadBlocksAndTransfersCommand struct { accounts []common.Address db *Database blockRangeDAO *BlockRangeSequentialDAO blockDAO *BlockDAO chainClient *chain.ClientWithFallback feed *event.Feed balanceCache *balanceCache errorsCount int // nonArchivalRPCNode bool // TODO Make use of it transactionManager *TransactionManager } func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error { log.Info("start load all transfers command", "chain", c.chainClient.ChainID) ctx := parent if c.feed != nil { c.feed.Send(walletevent.Event{ Type: EventFetchingRecentHistory, Accounts: c.accounts, }) } if c.balanceCache == nil { c.balanceCache = newBalanceCache() // TODO - need to keep balanceCache in memory??? What about sharing it with other packages? } group := async.NewGroup(ctx) for _, address := range c.accounts { log.Info("start findBlocks command", "chain", c.chainClient.ChainID) fbc := &findBlocksCommand{ account: address, db: c.db, blockDAO: c.blockRangeDAO, chainClient: c.chainClient, balanceCache: c.balanceCache, feed: c.feed, noLimit: false, transactionManager: c.transactionManager, } group.Add(fbc.Command()) } txCommand := &loadAllTransfersCommand{ accounts: c.accounts, db: c.db, blockDAO: c.blockDAO, chainClient: c.chainClient, transactionManager: c.transactionManager, blocksLimit: noBlockLimit, // load transfers from all `unloaded` blocks } group.Add(txCommand.Command()) select { case <-ctx.Done(): return ctx.Err() case <-group.WaitAsync(): log.Info("end load all transfers command", "chain", c.chainClient.ChainID) return nil } } func (c *loadBlocksAndTransfersCommand) Command() async.Command { return async.InfiniteCommand{ Interval: 5 * time.Second, Runable: c.Run, }.Run } func loadTransfersLoop(ctx context.Context, accounts []common.Address, blockDAO *BlockDAO, db *Database, chainClient *chain.ClientWithFallback, blocksLimitPerAccount int, blocksByAddress map[common.Address][]*big.Int, transactionManager *TransactionManager) error { log.Info("loadTransfers start", "accounts", accounts, "chain", chainClient.ChainID, "limit", blocksLimitPerAccount) start := time.Now() group := async.NewGroup(ctx) for _, address := range accounts { // Take blocks from cache if available and disrespect the limit // If no blocks are available in cache, take blocks from DB respecting the limit // If no limit is set, take all blocks from DB blocks, ok := blocksByAddress[address] commands := []*transfersCommand{} for { if !ok { blocks, _ = blockDAO.GetBlocksByAddress(chainClient.ChainID, address, numberOfBlocksCheckedPerIteration) } for _, block := range blocks { transfers := &transfersCommand{ db: db, chainClient: chainClient, address: address, eth: ÐDownloader{ chainClient: chainClient, accounts: []common.Address{address}, signer: types.NewLondonSigner(chainClient.ToBigInt()), db: db, }, blockNum: block, transactionManager: transactionManager, } commands = append(commands, transfers) group.Add(transfers.Command()) } // We need to wait until the retrieved blocks are processed, otherwise // they will be retrieved again in the next iteration // It blocks transfer loading for single account at a time select { case <-ctx.Done(): log.Info("loadTransfers transfersCommand error", "chain", chainClient.ChainID, "address", address, "error", ctx.Err()) continue // return nil, ctx.Err() case <-group.WaitAsync(): // TODO Remove when done debugging transfers := []Transfer{} for _, command := range commands { if len(command.fetchedTransfers) == 0 { continue } transfers = append(transfers, command.fetchedTransfers...) } log.Info("loadTransfers finished for account", "address", address, "in", time.Since(start), "chain", chainClient.ChainID, "transfers", len(transfers), "limit", blocksLimitPerAccount) } log.Info("loadTransfers after select", "chain", chainClient.ChainID, "address", address, "blocks.len", len(blocks)) if ok || len(blocks) == 0 || (blocksLimitPerAccount > noBlockLimit && len(blocks) >= blocksLimitPerAccount) { log.Info("loadTransfers breaking loop on block limits reached or 0 blocks", "chain", chainClient.ChainID, "address", address, "limit", blocksLimitPerAccount, "blocks", len(blocks)) break } } } return nil }