feat(wallet): SequentialFetchStrategy improvements: (#3551)

- reverted a change that stopped looking for ERC20 transfers if no nonce
  and balance change found within a block range for ETH
- implemented sending EventRecentHistoryReady event at a proper time
- moved EventFetchingRecentHistory event to Strategy type as it does not make
sense to send this event in loop
- moved iterating through blocks logic to inside of `loadTransfers` command, which
now accepts a block range.
- reuse `uniqueHeaders` function in commands.go
- clean up

Updates #10246
This commit is contained in:
IvanBelyakoff 2023-06-01 15:09:50 +02:00 committed by GitHub
parent 5d62a9eef4
commit 7adfbb5467
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 314 additions and 237 deletions

View File

@ -249,6 +249,30 @@ func (b *BlockDAO) GetLastSavedBlock(chainID uint64) (rst *DBHeader, err error)
return nil, nil
}
func (b *BlockDAO) GetFirstSavedBlock(chainID uint64, address common.Address) (rst *DBHeader, err error) {
query := `SELECT blk_number, blk_hash, loaded
FROM blocks
WHERE network_id = ? AND address = ?
ORDER BY blk_number LIMIT 1`
rows, err := b.db.Query(query, chainID, address)
if err != nil {
return
}
defer rows.Close()
if rows.Next() {
header := &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
err = rows.Scan((*bigint.SQLBigInt)(header.Number), &header.Hash, &header.Loaded)
if err != nil {
return nil, err
}
return header, nil
}
return nil, nil
}
// TODO remove as not used
func (b *BlockDAO) GetBlocks(chainID uint64) (rst []*DBHeader, err error) {
query := `SELECT blk_number, blk_hash, address FROM blocks`

View File

@ -81,8 +81,7 @@ func (b *BlockRangeSequentialDAO) upsertRange(chainID uint64, account common.Add
// to a greater value, because no history can be before some block that is considered
// as a start of history, but due to concurrent block range checks, a newer greater block
// can be found that matches criteria of a start block (nonce is zero, balances are equal)
if newBlockRange.Start != nil || (blockRange.Start != nil && newBlockRange.Start != nil &&
blockRange.Start.Cmp(newBlockRange.Start) < 0) {
if newBlockRange.Start != nil && (blockRange.Start == nil || blockRange.Start.Cmp(newBlockRange.Start) < 0) {
blockRange.Start = newBlockRange.Start
}

View File

@ -47,7 +47,6 @@ var (
)
type ethHistoricalCommand struct {
eth Downloader
address common.Address
chainClient *chain.ClientWithFallback
balanceCache *balanceCache
@ -69,7 +68,8 @@ func (c *ethHistoricalCommand) Command() async.Command {
}
func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
log.Info("eth historical downloader start", "address", c.address, "from", c.from.Number, "to", c.to, "noLimit", c.noLimit)
log.Info("eth historical downloader start", "chainID", c.chainClient.ChainID, "address", c.address,
"from", c.from.Number, "to", c.to, "noLimit", c.noLimit)
start := time.Now()
if c.from.Number != nil && c.from.Balance != nil {
@ -79,11 +79,12 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
c.balanceCache.addNonceToCache(c.address, c.from.Number, c.from.Nonce)
}
from, headers, startBlock, err := findBlocksWithEthTransfers(ctx, c.chainClient,
c.balanceCache, c.eth, c.address, c.from.Number, c.to, c.noLimit, c.threadLimit)
c.balanceCache, c.address, c.from.Number, c.to, c.noLimit, c.threadLimit)
if err != nil {
c.error = err
log.Error("failed to find blocks with transfers", "error", err)
log.Error("failed to find blocks with transfers", "error", err, "chainID", c.chainClient.ChainID,
"address", c.address, "from", c.from.Number, "to", c.to)
return nil
}
@ -343,12 +344,16 @@ func (c *controlCommand) Command() async.Command {
type transfersCommand struct {
db *Database
blockDAO *BlockDAO
eth *ETHDownloader
blockNum *big.Int
blockNums []*big.Int
address common.Address
chainClient *chain.ClientWithFallback
fetchedTransfers []Transfer
blocksLimit int
transactionManager *TransactionManager
// result
fetchedTransfers []Transfer
}
func (c *transfersCommand) Command() async.Command {
@ -359,16 +364,66 @@ func (c *transfersCommand) Command() async.Command {
}
func (c *transfersCommand) Run(ctx context.Context) (err error) {
log.Debug("start transfersCommand", "chain", c.chainClient.ChainID, "address", c.address, "block", c.blockNum)
// Take blocks from cache if available and disrespect the limit
// If no blocks are available in cache, take blocks from DB respecting the limit
// If no limit is set, take all blocks from DB
for {
blocks := c.blockNums
if blocks == nil {
blocks, _ = c.blockDAO.GetBlocksByAddress(c.chainClient.ChainID, c.address, numberOfBlocksCheckedPerIteration)
}
startTs := time.Now()
for _, blockNum := range blocks {
log.Info("start transfersCommand", "chain", c.chainClient.ChainID, "address", c.address, "block", blockNum)
allTransfers, err := c.eth.GetTransfersByNumber(ctx, c.blockNum)
if err != nil {
log.Error("getTransfersByBlocks error", "error", err)
return err
startTs := time.Now()
allTransfers, err := c.eth.GetTransfersByNumber(ctx, blockNum)
if err != nil {
log.Error("getTransfersByBlocks error", "error", err)
return err
}
err = c.updateMultiTxFromPendingEntry(allTransfers)
if err != nil {
return err
}
if len(allTransfers) > 0 {
err = c.db.SaveTransfersMarkBlocksLoaded(c.chainClient.ChainID, c.address, allTransfers, []*big.Int{blockNum})
if err != nil {
log.Error("SaveTransfers error", "error", err)
return err
}
} else {
// If no transfers found, that is suspecting, because downloader returned this block as containing transfers
log.Error("no transfers found in block", "chain", c.chainClient.ChainID, "address", c.address, "block", blockNum)
err = markBlocksAsLoaded(c.chainClient.ChainID, c.db.client, c.address, []*big.Int{blockNum})
if err != nil {
log.Error("Mark blocks loaded error", "error", err)
return err
}
}
c.fetchedTransfers = append(c.fetchedTransfers, allTransfers...)
log.Debug("end transfersCommand", "chain", c.chainClient.ChainID, "address", c.address,
"block", blockNum, "len", len(allTransfers), "in", time.Since(startTs))
}
if c.blockNums != nil || len(blocks) == 0 ||
(c.blocksLimit > noBlockLimit && len(blocks) >= c.blocksLimit) {
log.Debug("loadTransfers breaking loop on block limits reached or 0 blocks", "chain", c.chainClient.ChainID,
"address", c.address, "limit", c.blocksLimit, "blocks", len(blocks))
break
}
}
return nil
}
func (c *transfersCommand) updateMultiTxFromPendingEntry(allTransfers []Transfer) error {
// Update MultiTransactionID from pending entry
for index := range allTransfers {
transfer := &allTransfers[index]
@ -382,27 +437,6 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) {
}
}
if len(allTransfers) > 0 {
err = c.db.SaveTransfersMarkBlocksLoaded(c.chainClient.ChainID, c.address, allTransfers, []*big.Int{c.blockNum})
if err != nil {
log.Error("SaveTransfers error", "error", err)
return err
}
} else {
// If no transfers found, that is suspecting, because downloader returned this block as containing transfers
log.Error("no transfers found in block", "chain", c.chainClient.ChainID, "address", c.address, "block", c.blockNum)
err = markBlocksAsLoaded(c.chainClient.ChainID, c.db.client, c.address, []*big.Int{c.blockNum})
if err != nil {
log.Error("Mark blocks loaded error", "error", err)
return err
}
}
c.fetchedTransfers = allTransfers
log.Debug("end transfersCommand", "chain", c.chainClient.ChainID, "address", c.address,
"block", c.blockNum, "len", len(allTransfers), "in", time.Since(startTs))
return nil
}
@ -485,22 +519,9 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error)
erc20Headers := erc20HeadersByAddress[address]
allHeaders := append(ethHeaders, erc20Headers...)
uniqHeadersByHash := map[common.Hash]*DBHeader{}
for _, header := range allHeaders {
uniqHeader, ok := uniqHeadersByHash[header.Hash]
if ok {
if len(header.Erc20Transfers) > 0 {
uniqHeader.Erc20Transfers = append(uniqHeader.Erc20Transfers, header.Erc20Transfers...)
}
uniqHeadersByHash[header.Hash] = uniqHeader
} else {
uniqHeadersByHash[header.Hash] = header
}
}
uniqHeaders := []*DBHeader{}
for _, header := range uniqHeadersByHash {
uniqHeaders = append(uniqHeaders, header)
if len(allHeaders) > 0 {
uniqHeaders = uniqueHeaders(allHeaders)
}
foundHeaders[address] = uniqHeaders
@ -542,17 +563,11 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCache *b
chainClient: c.chainClient,
balanceCache: bCache,
address: address,
eth: &ETHDownloader{
chainClient: c.chainClient,
accounts: []common.Address{address},
signer: types.NewLondonSigner(c.chainClient.ToBigInt()),
db: c.db,
},
feed: c.feed,
from: fromByAddress[address],
to: toByAddress[address],
noLimit: c.noLimit,
threadLimit: NoThreadLimit,
feed: c.feed,
from: fromByAddress[address],
to: toByAddress[address],
noLimit: c.noLimit,
threadLimit: NoThreadLimit,
}
commands[i] = eth
group.Add(eth.Command())
@ -621,30 +636,24 @@ func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *Blo
commands := []*transfersCommand{}
for _, address := range accounts {
blocks, ok := blocksByAddress[address]
if !ok {
blocks, _ = blockDAO.GetBlocksByAddress(chainClient.ChainID, address, numberOfBlocksCheckedPerIteration)
}
for _, block := range blocks {
transfers := &transfersCommand{
db: db,
transfers := &transfersCommand{
db: db,
blockDAO: blockDAO,
chainClient: chainClient,
address: address,
eth: &ETHDownloader{
chainClient: chainClient,
address: address,
eth: &ETHDownloader{
chainClient: chainClient,
accounts: []common.Address{address},
signer: types.NewLondonSigner(chainClient.ToBigInt()),
db: db,
},
blockNum: block,
transactionManager: transactionManager,
}
commands = append(commands, transfers)
group.Add(transfers.Command())
accounts: []common.Address{address},
signer: types.NewLondonSigner(chainClient.ToBigInt()),
db: db,
},
blockNums: blocksByAddress[address],
transactionManager: transactionManager,
}
commands = append(commands, transfers)
group.Add(transfers.Command())
}
select {
case <-ctx.Done():
return nil, ctx.Err()
@ -756,3 +765,25 @@ func findFirstRanges(c context.Context, accounts []common.Address, initialTo *bi
return res, nil
}
func uniqueHeaders(allHeaders []*DBHeader) []*DBHeader {
uniqHeadersByHash := map[common.Hash]*DBHeader{}
for _, header := range allHeaders {
uniqHeader, ok := uniqHeadersByHash[header.Hash]
if ok {
if len(header.Erc20Transfers) > 0 {
uniqHeader.Erc20Transfers = append(uniqHeader.Erc20Transfers, header.Erc20Transfers...)
}
uniqHeadersByHash[header.Hash] = uniqHeader
} else {
uniqHeadersByHash[header.Hash] = header
}
}
uniqHeaders := []*DBHeader{}
for _, header := range uniqHeadersByHash {
uniqHeaders = append(uniqHeaders, header)
}
return uniqHeaders
}

View File

@ -46,6 +46,7 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
rangeSize := big.NewInt(DefaultNodeBlockChunkSize)
from, to := new(big.Int).Set(c.fromBlockNumber), new(big.Int).Set(c.toBlockNumber)
// Limit the range size to DefaultNodeBlockChunkSize
if new(big.Int).Sub(to, from).Cmp(rangeSize) > 0 {
from.Sub(to, rangeSize)
@ -54,19 +55,22 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
for {
headers, _ := c.checkRange(parent, from, to)
if c.error != nil {
log.Error("findBlocksCommand checkRange", "error", c.error)
log.Error("findBlocksCommand checkRange", "error", c.error, "account", c.account,
"chain", c.chainClient.ChainID, "from", from, "to", to)
break
}
log.Debug("findBlocksCommand saving headers", "len", len(headers), "lastBlockNumber", to,
"balance", c.balanceCache.ReadCachedBalance(c.account, to),
"nonce", c.balanceCache.ReadCachedNonce(c.account, to))
if len(headers) > 0 {
log.Debug("findBlocksCommand saving headers", "len", len(headers), "lastBlockNumber", to,
"balance", c.balanceCache.ReadCachedBalance(c.account, to),
"nonce", c.balanceCache.ReadCachedNonce(c.account, to))
err = c.db.SaveBlocks(c.chainClient.ChainID, c.account, headers)
if err != nil {
c.error = err
// return err
break
err = c.db.SaveBlocks(c.chainClient.ChainID, c.account, headers)
if err != nil {
c.error = err
// return err
break
}
}
err = c.upsertBlockRange(&BlockRange{c.startBlockNumber, c.resFromBlock.Number, to})
@ -109,29 +113,28 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to
newFromBlock, ethHeaders, startBlock, err := c.fastIndex(parent, c.balanceCache, fromBlock, to)
if err != nil {
log.Error("findBlocksCommand checkRange fastIndex", "err", err)
log.Error("findBlocksCommand checkRange fastIndex", "err", err, "account", c.account,
"chain", c.chainClient.ChainID)
c.error = err
// return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers
return nil, nil
}
log.Debug("findBlocksCommand checkRange", "startBlock", startBlock, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit)
// There should be transfers when either when we have found headers
// or newFromBlock is different from fromBlock
if len(ethHeaders) > 0 || newFromBlock.Number.Cmp(fromBlock.Number) != 0 {
erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to)
if err != nil {
log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err)
c.error = err
// return err
return nil, nil
}
// There could be incoming ERC20 transfers which don't change the balance
// and nonce of ETH account, so we keep looking for them
erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to)
if err != nil {
log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err)
c.error = err
// return err
return nil, nil
}
allHeaders := append(ethHeaders, erc20Headers...)
allHeaders := append(ethHeaders, erc20Headers...)
if len(allHeaders) > 0 {
foundHeaders = uniqueHeaders(allHeaders)
}
if len(allHeaders) > 0 {
foundHeaders = uniqueHeaders(allHeaders)
}
c.resFromBlock = newFromBlock
@ -156,7 +159,8 @@ func loadBlockRangeInfo(chainID uint64, account common.Address, blockDAO *BlockR
return blockRange, nil
}
// Returns if all the blocks prior to first known block are loaded, not considering
// Returns if all blocks are loaded, which means that start block (beginning of account history)
// has been found and all block headers saved to the DB
func areAllHistoryBlocksLoaded(blockInfo *BlockRange) bool {
if blockInfo == nil {
return false
@ -171,6 +175,18 @@ func areAllHistoryBlocksLoaded(blockInfo *BlockRange) bool {
return false
}
func areAllHistoryBlocksLoadedForAddress(blockRangeDAO *BlockRangeSequentialDAO, chainID uint64,
address common.Address) (bool, error) {
blockRange, err := blockRangeDAO.getBlockRange(chainID, address)
if err != nil {
log.Error("findBlocksCommand getBlockRange", "error", err)
return false, err
}
return areAllHistoryBlocksLoaded(blockRange), nil
}
// run fast indexing for every accont up to canonical chain head minus safety depth.
// every account will run it from last synced header.
func (c *findBlocksCommand) fastIndex(ctx context.Context, bCache *balanceCache,
@ -186,17 +202,11 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCache *balanceCache,
chainClient: c.chainClient,
balanceCache: bCache,
address: c.account,
eth: &ETHDownloader{
chainClient: c.chainClient,
accounts: []common.Address{c.account},
signer: types.NewLondonSigner(c.chainClient.ToBigInt()),
db: c.db,
},
feed: c.feed,
from: fromBlock,
to: toBlockNumber,
noLimit: c.noLimit,
threadLimit: SequentialThreadLimit,
feed: c.feed,
from: fromBlock,
to: toBlockNumber,
noLimit: c.noLimit,
threadLimit: SequentialThreadLimit,
}
group.Add(command.Command())
@ -258,6 +268,7 @@ type loadAllTransfersCommand struct {
blocksByAddress map[common.Address][]*big.Int
transactionManager *TransactionManager
blocksLimit int
feed *event.Feed
}
func (c *loadAllTransfersCommand) Command() async.Command {
@ -268,7 +279,71 @@ func (c *loadAllTransfersCommand) Command() async.Command {
}
func (c *loadAllTransfersCommand) Run(parent context.Context) error {
return loadTransfersLoop(parent, c.accounts, c.blockDAO, c.db, c.chainClient, c.blocksLimit, c.blocksByAddress, c.transactionManager)
start := time.Now()
group := async.NewGroup(parent)
commands := []*transfersCommand{}
for _, address := range c.accounts {
transfers := &transfersCommand{
db: c.db,
blockDAO: c.blockDAO,
chainClient: c.chainClient,
address: address,
eth: &ETHDownloader{
chainClient: c.chainClient,
accounts: []common.Address{address},
signer: types.NewLondonSigner(c.chainClient.ToBigInt()),
db: c.db,
},
blockNums: c.blocksByAddress[address],
blocksLimit: c.blocksLimit,
transactionManager: c.transactionManager,
}
commands = append(commands, transfers)
group.Add(transfers.Command())
}
select {
case <-parent.Done():
log.Info("loadTransfers transfersCommand error", "chain", c.chainClient.ChainID, "error", parent.Err())
return parent.Err()
case <-group.WaitAsync():
log.Debug("loadTransfers finished for account", "in", time.Since(start), "chain", c.chainClient.ChainID, "limit", c.blocksLimit)
c.notifyOfNewTransfers(commands)
}
return nil
}
func (c *loadAllTransfersCommand) notifyOfNewTransfers(commands []*transfersCommand) {
if c.feed != nil {
for _, command := range commands {
if len(command.fetchedTransfers) > 0 {
c.feed.Send(walletevent.Event{
Type: EventNewTransfers,
Accounts: []common.Address{command.address},
})
}
}
}
}
func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database,
blockDAO *BlockDAO, chainClient *chain.ClientWithFallback, feed *event.Feed,
transactionManager *TransactionManager) *loadBlocksAndTransfersCommand {
return &loadBlocksAndTransfersCommand{
accounts: accounts,
db: db,
blockRangeDAO: &BlockRangeSequentialDAO{db.client},
blockDAO: blockDAO,
chainClient: chainClient,
feed: feed,
errorsCount: 0,
transactionManager: transactionManager,
transfersLoaded: make(map[common.Address]bool),
}
}
type loadBlocksAndTransfersCommand struct {
@ -282,6 +357,9 @@ type loadBlocksAndTransfersCommand struct {
errorsCount int
// nonArchivalRPCNode bool // TODO Make use of it
transactionManager *TransactionManager
// Not to be set by the caller
transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime
}
func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
@ -289,13 +367,6 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
ctx := parent
if c.feed != nil {
c.feed.Send(walletevent.Event{
Type: EventFetchingRecentHistory,
Accounts: c.accounts,
})
}
if c.balanceCache == nil {
c.balanceCache = newBalanceCache() // TODO - need to keep balanceCache in memory??? What about sharing it with other packages?
}
@ -317,14 +388,25 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
}
allHistoryLoaded := areAllHistoryBlocksLoaded(blockRange)
toHistoryBlockNum := getToHistoryBlockNumber(headNum, blockRange, allHistoryLoaded)
if !allHistoryLoaded {
c.fetchHistoryBlocks(ctx, group, address, blockRange, toHistoryBlockNum, headNum)
c.fetchHistoryBlocks(ctx, group, address, big.NewInt(0), toHistoryBlockNum)
} else {
if !c.transfersLoaded[address] {
transfersLoaded, err := c.areAllTransfersLoadedForAddress(address)
if err != nil {
return err
}
if transfersLoaded {
c.transfersLoaded[address] = true
c.notifyHistoryReady(address)
}
}
}
// If no block ranges are stored, all blocks will be fetched by startFetchingHistoryBlocks method
// If no block ranges are stored, all blocks will be fetched by fetchHistoryBlocks method
if blockRange != nil {
c.fetchNewBlocks(ctx, group, address, blockRange, headNum)
}
@ -349,9 +431,9 @@ func (c *loadBlocksAndTransfersCommand) Command() async.Command {
}
func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group,
address common.Address, blockRange *BlockRange, toHistoryBlockNum *big.Int, headNum *big.Int) {
address common.Address, from *big.Int, to *big.Int) {
log.Info("Launching history command")
log.Debug("Launching history command", "account", address, "from", from, "to", to)
fbc := &findBlocksCommand{
account: address,
@ -361,8 +443,8 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context,
balanceCache: c.balanceCache,
feed: c.feed,
noLimit: false,
fromBlockNumber: big.NewInt(0), // Beginning of the chain history
toBlockNumber: toHistoryBlockNum,
fromBlockNumber: from,
toBlockNumber: to,
transactionManager: c.transactionManager,
}
group.Add(fbc.Command())
@ -371,9 +453,10 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context,
func (c *loadBlocksAndTransfersCommand) fetchNewBlocks(ctx context.Context, group *async.Group,
address common.Address, blockRange *BlockRange, headNum *big.Int) {
log.Info("Launching new blocks command")
fromBlockNumber := new(big.Int).Add(blockRange.LastKnown, big.NewInt(1))
log.Debug("Launching new blocks command", "chainID", c.chainClient.ChainID, "account", address, "from", fromBlockNumber, "headNum", headNum)
// In case interval between checks is set smaller than block mining time,
// we might need to wait for the next block to be mined
if fromBlockNumber.Cmp(headNum) > 0 {
@ -403,80 +486,42 @@ func (c *loadBlocksAndTransfersCommand) fetchTransfers(ctx context.Context, grou
chainClient: c.chainClient,
transactionManager: c.transactionManager,
blocksLimit: noBlockLimit, // load transfers from all `unloaded` blocks
feed: c.feed,
}
group.Add(txCommand.Command())
}
func loadTransfersLoop(ctx context.Context, accounts []common.Address, blockDAO *BlockDAO, db *Database,
chainClient *chain.ClientWithFallback, blocksLimitPerAccount int, blocksByAddress map[common.Address][]*big.Int,
transactionManager *TransactionManager) error {
func (c *loadBlocksAndTransfersCommand) notifyHistoryReady(address common.Address) {
if c.feed != nil {
c.feed.Send(walletevent.Event{
Type: EventRecentHistoryReady,
Accounts: []common.Address{address},
})
}
}
log.Debug("loadTransfers start", "accounts", accounts, "chain", chainClient.ChainID, "limit", blocksLimitPerAccount)
func (c *loadBlocksAndTransfersCommand) areAllTransfersLoadedForAddress(address common.Address) (bool, error) {
allBlocksLoaded, err := areAllHistoryBlocksLoadedForAddress(c.blockRangeDAO, c.chainClient.ChainID, address)
if err != nil {
log.Error("loadBlockAndTransfersCommand allHistoryBlocksLoaded", "error", err)
return false, err
}
start := time.Now()
group := async.NewGroup(ctx)
if allBlocksLoaded {
firstHeader, err := c.blockDAO.GetFirstSavedBlock(c.chainClient.ChainID, address)
if err != nil {
log.Error("loadBlocksAndTransfersCommand GetFirstSavedBlock", "error", err)
return false, err
}
for _, address := range accounts {
// Take blocks from cache if available and disrespect the limit
// If no blocks are available in cache, take blocks from DB respecting the limit
// If no limit is set, take all blocks from DB
blocks, ok := blocksByAddress[address]
commands := []*transfersCommand{}
for {
if !ok {
blocks, _ = blockDAO.GetBlocksByAddress(chainClient.ChainID, address, numberOfBlocksCheckedPerIteration)
}
for _, block := range blocks {
transfers := &transfersCommand{
db: db,
chainClient: chainClient,
address: address,
eth: &ETHDownloader{
chainClient: chainClient,
accounts: []common.Address{address},
signer: types.NewLondonSigner(chainClient.ToBigInt()),
db: db,
},
blockNum: block,
transactionManager: transactionManager,
}
commands = append(commands, transfers)
group.Add(transfers.Command())
}
// We need to wait until the retrieved blocks are processed, otherwise
// they will be retrieved again in the next iteration
// It blocks transfer loading for single account at a time
select {
case <-ctx.Done():
log.Info("loadTransfers transfersCommand error", "chain", chainClient.ChainID, "address", address, "error", ctx.Err())
continue
// return nil, ctx.Err()
case <-group.WaitAsync():
// TODO Remove when done debugging
transfers := []Transfer{}
for _, command := range commands {
if len(command.fetchedTransfers) == 0 {
continue
}
transfers = append(transfers, command.fetchedTransfers...)
}
log.Debug("loadTransfers finished for account", "address", address, "in", time.Since(start), "chain", chainClient.ChainID, "transfers", len(transfers), "limit", blocksLimitPerAccount)
}
if ok || len(blocks) == 0 ||
(blocksLimitPerAccount > noBlockLimit && len(blocks) >= blocksLimitPerAccount) {
log.Debug("loadTransfers breaking loop on block limits reached or 0 blocks", "chain", chainClient.ChainID, "address", address, "limit", blocksLimitPerAccount, "blocks", len(blocks))
break
}
// If first block is Loaded, we have fetched all the transfers
if firstHeader != nil && firstHeader.Loaded {
return true, nil
}
}
return nil
return false, nil
}
func getHeadBlockNumber(parent context.Context, chainClient *chain.ClientWithFallback) (*big.Int, error) {
@ -490,28 +535,6 @@ func getHeadBlockNumber(parent context.Context, chainClient *chain.ClientWithFal
return head.Number, err
}
func uniqueHeaders(allHeaders []*DBHeader) []*DBHeader {
uniqHeadersByHash := map[common.Hash]*DBHeader{}
for _, header := range allHeaders {
uniqHeader, ok := uniqHeadersByHash[header.Hash]
if ok {
if len(header.Erc20Transfers) > 0 {
uniqHeader.Erc20Transfers = append(uniqHeader.Erc20Transfers, header.Erc20Transfers...)
}
uniqHeadersByHash[header.Hash] = uniqHeader
} else {
uniqHeadersByHash[header.Hash] = header
}
}
uniqHeaders := []*DBHeader{}
for _, header := range uniqHeadersByHash {
uniqHeaders = append(uniqHeaders, header)
}
return uniqHeaders
}
func nextRange(from *big.Int, zeroBlockNumber *big.Int) (*big.Int, *big.Int) {
log.Debug("next range start", "from", from, "zeroBlockNumber", zeroBlockNumber)

View File

@ -93,9 +93,9 @@ type Downloader interface {
// Returns new block ranges that contain transfers and found block headers that contain transfers, and a block where
// beginning of trasfers history detected
func checkRangesWithStartBlock(parent context.Context, client BalanceReader, cache BalanceCache, downloader Downloader,
account common.Address, ranges [][]*big.Int, threadLimit uint32, startBlock *big.Int) (resRanges [][]*big.Int,
headers []*DBHeader, newStartBlock *big.Int, err error) {
func checkRangesWithStartBlock(parent context.Context, client BalanceReader, cache BalanceCache,
account common.Address, ranges [][]*big.Int, threadLimit uint32, startBlock *big.Int) (
resRanges [][]*big.Int, headers []*DBHeader, newStartBlock *big.Int, err error) {
log.Debug("start checkRanges", "account", account.Hex(), "ranges len", len(ranges))
@ -208,8 +208,9 @@ func checkRangesWithStartBlock(parent context.Context, client BalanceReader, cac
return c.GetRanges(), c.GetHeaders(), newStartBlock, nil
}
func findBlocksWithEthTransfers(parent context.Context, client BalanceReader, cache BalanceCache, downloader Downloader,
account common.Address, low, high *big.Int, noLimit bool, threadLimit uint32) (from *big.Int, headers []*DBHeader, resStartBlock *big.Int, err error) {
func findBlocksWithEthTransfers(parent context.Context, client BalanceReader, cache BalanceCache,
account common.Address, low, high *big.Int, noLimit bool, threadLimit uint32) (
from *big.Int, headers []*DBHeader, resStartBlock *big.Int, err error) {
ranges := [][]*big.Int{{low, high}}
from = big.NewInt(low.Int64())
@ -222,7 +223,7 @@ func findBlocksWithEthTransfers(parent context.Context, client BalanceReader, ca
// Check if there are transfers in blocks in ranges. To do that, nonce and balance is checked
// the block ranges that have transfers are returned
newRanges, newHeaders, strtBlock, err := checkRangesWithStartBlock(parent, client, cache,
downloader, account, ranges, threadLimit, resStartBlock)
account, ranges, threadLimit, resStartBlock)
resStartBlock = strtBlock
if err != nil {
return nil, nil, nil, err

View File

@ -128,7 +128,7 @@ func TestConcurrentEthDownloader(t *testing.T) {
defer cancel()
concurrent := NewConcurrentDownloader(ctx, 0)
_, headers, _, _ := findBlocksWithEthTransfers(
ctx, tc.options.balances, newBalanceCache(), tc.options.batches,
ctx, tc.options.balances, newBalanceCache(),
common.Address{}, zero, tc.options.last, false, NoThreadLimit)
concurrent.Wait()
require.NoError(t, concurrent.Error())

View File

@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/walletevent"
)
func NewSequentialFetchStrategy(db *Database, blockDAO *BlockDAO, feed *event.Feed,
@ -41,17 +42,8 @@ type SequentialFetchStrategy struct {
func (s *SequentialFetchStrategy) newCommand(chainClient *chain.ClientWithFallback,
accounts []common.Address) async.Commander {
ctl := &loadBlocksAndTransfersCommand{
db: s.db,
chainClient: chainClient,
accounts: accounts,
blockRangeDAO: &BlockRangeSequentialDAO{s.db.client},
blockDAO: s.blockDAO,
feed: s.feed,
errorsCount: 0,
transactionManager: s.transactionManager,
}
return ctl
return newLoadBlocksAndTransfersCommand(accounts, s.db, s.blockDAO, chainClient, s.feed,
s.transactionManager)
}
func (s *SequentialFetchStrategy) start() error {
@ -63,6 +55,13 @@ func (s *SequentialFetchStrategy) start() error {
}
s.group = async.NewGroup(context.Background())
if s.feed != nil {
s.feed.Send(walletevent.Event{
Type: EventFetchingRecentHistory,
Accounts: s.accounts,
})
}
for _, chainClient := range s.chainClients {
ctl := s.newCommand(chainClient, s.accounts)
s.group.Add(ctl.Command())