Add new transfers fetching for sequential strategy (#3520)

* chore(wallet): refactor sequential transfers commands

* feat(desktop/wallet): add fetching new blocks and transfers for
transfers SequentialFetchStrategy

Updates #10246
This commit is contained in:
IvanBelyakoff 2023-05-26 10:27:48 +02:00 committed by GitHub
parent 03d9af0b95
commit dc84afb751
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 280 additions and 251 deletions

View File

@ -2,7 +2,6 @@ package transfer
import ( import (
"database/sql" "database/sql"
"fmt"
"math/big" "math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -10,12 +9,6 @@ import (
"github.com/status-im/status-go/services/wallet/bigint" "github.com/status-im/status-go/services/wallet/bigint"
) )
const (
firstBlockColumn = "blk_first"
lastBlockColumn = "blk_last"
startBlockColumn = "blk_start"
)
type BlockRangeSequentialDAO struct { type BlockRangeSequentialDAO struct {
db *sql.DB db *sql.DB
} }
@ -58,12 +51,12 @@ func (b *BlockRangeSequentialDAO) getBlockRange(chainID uint64, address common.A
// //
//lint:ignore U1000 Ignore unused function temporarily //lint:ignore U1000 Ignore unused function temporarily
func (b *BlockRangeSequentialDAO) deleteRange(chainID uint64, account common.Address) error { func (b *BlockRangeSequentialDAO) deleteRange(chainID uint64, account common.Address) error {
log.Info("delete blocks range", "account", account, "network", chainID) log.Debug("delete blocks range", "account", account, "network", chainID)
delete, err := b.db.Prepare(`DELETE FROM blocks_ranges_sequential delete, err := b.db.Prepare(`DELETE FROM blocks_ranges_sequential
WHERE address = ? WHERE address = ?
AND network_id = ?`) AND network_id = ?`)
if err != nil { if err != nil {
log.Info("some error", "error", err) log.Error("Failed to prepare deletion of sequential block range", "error", err)
return err return err
} }
@ -71,76 +64,54 @@ func (b *BlockRangeSequentialDAO) deleteRange(chainID uint64, account common.Add
return err return err
} }
func (b *BlockRangeSequentialDAO) updateStartBlock(chainID uint64, account common.Address, block *big.Int) (err error) {
return updateBlock(b.db, chainID, account, startBlockColumn, block)
}
//lint:ignore U1000 Ignore unused function temporarily, TODO use it when new transfers are fetched
func (b *BlockRangeSequentialDAO) updateLastBlock(chainID uint64, account common.Address, block *big.Int) (err error) {
return updateBlock(b.db, chainID, account, lastBlockColumn, block)
}
func (b *BlockRangeSequentialDAO) updateFirstBlock(chainID uint64, account common.Address, block *big.Int) (err error) {
return updateBlock(b.db, chainID, account, firstBlockColumn, block)
}
func updateBlock(creator statementCreator, chainID uint64, account common.Address,
blockColumn string, block *big.Int) (err error) {
update, err := creator.Prepare(fmt.Sprintf(`UPDATE blocks_ranges_sequential
SET %s = ?
WHERE address = ?
AND network_id = ?`, blockColumn))
if err != nil {
return err
}
_, err = update.Exec((*bigint.SQLBigInt)(block), account, chainID)
if err != nil {
return err
}
return
}
func (b *BlockRangeSequentialDAO) upsertRange(chainID uint64, account common.Address, func (b *BlockRangeSequentialDAO) upsertRange(chainID uint64, account common.Address,
start *big.Int, first *big.Int, last *big.Int) (err error) { newBlockRange *BlockRange) (err error) {
log.Info("upsert blocks range", "account", account, "network id", chainID, "start", start, "first", first, "last", last) log.Debug("upsert blocks range", "account", account, "chainID", chainID,
"start", newBlockRange.Start, "first", newBlockRange.FirstKnown, "last", newBlockRange.LastKnown)
update, err := b.db.Prepare(`UPDATE blocks_ranges_sequential
SET blk_start = ?,
blk_first = ?,
blk_last = ?
WHERE address = ?
AND network_id = ?`)
blockRange, err := b.getBlockRange(chainID, account)
if err != nil { if err != nil {
return err return err
} }
res, err := update.Exec((*bigint.SQLBigInt)(start), (*bigint.SQLBigInt)(first), (*bigint.SQLBigInt)(last), account, chainID) // Update existing range
if blockRange != nil {
if err != nil { // Ovewrite start block if there was not any or if new one is older, because it can be precised only
return err // to a greater value, because no history can be before some block that is considered
} // as a start of history, but due to concurrent block range checks, a newer greater block
affected, err := res.RowsAffected() // can be found that matches criteria of a start block (nonce is zero, balances are equal)
if err != nil { if newBlockRange.Start != nil || (blockRange.Start != nil && newBlockRange.Start != nil &&
return err blockRange.Start.Cmp(newBlockRange.Start) < 0) {
} blockRange.Start = newBlockRange.Start
if affected == 0 {
insert, err := b.db.Prepare("INSERT INTO blocks_ranges_sequential (network_id, address, blk_first, blk_last, blk_start) VALUES (?, ?, ?, ?, ?)")
if err != nil {
return err
} }
_, err = insert.Exec(chainID, account, (*bigint.SQLBigInt)(first), (*bigint.SQLBigInt)(last), (*bigint.SQLBigInt)(start)) // Overwrite first known block if there was not any or if new one is older
if err != nil { if (blockRange.FirstKnown == nil && newBlockRange.FirstKnown != nil) ||
return err (blockRange.FirstKnown != nil && newBlockRange.FirstKnown != nil && blockRange.FirstKnown.Cmp(newBlockRange.FirstKnown) > 0) {
blockRange.FirstKnown = newBlockRange.FirstKnown
} }
// Overwrite last known block if there was not any or if new one is newer
if (blockRange.LastKnown == nil && newBlockRange.LastKnown != nil) ||
(blockRange.LastKnown != nil && newBlockRange.LastKnown != nil && blockRange.LastKnown.Cmp(newBlockRange.LastKnown) < 0) {
blockRange.LastKnown = newBlockRange.LastKnown
}
log.Debug("update blocks range", "account", account, "chainID", chainID,
"start", blockRange.Start, "first", blockRange.FirstKnown, "last", blockRange.LastKnown)
} else {
blockRange = newBlockRange
} }
upsert, err := b.db.Prepare(`REPLACE INTO blocks_ranges_sequential
(network_id, address, blk_start, blk_first, blk_last) VALUES (?, ?, ?, ?, ?)`)
if err != nil {
return err
}
_, err = upsert.Exec(chainID, account, (*bigint.SQLBigInt)(blockRange.Start), (*bigint.SQLBigInt)(blockRange.FirstKnown),
(*bigint.SQLBigInt)(blockRange.LastKnown))
return return
} }

View File

@ -5,8 +5,6 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/pkg/errors"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
@ -16,23 +14,23 @@ import (
"github.com/status-im/status-go/services/wallet/walletevent" "github.com/status-im/status-go/services/wallet/walletevent"
) )
const (
allBlocksLoaded = "all blocks loaded"
)
// TODO NewFindBlocksCommand // TODO NewFindBlocksCommand
type findBlocksCommand struct { type findBlocksCommand struct {
account common.Address account common.Address
db *Database db *Database
blockDAO *BlockRangeSequentialDAO blockRangeDAO *BlockRangeSequentialDAO
chainClient *chain.ClientWithFallback chainClient *chain.ClientWithFallback
balanceCache *balanceCache balanceCache *balanceCache
feed *event.Feed feed *event.Feed
noLimit bool noLimit bool
error error
resFromBlock *Block
startBlockNumber *big.Int
transactionManager *TransactionManager transactionManager *TransactionManager
fromBlockNumber *big.Int
toBlockNumber *big.Int
// Not to be set by the caller
resFromBlock *Block
startBlockNumber *big.Int
error error
} }
func (c *findBlocksCommand) Command() async.Command { func (c *findBlocksCommand) Command() async.Command {
@ -43,43 +41,13 @@ func (c *findBlocksCommand) Command() async.Command {
} }
func (c *findBlocksCommand) Run(parent context.Context) (err error) { func (c *findBlocksCommand) Run(parent context.Context) (err error) {
log.Info("start findBlocksCommand", "account", c.account, "chain", c.chainClient.ChainID, "noLimit", c.noLimit) log.Debug("start findBlocksCommand", "account", c.account, "chain", c.chainClient.ChainID, "noLimit", c.noLimit)
rangeSize := big.NewInt(DefaultNodeBlockChunkSize) rangeSize := big.NewInt(DefaultNodeBlockChunkSize)
to, err := c.loadFirstKnownBlockNumber() from, to := new(big.Int).Set(c.fromBlockNumber), new(big.Int).Set(c.toBlockNumber)
log.Info("findBlocksCommand", "firstKnownBlockNumber", to, "error", err) // Limit the range size to DefaultNodeBlockChunkSize
if new(big.Int).Sub(to, from).Cmp(rangeSize) > 0 {
if err != nil {
if err.Error() != allBlocksLoaded {
c.error = err
}
return nil // We break the loop if we fetched all the blocks
}
var head *types.Header = nil
if to == nil {
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
head, err = c.chainClient.HeaderByNumber(ctx, nil)
cancel()
if err != nil {
c.error = err
log.Error("findBlocksCommand failed to get head block", "error", err)
return nil
}
log.Info("current head is", "chain", c.chainClient.ChainID, "block number", head.Number)
to = new(big.Int).Set(head.Number) // deep copy
} else {
to.Sub(to, big.NewInt(1))
}
var from = big.NewInt(0)
if to.Cmp(rangeSize) > 0 {
from.Sub(to, rangeSize) from.Sub(to, rangeSize)
} }
@ -90,52 +58,46 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
break break
} }
// 'to' is set to 'head' if 'last' block not found in DB log.Debug("findBlocksCommand saving headers", "len", len(headers), "lastBlockNumber", to,
if head != nil && to.Cmp(head.Number) == 0 { "balance", c.balanceCache.ReadCachedBalance(c.account, to),
log.Info("upsert blockrange", "head", head.Number, "to", to, "chain", c.chainClient.ChainID, "account", c.account) "nonce", c.balanceCache.ReadCachedNonce(c.account, to))
err = c.blockDAO.upsertRange(c.chainClient.ChainID, c.account, c.startBlockNumber, err = c.db.SaveBlocks(c.chainClient.ChainID, c.account, headers)
c.resFromBlock.Number, to)
if err != nil {
c.error = err
log.Error("findBlocksCommand upsertRange", "error", err)
break
}
}
log.Info("findBlocksCommand.Run()", "headers len", len(headers), "resFromBlock", c.resFromBlock.Number)
err = c.blockDAO.updateFirstBlock(c.chainClient.ChainID, c.account, c.resFromBlock.Number)
if err != nil { if err != nil {
c.error = err c.error = err
log.Error("findBlocksCommand failed to update first block", "error", err) // return err
break break
} }
if c.startBlockNumber.Cmp(big.NewInt(0)) > 0 { err = c.upsertBlockRange(&BlockRange{c.startBlockNumber, c.resFromBlock.Number, to})
err = c.blockDAO.updateStartBlock(c.chainClient.ChainID, c.account, c.startBlockNumber) if err != nil {
if err != nil { break
c.error = err
log.Error("findBlocksCommand failed to update start block", "error", err)
break
}
} }
// Assign new range from, to = nextRange(c.resFromBlock.Number, c.fromBlockNumber)
to.Sub(from, big.NewInt(1)) // it won't hit the cache, but we wont load the transfers twice
if to.Cmp(rangeSize) > 0 {
from.Sub(to, rangeSize)
} else {
from = big.NewInt(0)
}
if to.Cmp(big.NewInt(0)) <= 0 || (c.startBlockNumber != nil && if to.Cmp(c.fromBlockNumber) <= 0 || (c.startBlockNumber != nil &&
c.startBlockNumber.Cmp(big.NewInt(0)) > 0 && to.Cmp(c.startBlockNumber) <= 0) { c.startBlockNumber.Cmp(big.NewInt(0)) > 0 && to.Cmp(c.startBlockNumber) <= 0) {
log.Info("Start block has been found, stop execution", "startBlock", c.startBlockNumber, "to", to) log.Debug("Checked all ranges, stop execution", "startBlock", c.startBlockNumber, "from", from, "to", to)
break break
} }
} }
log.Info("end findBlocksCommand", "account", c.account, "chain", c.chainClient.ChainID, "noLimit", c.noLimit) log.Debug("end findBlocksCommand", "account", c.account, "chain", c.chainClient.ChainID, "noLimit", c.noLimit)
return nil
}
func (c *findBlocksCommand) upsertBlockRange(blockRange *BlockRange) error {
log.Debug("upsert block range", "Start", blockRange.Start, "FirstKnown", blockRange.FirstKnown, "LastKnown", blockRange.LastKnown,
"chain", c.chainClient.ChainID, "account", c.account)
err := c.blockRangeDAO.upsertRange(c.chainClient.ChainID, c.account, blockRange)
if err != nil {
c.error = err
log.Error("findBlocksCommand upsertRange", "error", err)
return err
}
return nil return nil
} }
@ -147,95 +109,66 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to
newFromBlock, ethHeaders, startBlock, err := c.fastIndex(parent, c.balanceCache, fromBlock, to) newFromBlock, ethHeaders, startBlock, err := c.fastIndex(parent, c.balanceCache, fromBlock, to)
if err != nil { if err != nil {
log.Info("findBlocksCommand checkRange fastIndex", "err", err) log.Error("findBlocksCommand checkRange fastIndex", "err", err)
c.error = err c.error = err
// return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers // return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers
return nil, nil return nil, nil
} }
log.Info("findBlocksCommand checkRange", "startBlock", startBlock, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit) log.Debug("findBlocksCommand checkRange", "startBlock", startBlock, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit)
// TODO There should be transfers when either when we have found headers // There should be transfers when either when we have found headers
// or when newFromBlock is different from fromBlock, but if I check for // or newFromBlock is different from fromBlock
// ERC20 transfers only when there are ETH transfers, I will miss ERC20 transfers if len(ethHeaders) > 0 || newFromBlock.Number.Cmp(fromBlock.Number) != 0 {
erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to)
// if len(ethHeaders) > 0 || newFromBlock.Number.Cmp(fromBlock.Number) != 0 { // there is transaction history for this account
erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to)
if err != nil {
log.Info("findBlocksCommand checkRange fastIndexErc20", "err", err)
c.error = err
// return err
return nil, nil
}
allHeaders := append(ethHeaders, erc20Headers...)
if len(allHeaders) > 0 {
uniqHeadersByHash := map[common.Hash]*DBHeader{}
for _, header := range allHeaders {
uniqHeader, ok := uniqHeadersByHash[header.Hash]
if ok {
if len(header.Erc20Transfers) > 0 {
uniqHeader.Erc20Transfers = append(uniqHeader.Erc20Transfers, header.Erc20Transfers...)
}
uniqHeadersByHash[header.Hash] = uniqHeader
} else {
uniqHeadersByHash[header.Hash] = header
}
}
uniqHeaders := []*DBHeader{}
for _, header := range uniqHeadersByHash {
uniqHeaders = append(uniqHeaders, header)
}
foundHeaders = uniqHeaders
log.Info("saving headers", "len", len(uniqHeaders), "lastBlockNumber", to,
"balance", c.balanceCache.ReadCachedBalance(c.account, to),
"nonce", c.balanceCache.ReadCachedNonce(c.account, to))
err = c.db.SaveBlocks(c.chainClient.ChainID, c.account, uniqHeaders)
if err != nil { if err != nil {
log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err)
c.error = err c.error = err
// return err // return err
return nil, nil return nil, nil
} }
allHeaders := append(ethHeaders, erc20Headers...)
if len(allHeaders) > 0 {
foundHeaders = uniqueHeaders(allHeaders)
}
} }
// }
c.resFromBlock = newFromBlock c.resFromBlock = newFromBlock
c.startBlockNumber = startBlock c.startBlockNumber = startBlock
log.Info("end findBlocksCommand checkRange", "c.startBlock", c.startBlockNumber, "newFromBlock", newFromBlock.Number, log.Debug("end findBlocksCommand checkRange", "c.startBlock", c.startBlockNumber, "newFromBlock", newFromBlock.Number,
"toBlockNumber", to, "c.resFromBlock", c.resFromBlock.Number) "toBlockNumber", to, "c.resFromBlock", c.resFromBlock.Number)
return return
} }
func (c *findBlocksCommand) loadFirstKnownBlockNumber() (*big.Int, error) { func loadBlockRangeInfo(chainID uint64, account common.Address, blockDAO *BlockRangeSequentialDAO) (
blockInfo, err := c.blockDAO.getBlockRange(c.chainClient.ChainID, c.account) *BlockRange, error) {
blockRange, err := blockDAO.getBlockRange(chainID, account)
if err != nil { if err != nil {
log.Error("failed to load block ranges from database", "chain", c.chainClient.ChainID, "account", c.account, "error", err) log.Error("failed to load block ranges from database", "chain", chainID, "account", account,
"error", err)
return nil, err return nil, err
} }
if blockInfo != nil { return blockRange, nil
log.Info("blockInfo for", "address", c.account, "chain", c.chainClient.ChainID, "Start", }
blockInfo.Start, "FirstKnown", blockInfo.FirstKnown, "LastKnown", blockInfo.LastKnown)
// Check if we have fetched all blocks for this account // Returns if all the blocks prior to first known block are loaded, not considering
if blockInfo.FirstKnown != nil && blockInfo.Start != nil && blockInfo.Start.Cmp(blockInfo.FirstKnown) >= 0 { func areAllHistoryBlocksLoaded(blockInfo *BlockRange) bool {
log.Info("all blocks fetched", "chain", c.chainClient.ChainID, "account", c.account) if blockInfo == nil {
return blockInfo.FirstKnown, errors.New(allBlocksLoaded) return false
}
return blockInfo.FirstKnown, nil
} }
log.Info("no blockInfo for", "address", c.account, "chain", c.chainClient.ChainID) if blockInfo.FirstKnown != nil && blockInfo.Start != nil &&
blockInfo.Start.Cmp(blockInfo.FirstKnown) >= 0 {
return nil, nil return true
}
return false
} }
// run fast indexing for every accont up to canonical chain head minus safety depth. // run fast indexing for every accont up to canonical chain head minus safety depth.
@ -244,7 +177,7 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCache *balanceCache,
fromBlock *Block, toBlockNumber *big.Int) (resultingFrom *Block, headers []*DBHeader, fromBlock *Block, toBlockNumber *big.Int) (resultingFrom *Block, headers []*DBHeader,
startBlock *big.Int, err error) { startBlock *big.Int, err error) {
log.Info("fast index started", "accounts", c.account, "from", fromBlock.Number, "to", toBlockNumber) log.Debug("fast index started", "accounts", c.account, "from", fromBlock.Number, "to", toBlockNumber)
start := time.Now() start := time.Now()
group := async.NewGroup(ctx) group := async.NewGroup(ctx)
@ -280,7 +213,7 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCache *balanceCache,
resultingFrom = &Block{Number: command.resultingFrom} resultingFrom = &Block{Number: command.resultingFrom}
headers = command.foundHeaders headers = command.foundHeaders
startBlock = command.startBlock startBlock = command.startBlock
log.Info("fast indexer finished", "in", time.Since(start), "startBlock", command.startBlock, "resultingFrom", resultingFrom.Number, "headers", len(headers)) log.Debug("fast indexer finished", "in", time.Since(start), "startBlock", command.startBlock, "resultingFrom", resultingFrom.Number, "headers", len(headers))
return return
} }
} }
@ -309,7 +242,7 @@ func (c *findBlocksCommand) fastIndexErc20(ctx context.Context, fromBlockNumber
return nil, ctx.Err() return nil, ctx.Err()
case <-group.WaitAsync(): case <-group.WaitAsync():
headers := erc20.foundHeaders headers := erc20.foundHeaders
log.Info("fast indexer Erc20 finished", "in", time.Since(start), "headers", len(headers)) log.Debug("fast indexer Erc20 finished", "in", time.Since(start), "headers", len(headers))
return headers, nil return headers, nil
} }
} }
@ -352,7 +285,7 @@ type loadBlocksAndTransfersCommand struct {
} }
func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error { func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
log.Info("start load all transfers command", "chain", c.chainClient.ChainID) log.Debug("start load all transfers command", "chain", c.chainClient.ChainID)
ctx := parent ctx := parent
@ -369,22 +302,100 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
group := async.NewGroup(ctx) group := async.NewGroup(ctx)
for _, address := range c.accounts { headNum, err := getHeadBlockNumber(parent, c.chainClient)
log.Info("start findBlocks command", "chain", c.chainClient.ChainID) if err != nil {
// c.error = err
fbc := &findBlocksCommand{ return err // Might need to retry a couple of times
account: address,
db: c.db,
blockDAO: c.blockRangeDAO,
chainClient: c.chainClient,
balanceCache: c.balanceCache,
feed: c.feed,
noLimit: false,
transactionManager: c.transactionManager,
}
group.Add(fbc.Command())
} }
for _, address := range c.accounts {
blockRange, err := loadBlockRangeInfo(c.chainClient.ChainID, address, c.blockRangeDAO)
if err != nil {
log.Error("findBlocksCommand loadBlockRangeInfo", "error", err)
// c.error = err
return err // Will keep spinning forever nomatter what
}
allHistoryLoaded := areAllHistoryBlocksLoaded(blockRange)
toHistoryBlockNum := getToHistoryBlockNumber(headNum, blockRange, allHistoryLoaded)
if !allHistoryLoaded {
c.fetchHistoryBlocks(ctx, group, address, blockRange, toHistoryBlockNum, headNum)
}
// If no block ranges are stored, all blocks will be fetched by startFetchingHistoryBlocks method
if blockRange != nil {
c.fetchNewBlocks(ctx, group, address, blockRange, headNum)
}
}
c.fetchTransfers(ctx, group)
select {
case <-ctx.Done():
return ctx.Err()
case <-group.WaitAsync():
log.Debug("end load all transfers command", "chain", c.chainClient.ChainID)
return nil
}
}
func (c *loadBlocksAndTransfersCommand) Command() async.Command {
return async.InfiniteCommand{
Interval: 13 * time.Second, // Slightly more that block mining time
Runable: c.Run,
}.Run
}
func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group,
address common.Address, blockRange *BlockRange, toHistoryBlockNum *big.Int, headNum *big.Int) {
log.Info("Launching history command")
fbc := &findBlocksCommand{
account: address,
db: c.db,
blockRangeDAO: c.blockRangeDAO,
chainClient: c.chainClient,
balanceCache: c.balanceCache,
feed: c.feed,
noLimit: false,
fromBlockNumber: big.NewInt(0), // Beginning of the chain history
toBlockNumber: toHistoryBlockNum,
transactionManager: c.transactionManager,
}
group.Add(fbc.Command())
}
func (c *loadBlocksAndTransfersCommand) fetchNewBlocks(ctx context.Context, group *async.Group,
address common.Address, blockRange *BlockRange, headNum *big.Int) {
log.Info("Launching new blocks command")
fromBlockNumber := new(big.Int).Add(blockRange.LastKnown, big.NewInt(1))
// In case interval between checks is set smaller than block mining time,
// we might need to wait for the next block to be mined
if fromBlockNumber.Cmp(headNum) > 0 {
return
}
newBlocksCmd := &findBlocksCommand{
account: address,
db: c.db,
blockRangeDAO: c.blockRangeDAO,
chainClient: c.chainClient,
balanceCache: c.balanceCache,
feed: c.feed,
noLimit: false,
fromBlockNumber: fromBlockNumber,
toBlockNumber: headNum,
transactionManager: c.transactionManager,
}
group.Add(newBlocksCmd.Command())
}
func (c *loadBlocksAndTransfersCommand) fetchTransfers(ctx context.Context, group *async.Group) {
txCommand := &loadAllTransfersCommand{ txCommand := &loadAllTransfersCommand{
accounts: c.accounts, accounts: c.accounts,
db: c.db, db: c.db,
@ -395,28 +406,13 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
} }
group.Add(txCommand.Command()) group.Add(txCommand.Command())
select {
case <-ctx.Done():
return ctx.Err()
case <-group.WaitAsync():
log.Info("end load all transfers command", "chain", c.chainClient.ChainID)
return nil
}
}
func (c *loadBlocksAndTransfersCommand) Command() async.Command {
return async.InfiniteCommand{
Interval: 5 * time.Second,
Runable: c.Run,
}.Run
} }
func loadTransfersLoop(ctx context.Context, accounts []common.Address, blockDAO *BlockDAO, db *Database, func loadTransfersLoop(ctx context.Context, accounts []common.Address, blockDAO *BlockDAO, db *Database,
chainClient *chain.ClientWithFallback, blocksLimitPerAccount int, blocksByAddress map[common.Address][]*big.Int, chainClient *chain.ClientWithFallback, blocksLimitPerAccount int, blocksByAddress map[common.Address][]*big.Int,
transactionManager *TransactionManager) error { transactionManager *TransactionManager) error {
log.Info("loadTransfers start", "accounts", accounts, "chain", chainClient.ChainID, "limit", blocksLimitPerAccount) log.Debug("loadTransfers start", "accounts", accounts, "chain", chainClient.ChainID, "limit", blocksLimitPerAccount)
start := time.Now() start := time.Now()
group := async.NewGroup(ctx) group := async.NewGroup(ctx)
@ -469,14 +465,12 @@ func loadTransfersLoop(ctx context.Context, accounts []common.Address, blockDAO
transfers = append(transfers, command.fetchedTransfers...) transfers = append(transfers, command.fetchedTransfers...)
} }
log.Info("loadTransfers finished for account", "address", address, "in", time.Since(start), "chain", chainClient.ChainID, "transfers", len(transfers), "limit", blocksLimitPerAccount) log.Debug("loadTransfers finished for account", "address", address, "in", time.Since(start), "chain", chainClient.ChainID, "transfers", len(transfers), "limit", blocksLimitPerAccount)
} }
log.Info("loadTransfers after select", "chain", chainClient.ChainID, "address", address, "blocks.len", len(blocks))
if ok || len(blocks) == 0 || if ok || len(blocks) == 0 ||
(blocksLimitPerAccount > noBlockLimit && len(blocks) >= blocksLimitPerAccount) { (blocksLimitPerAccount > noBlockLimit && len(blocks) >= blocksLimitPerAccount) {
log.Info("loadTransfers breaking loop on block limits reached or 0 blocks", "chain", chainClient.ChainID, "address", address, "limit", blocksLimitPerAccount, "blocks", len(blocks)) log.Debug("loadTransfers breaking loop on block limits reached or 0 blocks", "chain", chainClient.ChainID, "address", address, "limit", blocksLimitPerAccount, "blocks", len(blocks))
break break
} }
} }
@ -484,3 +478,66 @@ func loadTransfersLoop(ctx context.Context, accounts []common.Address, blockDAO
return nil return nil
} }
func getHeadBlockNumber(parent context.Context, chainClient *chain.ClientWithFallback) (*big.Int, error) {
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
head, err := chainClient.HeaderByNumber(ctx, nil)
cancel()
if err != nil {
return nil, err
}
return head.Number, err
}
func uniqueHeaders(allHeaders []*DBHeader) []*DBHeader {
uniqHeadersByHash := map[common.Hash]*DBHeader{}
for _, header := range allHeaders {
uniqHeader, ok := uniqHeadersByHash[header.Hash]
if ok {
if len(header.Erc20Transfers) > 0 {
uniqHeader.Erc20Transfers = append(uniqHeader.Erc20Transfers, header.Erc20Transfers...)
}
uniqHeadersByHash[header.Hash] = uniqHeader
} else {
uniqHeadersByHash[header.Hash] = header
}
}
uniqHeaders := []*DBHeader{}
for _, header := range uniqHeadersByHash {
uniqHeaders = append(uniqHeaders, header)
}
return uniqHeaders
}
func nextRange(from *big.Int, zeroBlockNumber *big.Int) (*big.Int, *big.Int) {
log.Debug("next range start", "from", from, "zeroBlockNumber", zeroBlockNumber)
rangeSize := big.NewInt(DefaultNodeBlockChunkSize)
to := new(big.Int).Sub(from, big.NewInt(1)) // it won't hit the cache, but we wont load the transfers twice
if to.Cmp(rangeSize) > 0 {
from.Sub(to, rangeSize)
} else {
from = new(big.Int).Set(zeroBlockNumber)
}
log.Debug("next range end", "from", from, "to", to, "zeroBlockNumber", zeroBlockNumber)
return from, to
}
func getToHistoryBlockNumber(headNum *big.Int, blockRange *BlockRange, allHistoryLoaded bool) *big.Int {
var toBlockNum *big.Int
if blockRange != nil {
if !allHistoryLoaded {
toBlockNum = blockRange.FirstKnown
}
} else {
toBlockNum = headNum
}
return toBlockNum
}

View File

@ -149,12 +149,14 @@ func checkRangesWithStartBlock(parent context.Context, client BalanceReader, cac
if *hn == 0 { if *hn == 0 {
log.Debug("zero nonce", "to", to) log.Debug("zero nonce", "to", to)
if startBlock != nil { if hb.Cmp(big.NewInt(0)) == 0 { // balance is 0, nonce is 0, we stop checking further, that will be the start block (even though the real one can be a later one)
if hb.Cmp(big.NewInt(0)) == 0 { // balance is 0, nonce is 0, we stop checking further, that will be the start block (even though the real one can be a later one) if startBlock != nil {
if to.Cmp(newStartBlock) > 0 { if to.Cmp(newStartBlock) > 0 {
log.Debug("found possible start block, we should not search back", "block", to) log.Debug("found possible start block, we should not search back", "block", to)
newStartBlock = to // increase newStartBlock if we found a new higher block newStartBlock = to // increase newStartBlock if we found a new higher block
} }
} else {
newStartBlock = to
} }
} }
@ -210,13 +212,12 @@ func findBlocksWithEthTransfers(parent context.Context, client BalanceReader, ca
account common.Address, low, high *big.Int, noLimit bool, threadLimit uint32) (from *big.Int, headers []*DBHeader, resStartBlock *big.Int, err error) { account common.Address, low, high *big.Int, noLimit bool, threadLimit uint32) (from *big.Int, headers []*DBHeader, resStartBlock *big.Int, err error) {
ranges := [][]*big.Int{{low, high}} ranges := [][]*big.Int{{low, high}}
minBlock := big.NewInt(low.Int64()) from = big.NewInt(low.Int64())
headers = []*DBHeader{} headers = []*DBHeader{}
var lvl = 1 var lvl = 1
resStartBlock = big.NewInt(0)
for len(ranges) > 0 && lvl <= 30 { for len(ranges) > 0 && lvl <= 30 {
log.Info("check blocks ranges", "lvl", lvl, "ranges len", len(ranges)) log.Debug("check blocks ranges", "lvl", lvl, "ranges len", len(ranges))
lvl++ lvl++
// Check if there are transfers in blocks in ranges. To do that, nonce and balance is checked // Check if there are transfers in blocks in ranges. To do that, nonce and balance is checked
// the block ranges that have transfers are returned // the block ranges that have transfers are returned
@ -230,7 +231,7 @@ func findBlocksWithEthTransfers(parent context.Context, client BalanceReader, ca
headers = append(headers, newHeaders...) headers = append(headers, newHeaders...)
if len(newRanges) > 0 { if len(newRanges) > 0 {
log.Info("found new ranges", "account", account, "lvl", lvl, "new ranges len", len(newRanges)) log.Debug("found new ranges", "account", account, "lvl", lvl, "new ranges len", len(newRanges))
} }
if len(newRanges) > 60 && !noLimit { if len(newRanges) > 60 && !noLimit {
sort.SliceStable(newRanges, func(i, j int) bool { sort.SliceStable(newRanges, func(i, j int) bool {
@ -238,11 +239,11 @@ func findBlocksWithEthTransfers(parent context.Context, client BalanceReader, ca
}) })
newRanges = newRanges[:60] newRanges = newRanges[:60]
minBlock = newRanges[len(newRanges)-1][0] from = newRanges[len(newRanges)-1][0]
} }
ranges = newRanges ranges = newRanges
} }
return minBlock, headers, resStartBlock, err return
} }