diff --git a/services/wallet/transfer/block_dao.go b/services/wallet/transfer/block_dao.go index a8ebdb7c9..8731c5e48 100644 --- a/services/wallet/transfer/block_dao.go +++ b/services/wallet/transfer/block_dao.go @@ -249,6 +249,30 @@ func (b *BlockDAO) GetLastSavedBlock(chainID uint64) (rst *DBHeader, err error) return nil, nil } +func (b *BlockDAO) GetFirstSavedBlock(chainID uint64, address common.Address) (rst *DBHeader, err error) { + query := `SELECT blk_number, blk_hash, loaded + FROM blocks + WHERE network_id = ? AND address = ? + ORDER BY blk_number LIMIT 1` + rows, err := b.db.Query(query, chainID, address) + if err != nil { + return + } + defer rows.Close() + + if rows.Next() { + header := &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} + err = rows.Scan((*bigint.SQLBigInt)(header.Number), &header.Hash, &header.Loaded) + if err != nil { + return nil, err + } + + return header, nil + } + + return nil, nil +} + // TODO remove as not used func (b *BlockDAO) GetBlocks(chainID uint64) (rst []*DBHeader, err error) { query := `SELECT blk_number, blk_hash, address FROM blocks` diff --git a/services/wallet/transfer/block_ranges_sequential_dao.go b/services/wallet/transfer/block_ranges_sequential_dao.go index 52bc8e230..d36acd6e1 100644 --- a/services/wallet/transfer/block_ranges_sequential_dao.go +++ b/services/wallet/transfer/block_ranges_sequential_dao.go @@ -81,8 +81,7 @@ func (b *BlockRangeSequentialDAO) upsertRange(chainID uint64, account common.Add // to a greater value, because no history can be before some block that is considered // as a start of history, but due to concurrent block range checks, a newer greater block // can be found that matches criteria of a start block (nonce is zero, balances are equal) - if newBlockRange.Start != nil || (blockRange.Start != nil && newBlockRange.Start != nil && - blockRange.Start.Cmp(newBlockRange.Start) < 0) { + if newBlockRange.Start != nil && (blockRange.Start == nil || blockRange.Start.Cmp(newBlockRange.Start) < 0) { blockRange.Start = newBlockRange.Start } diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index 2e5241b5d..28bd8fe19 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -47,7 +47,6 @@ var ( ) type ethHistoricalCommand struct { - eth Downloader address common.Address chainClient *chain.ClientWithFallback balanceCache *balanceCache @@ -69,7 +68,8 @@ func (c *ethHistoricalCommand) Command() async.Command { } func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { - log.Info("eth historical downloader start", "address", c.address, "from", c.from.Number, "to", c.to, "noLimit", c.noLimit) + log.Info("eth historical downloader start", "chainID", c.chainClient.ChainID, "address", c.address, + "from", c.from.Number, "to", c.to, "noLimit", c.noLimit) start := time.Now() if c.from.Number != nil && c.from.Balance != nil { @@ -79,11 +79,12 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { c.balanceCache.addNonceToCache(c.address, c.from.Number, c.from.Nonce) } from, headers, startBlock, err := findBlocksWithEthTransfers(ctx, c.chainClient, - c.balanceCache, c.eth, c.address, c.from.Number, c.to, c.noLimit, c.threadLimit) + c.balanceCache, c.address, c.from.Number, c.to, c.noLimit, c.threadLimit) if err != nil { c.error = err - log.Error("failed to find blocks with transfers", "error", err) + log.Error("failed to find blocks with transfers", "error", err, "chainID", c.chainClient.ChainID, + "address", c.address, "from", c.from.Number, "to", c.to) return nil } @@ -343,12 +344,16 @@ func (c *controlCommand) Command() async.Command { type transfersCommand struct { db *Database + blockDAO *BlockDAO eth *ETHDownloader - blockNum *big.Int + blockNums []*big.Int address common.Address chainClient *chain.ClientWithFallback - fetchedTransfers []Transfer + blocksLimit int transactionManager *TransactionManager + + // result + fetchedTransfers []Transfer } func (c *transfersCommand) Command() async.Command { @@ -359,16 +364,66 @@ func (c *transfersCommand) Command() async.Command { } func (c *transfersCommand) Run(ctx context.Context) (err error) { - log.Debug("start transfersCommand", "chain", c.chainClient.ChainID, "address", c.address, "block", c.blockNum) + // Take blocks from cache if available and disrespect the limit + // If no blocks are available in cache, take blocks from DB respecting the limit + // If no limit is set, take all blocks from DB + for { + blocks := c.blockNums + if blocks == nil { + blocks, _ = c.blockDAO.GetBlocksByAddress(c.chainClient.ChainID, c.address, numberOfBlocksCheckedPerIteration) + } - startTs := time.Now() + for _, blockNum := range blocks { + log.Info("start transfersCommand", "chain", c.chainClient.ChainID, "address", c.address, "block", blockNum) - allTransfers, err := c.eth.GetTransfersByNumber(ctx, c.blockNum) - if err != nil { - log.Error("getTransfersByBlocks error", "error", err) - return err + startTs := time.Now() + + allTransfers, err := c.eth.GetTransfersByNumber(ctx, blockNum) + if err != nil { + log.Error("getTransfersByBlocks error", "error", err) + return err + } + + err = c.updateMultiTxFromPendingEntry(allTransfers) + if err != nil { + return err + } + + if len(allTransfers) > 0 { + err = c.db.SaveTransfersMarkBlocksLoaded(c.chainClient.ChainID, c.address, allTransfers, []*big.Int{blockNum}) + if err != nil { + log.Error("SaveTransfers error", "error", err) + return err + } + } else { + // If no transfers found, that is suspecting, because downloader returned this block as containing transfers + log.Error("no transfers found in block", "chain", c.chainClient.ChainID, "address", c.address, "block", blockNum) + + err = markBlocksAsLoaded(c.chainClient.ChainID, c.db.client, c.address, []*big.Int{blockNum}) + if err != nil { + log.Error("Mark blocks loaded error", "error", err) + return err + } + } + + c.fetchedTransfers = append(c.fetchedTransfers, allTransfers...) + + log.Debug("end transfersCommand", "chain", c.chainClient.ChainID, "address", c.address, + "block", blockNum, "len", len(allTransfers), "in", time.Since(startTs)) + } + + if c.blockNums != nil || len(blocks) == 0 || + (c.blocksLimit > noBlockLimit && len(blocks) >= c.blocksLimit) { + log.Debug("loadTransfers breaking loop on block limits reached or 0 blocks", "chain", c.chainClient.ChainID, + "address", c.address, "limit", c.blocksLimit, "blocks", len(blocks)) + break + } } + return nil +} + +func (c *transfersCommand) updateMultiTxFromPendingEntry(allTransfers []Transfer) error { // Update MultiTransactionID from pending entry for index := range allTransfers { transfer := &allTransfers[index] @@ -382,27 +437,6 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) { } } - if len(allTransfers) > 0 { - err = c.db.SaveTransfersMarkBlocksLoaded(c.chainClient.ChainID, c.address, allTransfers, []*big.Int{c.blockNum}) - if err != nil { - log.Error("SaveTransfers error", "error", err) - return err - } - } else { - // If no transfers found, that is suspecting, because downloader returned this block as containing transfers - log.Error("no transfers found in block", "chain", c.chainClient.ChainID, "address", c.address, "block", c.blockNum) - - err = markBlocksAsLoaded(c.chainClient.ChainID, c.db.client, c.address, []*big.Int{c.blockNum}) - if err != nil { - log.Error("Mark blocks loaded error", "error", err) - return err - } - } - - c.fetchedTransfers = allTransfers - log.Debug("end transfersCommand", "chain", c.chainClient.ChainID, "address", c.address, - "block", c.blockNum, "len", len(allTransfers), "in", time.Since(startTs)) - return nil } @@ -485,22 +519,9 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error) erc20Headers := erc20HeadersByAddress[address] allHeaders := append(ethHeaders, erc20Headers...) - uniqHeadersByHash := map[common.Hash]*DBHeader{} - for _, header := range allHeaders { - uniqHeader, ok := uniqHeadersByHash[header.Hash] - if ok { - if len(header.Erc20Transfers) > 0 { - uniqHeader.Erc20Transfers = append(uniqHeader.Erc20Transfers, header.Erc20Transfers...) - } - uniqHeadersByHash[header.Hash] = uniqHeader - } else { - uniqHeadersByHash[header.Hash] = header - } - } - uniqHeaders := []*DBHeader{} - for _, header := range uniqHeadersByHash { - uniqHeaders = append(uniqHeaders, header) + if len(allHeaders) > 0 { + uniqHeaders = uniqueHeaders(allHeaders) } foundHeaders[address] = uniqHeaders @@ -542,17 +563,11 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCache *b chainClient: c.chainClient, balanceCache: bCache, address: address, - eth: ÐDownloader{ - chainClient: c.chainClient, - accounts: []common.Address{address}, - signer: types.NewLondonSigner(c.chainClient.ToBigInt()), - db: c.db, - }, - feed: c.feed, - from: fromByAddress[address], - to: toByAddress[address], - noLimit: c.noLimit, - threadLimit: NoThreadLimit, + feed: c.feed, + from: fromByAddress[address], + to: toByAddress[address], + noLimit: c.noLimit, + threadLimit: NoThreadLimit, } commands[i] = eth group.Add(eth.Command()) @@ -621,30 +636,24 @@ func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *Blo commands := []*transfersCommand{} for _, address := range accounts { - blocks, ok := blocksByAddress[address] - - if !ok { - blocks, _ = blockDAO.GetBlocksByAddress(chainClient.ChainID, address, numberOfBlocksCheckedPerIteration) - } - - for _, block := range blocks { - transfers := &transfersCommand{ - db: db, + transfers := &transfersCommand{ + db: db, + blockDAO: blockDAO, + chainClient: chainClient, + address: address, + eth: ÐDownloader{ chainClient: chainClient, - address: address, - eth: ÐDownloader{ - chainClient: chainClient, - accounts: []common.Address{address}, - signer: types.NewLondonSigner(chainClient.ToBigInt()), - db: db, - }, - blockNum: block, - transactionManager: transactionManager, - } - commands = append(commands, transfers) - group.Add(transfers.Command()) + accounts: []common.Address{address}, + signer: types.NewLondonSigner(chainClient.ToBigInt()), + db: db, + }, + blockNums: blocksByAddress[address], + transactionManager: transactionManager, } + commands = append(commands, transfers) + group.Add(transfers.Command()) } + select { case <-ctx.Done(): return nil, ctx.Err() @@ -756,3 +765,25 @@ func findFirstRanges(c context.Context, accounts []common.Address, initialTo *bi return res, nil } + +func uniqueHeaders(allHeaders []*DBHeader) []*DBHeader { + uniqHeadersByHash := map[common.Hash]*DBHeader{} + for _, header := range allHeaders { + uniqHeader, ok := uniqHeadersByHash[header.Hash] + if ok { + if len(header.Erc20Transfers) > 0 { + uniqHeader.Erc20Transfers = append(uniqHeader.Erc20Transfers, header.Erc20Transfers...) + } + uniqHeadersByHash[header.Hash] = uniqHeader + } else { + uniqHeadersByHash[header.Hash] = header + } + } + + uniqHeaders := []*DBHeader{} + for _, header := range uniqHeadersByHash { + uniqHeaders = append(uniqHeaders, header) + } + + return uniqHeaders +} diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index d5f3fbbc0..b90fff8a1 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -46,6 +46,7 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) { rangeSize := big.NewInt(DefaultNodeBlockChunkSize) from, to := new(big.Int).Set(c.fromBlockNumber), new(big.Int).Set(c.toBlockNumber) + // Limit the range size to DefaultNodeBlockChunkSize if new(big.Int).Sub(to, from).Cmp(rangeSize) > 0 { from.Sub(to, rangeSize) @@ -54,19 +55,22 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) { for { headers, _ := c.checkRange(parent, from, to) if c.error != nil { - log.Error("findBlocksCommand checkRange", "error", c.error) + log.Error("findBlocksCommand checkRange", "error", c.error, "account", c.account, + "chain", c.chainClient.ChainID, "from", from, "to", to) break } - log.Debug("findBlocksCommand saving headers", "len", len(headers), "lastBlockNumber", to, - "balance", c.balanceCache.ReadCachedBalance(c.account, to), - "nonce", c.balanceCache.ReadCachedNonce(c.account, to)) + if len(headers) > 0 { + log.Debug("findBlocksCommand saving headers", "len", len(headers), "lastBlockNumber", to, + "balance", c.balanceCache.ReadCachedBalance(c.account, to), + "nonce", c.balanceCache.ReadCachedNonce(c.account, to)) - err = c.db.SaveBlocks(c.chainClient.ChainID, c.account, headers) - if err != nil { - c.error = err - // return err - break + err = c.db.SaveBlocks(c.chainClient.ChainID, c.account, headers) + if err != nil { + c.error = err + // return err + break + } } err = c.upsertBlockRange(&BlockRange{c.startBlockNumber, c.resFromBlock.Number, to}) @@ -109,29 +113,28 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to newFromBlock, ethHeaders, startBlock, err := c.fastIndex(parent, c.balanceCache, fromBlock, to) if err != nil { - log.Error("findBlocksCommand checkRange fastIndex", "err", err) + log.Error("findBlocksCommand checkRange fastIndex", "err", err, "account", c.account, + "chain", c.chainClient.ChainID) c.error = err // return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers return nil, nil } log.Debug("findBlocksCommand checkRange", "startBlock", startBlock, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit) - // There should be transfers when either when we have found headers - // or newFromBlock is different from fromBlock - if len(ethHeaders) > 0 || newFromBlock.Number.Cmp(fromBlock.Number) != 0 { - erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to) - if err != nil { - log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err) - c.error = err - // return err - return nil, nil - } + // There could be incoming ERC20 transfers which don't change the balance + // and nonce of ETH account, so we keep looking for them + erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to) + if err != nil { + log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err) + c.error = err + // return err + return nil, nil + } - allHeaders := append(ethHeaders, erc20Headers...) + allHeaders := append(ethHeaders, erc20Headers...) - if len(allHeaders) > 0 { - foundHeaders = uniqueHeaders(allHeaders) - } + if len(allHeaders) > 0 { + foundHeaders = uniqueHeaders(allHeaders) } c.resFromBlock = newFromBlock @@ -156,7 +159,8 @@ func loadBlockRangeInfo(chainID uint64, account common.Address, blockDAO *BlockR return blockRange, nil } -// Returns if all the blocks prior to first known block are loaded, not considering +// Returns if all blocks are loaded, which means that start block (beginning of account history) +// has been found and all block headers saved to the DB func areAllHistoryBlocksLoaded(blockInfo *BlockRange) bool { if blockInfo == nil { return false @@ -171,6 +175,18 @@ func areAllHistoryBlocksLoaded(blockInfo *BlockRange) bool { return false } +func areAllHistoryBlocksLoadedForAddress(blockRangeDAO *BlockRangeSequentialDAO, chainID uint64, + address common.Address) (bool, error) { + + blockRange, err := blockRangeDAO.getBlockRange(chainID, address) + if err != nil { + log.Error("findBlocksCommand getBlockRange", "error", err) + return false, err + } + + return areAllHistoryBlocksLoaded(blockRange), nil +} + // run fast indexing for every accont up to canonical chain head minus safety depth. // every account will run it from last synced header. func (c *findBlocksCommand) fastIndex(ctx context.Context, bCache *balanceCache, @@ -186,17 +202,11 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCache *balanceCache, chainClient: c.chainClient, balanceCache: bCache, address: c.account, - eth: ÐDownloader{ - chainClient: c.chainClient, - accounts: []common.Address{c.account}, - signer: types.NewLondonSigner(c.chainClient.ToBigInt()), - db: c.db, - }, - feed: c.feed, - from: fromBlock, - to: toBlockNumber, - noLimit: c.noLimit, - threadLimit: SequentialThreadLimit, + feed: c.feed, + from: fromBlock, + to: toBlockNumber, + noLimit: c.noLimit, + threadLimit: SequentialThreadLimit, } group.Add(command.Command()) @@ -258,6 +268,7 @@ type loadAllTransfersCommand struct { blocksByAddress map[common.Address][]*big.Int transactionManager *TransactionManager blocksLimit int + feed *event.Feed } func (c *loadAllTransfersCommand) Command() async.Command { @@ -268,7 +279,71 @@ func (c *loadAllTransfersCommand) Command() async.Command { } func (c *loadAllTransfersCommand) Run(parent context.Context) error { - return loadTransfersLoop(parent, c.accounts, c.blockDAO, c.db, c.chainClient, c.blocksLimit, c.blocksByAddress, c.transactionManager) + start := time.Now() + group := async.NewGroup(parent) + + commands := []*transfersCommand{} + for _, address := range c.accounts { + transfers := &transfersCommand{ + db: c.db, + blockDAO: c.blockDAO, + chainClient: c.chainClient, + address: address, + eth: ÐDownloader{ + chainClient: c.chainClient, + accounts: []common.Address{address}, + signer: types.NewLondonSigner(c.chainClient.ToBigInt()), + db: c.db, + }, + blockNums: c.blocksByAddress[address], + blocksLimit: c.blocksLimit, + transactionManager: c.transactionManager, + } + commands = append(commands, transfers) + group.Add(transfers.Command()) + } + + select { + case <-parent.Done(): + log.Info("loadTransfers transfersCommand error", "chain", c.chainClient.ChainID, "error", parent.Err()) + return parent.Err() + case <-group.WaitAsync(): + log.Debug("loadTransfers finished for account", "in", time.Since(start), "chain", c.chainClient.ChainID, "limit", c.blocksLimit) + + c.notifyOfNewTransfers(commands) + } + + return nil +} + +func (c *loadAllTransfersCommand) notifyOfNewTransfers(commands []*transfersCommand) { + if c.feed != nil { + for _, command := range commands { + if len(command.fetchedTransfers) > 0 { + c.feed.Send(walletevent.Event{ + Type: EventNewTransfers, + Accounts: []common.Address{command.address}, + }) + } + } + } +} + +func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, + blockDAO *BlockDAO, chainClient *chain.ClientWithFallback, feed *event.Feed, + transactionManager *TransactionManager) *loadBlocksAndTransfersCommand { + + return &loadBlocksAndTransfersCommand{ + accounts: accounts, + db: db, + blockRangeDAO: &BlockRangeSequentialDAO{db.client}, + blockDAO: blockDAO, + chainClient: chainClient, + feed: feed, + errorsCount: 0, + transactionManager: transactionManager, + transfersLoaded: make(map[common.Address]bool), + } } type loadBlocksAndTransfersCommand struct { @@ -282,6 +357,9 @@ type loadBlocksAndTransfersCommand struct { errorsCount int // nonArchivalRPCNode bool // TODO Make use of it transactionManager *TransactionManager + + // Not to be set by the caller + transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime } func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error { @@ -289,13 +367,6 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error { ctx := parent - if c.feed != nil { - c.feed.Send(walletevent.Event{ - Type: EventFetchingRecentHistory, - Accounts: c.accounts, - }) - } - if c.balanceCache == nil { c.balanceCache = newBalanceCache() // TODO - need to keep balanceCache in memory??? What about sharing it with other packages? } @@ -317,14 +388,25 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error { } allHistoryLoaded := areAllHistoryBlocksLoaded(blockRange) - toHistoryBlockNum := getToHistoryBlockNumber(headNum, blockRange, allHistoryLoaded) if !allHistoryLoaded { - c.fetchHistoryBlocks(ctx, group, address, blockRange, toHistoryBlockNum, headNum) + c.fetchHistoryBlocks(ctx, group, address, big.NewInt(0), toHistoryBlockNum) + } else { + if !c.transfersLoaded[address] { + transfersLoaded, err := c.areAllTransfersLoadedForAddress(address) + if err != nil { + return err + } + + if transfersLoaded { + c.transfersLoaded[address] = true + c.notifyHistoryReady(address) + } + } } - // If no block ranges are stored, all blocks will be fetched by startFetchingHistoryBlocks method + // If no block ranges are stored, all blocks will be fetched by fetchHistoryBlocks method if blockRange != nil { c.fetchNewBlocks(ctx, group, address, blockRange, headNum) } @@ -349,9 +431,9 @@ func (c *loadBlocksAndTransfersCommand) Command() async.Command { } func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, - address common.Address, blockRange *BlockRange, toHistoryBlockNum *big.Int, headNum *big.Int) { + address common.Address, from *big.Int, to *big.Int) { - log.Info("Launching history command") + log.Debug("Launching history command", "account", address, "from", from, "to", to) fbc := &findBlocksCommand{ account: address, @@ -361,8 +443,8 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, balanceCache: c.balanceCache, feed: c.feed, noLimit: false, - fromBlockNumber: big.NewInt(0), // Beginning of the chain history - toBlockNumber: toHistoryBlockNum, + fromBlockNumber: from, + toBlockNumber: to, transactionManager: c.transactionManager, } group.Add(fbc.Command()) @@ -371,9 +453,10 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, func (c *loadBlocksAndTransfersCommand) fetchNewBlocks(ctx context.Context, group *async.Group, address common.Address, blockRange *BlockRange, headNum *big.Int) { - log.Info("Launching new blocks command") fromBlockNumber := new(big.Int).Add(blockRange.LastKnown, big.NewInt(1)) + log.Debug("Launching new blocks command", "chainID", c.chainClient.ChainID, "account", address, "from", fromBlockNumber, "headNum", headNum) + // In case interval between checks is set smaller than block mining time, // we might need to wait for the next block to be mined if fromBlockNumber.Cmp(headNum) > 0 { @@ -403,80 +486,42 @@ func (c *loadBlocksAndTransfersCommand) fetchTransfers(ctx context.Context, grou chainClient: c.chainClient, transactionManager: c.transactionManager, blocksLimit: noBlockLimit, // load transfers from all `unloaded` blocks + feed: c.feed, } group.Add(txCommand.Command()) } -func loadTransfersLoop(ctx context.Context, accounts []common.Address, blockDAO *BlockDAO, db *Database, - chainClient *chain.ClientWithFallback, blocksLimitPerAccount int, blocksByAddress map[common.Address][]*big.Int, - transactionManager *TransactionManager) error { +func (c *loadBlocksAndTransfersCommand) notifyHistoryReady(address common.Address) { + if c.feed != nil { + c.feed.Send(walletevent.Event{ + Type: EventRecentHistoryReady, + Accounts: []common.Address{address}, + }) + } +} - log.Debug("loadTransfers start", "accounts", accounts, "chain", chainClient.ChainID, "limit", blocksLimitPerAccount) +func (c *loadBlocksAndTransfersCommand) areAllTransfersLoadedForAddress(address common.Address) (bool, error) { + allBlocksLoaded, err := areAllHistoryBlocksLoadedForAddress(c.blockRangeDAO, c.chainClient.ChainID, address) + if err != nil { + log.Error("loadBlockAndTransfersCommand allHistoryBlocksLoaded", "error", err) + return false, err + } - start := time.Now() - group := async.NewGroup(ctx) + if allBlocksLoaded { + firstHeader, err := c.blockDAO.GetFirstSavedBlock(c.chainClient.ChainID, address) + if err != nil { + log.Error("loadBlocksAndTransfersCommand GetFirstSavedBlock", "error", err) + return false, err + } - for _, address := range accounts { - // Take blocks from cache if available and disrespect the limit - // If no blocks are available in cache, take blocks from DB respecting the limit - // If no limit is set, take all blocks from DB - blocks, ok := blocksByAddress[address] - - commands := []*transfersCommand{} - for { - if !ok { - blocks, _ = blockDAO.GetBlocksByAddress(chainClient.ChainID, address, numberOfBlocksCheckedPerIteration) - } - - for _, block := range blocks { - transfers := &transfersCommand{ - db: db, - chainClient: chainClient, - address: address, - eth: ÐDownloader{ - chainClient: chainClient, - accounts: []common.Address{address}, - signer: types.NewLondonSigner(chainClient.ToBigInt()), - db: db, - }, - blockNum: block, - transactionManager: transactionManager, - } - commands = append(commands, transfers) - group.Add(transfers.Command()) - } - - // We need to wait until the retrieved blocks are processed, otherwise - // they will be retrieved again in the next iteration - // It blocks transfer loading for single account at a time - select { - case <-ctx.Done(): - log.Info("loadTransfers transfersCommand error", "chain", chainClient.ChainID, "address", address, "error", ctx.Err()) - continue - // return nil, ctx.Err() - case <-group.WaitAsync(): - // TODO Remove when done debugging - transfers := []Transfer{} - for _, command := range commands { - if len(command.fetchedTransfers) == 0 { - continue - } - - transfers = append(transfers, command.fetchedTransfers...) - } - log.Debug("loadTransfers finished for account", "address", address, "in", time.Since(start), "chain", chainClient.ChainID, "transfers", len(transfers), "limit", blocksLimitPerAccount) - } - - if ok || len(blocks) == 0 || - (blocksLimitPerAccount > noBlockLimit && len(blocks) >= blocksLimitPerAccount) { - log.Debug("loadTransfers breaking loop on block limits reached or 0 blocks", "chain", chainClient.ChainID, "address", address, "limit", blocksLimitPerAccount, "blocks", len(blocks)) - break - } + // If first block is Loaded, we have fetched all the transfers + if firstHeader != nil && firstHeader.Loaded { + return true, nil } } - return nil + return false, nil } func getHeadBlockNumber(parent context.Context, chainClient *chain.ClientWithFallback) (*big.Int, error) { @@ -490,28 +535,6 @@ func getHeadBlockNumber(parent context.Context, chainClient *chain.ClientWithFal return head.Number, err } -func uniqueHeaders(allHeaders []*DBHeader) []*DBHeader { - uniqHeadersByHash := map[common.Hash]*DBHeader{} - for _, header := range allHeaders { - uniqHeader, ok := uniqHeadersByHash[header.Hash] - if ok { - if len(header.Erc20Transfers) > 0 { - uniqHeader.Erc20Transfers = append(uniqHeader.Erc20Transfers, header.Erc20Transfers...) - } - uniqHeadersByHash[header.Hash] = uniqHeader - } else { - uniqHeadersByHash[header.Hash] = header - } - } - - uniqHeaders := []*DBHeader{} - for _, header := range uniqHeadersByHash { - uniqHeaders = append(uniqHeaders, header) - } - - return uniqHeaders -} - func nextRange(from *big.Int, zeroBlockNumber *big.Int) (*big.Int, *big.Int) { log.Debug("next range start", "from", from, "zeroBlockNumber", zeroBlockNumber) diff --git a/services/wallet/transfer/concurrent.go b/services/wallet/transfer/concurrent.go index 521a70b62..43ad1f1d3 100644 --- a/services/wallet/transfer/concurrent.go +++ b/services/wallet/transfer/concurrent.go @@ -93,9 +93,9 @@ type Downloader interface { // Returns new block ranges that contain transfers and found block headers that contain transfers, and a block where // beginning of trasfers history detected -func checkRangesWithStartBlock(parent context.Context, client BalanceReader, cache BalanceCache, downloader Downloader, - account common.Address, ranges [][]*big.Int, threadLimit uint32, startBlock *big.Int) (resRanges [][]*big.Int, - headers []*DBHeader, newStartBlock *big.Int, err error) { +func checkRangesWithStartBlock(parent context.Context, client BalanceReader, cache BalanceCache, + account common.Address, ranges [][]*big.Int, threadLimit uint32, startBlock *big.Int) ( + resRanges [][]*big.Int, headers []*DBHeader, newStartBlock *big.Int, err error) { log.Debug("start checkRanges", "account", account.Hex(), "ranges len", len(ranges)) @@ -208,8 +208,9 @@ func checkRangesWithStartBlock(parent context.Context, client BalanceReader, cac return c.GetRanges(), c.GetHeaders(), newStartBlock, nil } -func findBlocksWithEthTransfers(parent context.Context, client BalanceReader, cache BalanceCache, downloader Downloader, - account common.Address, low, high *big.Int, noLimit bool, threadLimit uint32) (from *big.Int, headers []*DBHeader, resStartBlock *big.Int, err error) { +func findBlocksWithEthTransfers(parent context.Context, client BalanceReader, cache BalanceCache, + account common.Address, low, high *big.Int, noLimit bool, threadLimit uint32) ( + from *big.Int, headers []*DBHeader, resStartBlock *big.Int, err error) { ranges := [][]*big.Int{{low, high}} from = big.NewInt(low.Int64()) @@ -222,7 +223,7 @@ func findBlocksWithEthTransfers(parent context.Context, client BalanceReader, ca // Check if there are transfers in blocks in ranges. To do that, nonce and balance is checked // the block ranges that have transfers are returned newRanges, newHeaders, strtBlock, err := checkRangesWithStartBlock(parent, client, cache, - downloader, account, ranges, threadLimit, resStartBlock) + account, ranges, threadLimit, resStartBlock) resStartBlock = strtBlock if err != nil { return nil, nil, nil, err diff --git a/services/wallet/transfer/concurrent_test.go b/services/wallet/transfer/concurrent_test.go index 510b6e3c1..0e9494961 100644 --- a/services/wallet/transfer/concurrent_test.go +++ b/services/wallet/transfer/concurrent_test.go @@ -128,7 +128,7 @@ func TestConcurrentEthDownloader(t *testing.T) { defer cancel() concurrent := NewConcurrentDownloader(ctx, 0) _, headers, _, _ := findBlocksWithEthTransfers( - ctx, tc.options.balances, newBalanceCache(), tc.options.batches, + ctx, tc.options.balances, newBalanceCache(), common.Address{}, zero, tc.options.last, false, NoThreadLimit) concurrent.Wait() require.NoError(t, concurrent.Error()) diff --git a/services/wallet/transfer/sequential_fetch_strategy.go b/services/wallet/transfer/sequential_fetch_strategy.go index 2ac5ee42f..50445f2db 100644 --- a/services/wallet/transfer/sequential_fetch_strategy.go +++ b/services/wallet/transfer/sequential_fetch_strategy.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/services/wallet/async" + "github.com/status-im/status-go/services/wallet/walletevent" ) func NewSequentialFetchStrategy(db *Database, blockDAO *BlockDAO, feed *event.Feed, @@ -41,17 +42,8 @@ type SequentialFetchStrategy struct { func (s *SequentialFetchStrategy) newCommand(chainClient *chain.ClientWithFallback, accounts []common.Address) async.Commander { - ctl := &loadBlocksAndTransfersCommand{ - db: s.db, - chainClient: chainClient, - accounts: accounts, - blockRangeDAO: &BlockRangeSequentialDAO{s.db.client}, - blockDAO: s.blockDAO, - feed: s.feed, - errorsCount: 0, - transactionManager: s.transactionManager, - } - return ctl + return newLoadBlocksAndTransfersCommand(accounts, s.db, s.blockDAO, chainClient, s.feed, + s.transactionManager) } func (s *SequentialFetchStrategy) start() error { @@ -63,6 +55,13 @@ func (s *SequentialFetchStrategy) start() error { } s.group = async.NewGroup(context.Background()) + if s.feed != nil { + s.feed.Send(walletevent.Event{ + Type: EventFetchingRecentHistory, + Accounts: s.accounts, + }) + } + for _, chainClient := range s.chainClients { ctl := s.newCommand(chainClient, s.accounts) s.group.Add(ctl.Command())