feat(wallet): Improved performance of fetching transfers for Sequential

fetch strategy:
Before:
 - block fetching commands for different accounts were in the same wait
group, making them dependent on each other at every iteration.
 - the transfers loading command checked the database for new unloaded
blocks on timeout, and since it shared a wait group with the block
fetching commands, it was often blocked until all block fetching
commands finished for the iteration.
Now:
 - block fetching commands run independently for each account
 - transfers fetching command is run once on startup for unloaded blocks
from DB
 - fetch history blocks commands are launched once on startup for
accounts with no full history loaded
 - transfers are loaded on each iteration of block range check
without waiting for all ranges to be checked
This commit is contained in:
Ivan Belyakov 2023-06-14 12:00:56 +02:00 committed by IvanBelyakoff
parent 2b02968819
commit 30af25198e
4 changed files with 246 additions and 193 deletions

View File

@ -137,7 +137,7 @@ func getErc20BatchSize(chainID uint64) *big.Int {
}
func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {
log.Info("wallet historical downloader for erc20 transfers start", "address", c.address,
log.Info("wallet historical downloader for erc20 transfers start", "chainID", c.chainClient.ChainID, "address", c.address,
"from", c.from, "to", c.to)
start := time.Now()
@ -158,8 +158,8 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {
}
c.foundHeaders = append(c.foundHeaders, headers...)
}
log.Info("wallet historical downloader for erc20 transfers finished", "address", c.address,
"from", c.from, "to", c.to, "time", time.Since(start))
log.Info("wallet historical downloader for erc20 transfers finished", "chainID", c.chainClient.ChainID, "address", c.address,
"from", c.from, "to", c.to, "time", time.Since(start), "headers", len(c.foundHeaders))
return nil
}
@ -181,8 +181,8 @@ type controlCommand struct {
tokenManager *token.Manager
}
func (c *controlCommand) LoadTransfers(ctx context.Context, limit int) (map[common.Address][]Transfer, error) {
return loadTransfers(ctx, c.accounts, c.blockDAO, c.db, c.chainClient, limit, make(map[common.Address][]*big.Int), c.transactionManager, c.tokenManager)
func (c *controlCommand) LoadTransfers(ctx context.Context, limit int) error {
return loadTransfers(ctx, c.accounts, c.blockDAO, c.db, c.chainClient, limit, make(map[common.Address][]*big.Int), c.transactionManager, c.tokenManager, c.feed)
}
func (c *controlCommand) Run(parent context.Context) error {
@ -274,7 +274,7 @@ func (c *controlCommand) Run(parent context.Context) error {
return cmnd.error
}
_, err = c.LoadTransfers(parent, numberOfBlocksCheckedPerIteration)
err = c.LoadTransfers(parent, numberOfBlocksCheckedPerIteration)
if err != nil {
if c.NewError(err) {
return nil
@ -321,9 +321,9 @@ func nonArchivalNodeError(err error) bool {
func (c *controlCommand) NewError(err error) bool {
c.errorsCount++
log.Error("controlCommand error", "error", err, "counter", c.errorsCount)
log.Error("controlCommand error", "chainID", c.chainClient.ChainID, "error", err, "counter", c.errorsCount)
if nonArchivalNodeError(err) {
log.Info("Non archival node detected")
log.Info("Non archival node detected", "chainID", c.chainClient.ChainID)
c.nonArchivalRPCNode = true
c.feed.Send(walletevent.Event{
Type: EventNonArchivalNodeDetected,
@ -524,15 +524,15 @@ func (c *transfersCommand) notifyOfNewTransfers(transfers []Transfer) {
}
type loadTransfersCommand struct {
accounts []common.Address
db *Database
blockDAO *BlockDAO
chainClient *chain.ClientWithFallback
blocksByAddress map[common.Address][]*big.Int
foundTransfersByAddress map[common.Address][]Transfer
transactionManager *TransactionManager
blocksLimit int
tokenManager *token.Manager
accounts []common.Address
db *Database
blockDAO *BlockDAO
chainClient *chain.ClientWithFallback
blocksByAddress map[common.Address][]*big.Int
transactionManager *TransactionManager
blocksLimit int
tokenManager *token.Manager
feed *event.Feed
}
func (c *loadTransfersCommand) Command() async.Command {
@ -542,17 +542,12 @@ func (c *loadTransfersCommand) Command() async.Command {
}.Run
}
func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, limit int, blocksByAddress map[common.Address][]*big.Int) (map[common.Address][]Transfer, error) {
return loadTransfers(ctx, c.accounts, c.blockDAO, c.db, c.chainClient, limit, blocksByAddress, c.transactionManager, c.tokenManager)
func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, limit int, blocksByAddress map[common.Address][]*big.Int) error {
return loadTransfers(ctx, c.accounts, c.blockDAO, c.db, c.chainClient, limit, blocksByAddress, c.transactionManager, c.tokenManager, c.feed)
}
func (c *loadTransfersCommand) Run(parent context.Context) (err error) {
transfersByAddress, err := c.LoadTransfers(parent, c.blocksLimit, c.blocksByAddress)
if err != nil {
return err
}
c.foundTransfersByAddress = transfersByAddress
err = c.LoadTransfers(parent, c.blocksLimit, c.blocksByAddress)
return
}
@ -603,6 +598,8 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) error {
erc20Headers := erc20HeadersByAddress[address]
allHeaders := append(ethHeaders, erc20Headers...)
log.Debug("allHeaders found for account", "address", address, "allHeaders.len", len(allHeaders))
// Ensure only 1 DBHeader per block hash.
uniqHeaders := []*DBHeader{}
if len(allHeaders) > 0 {
@ -627,6 +624,7 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) error {
Balance: c.balanceCache.ReadCachedBalance(address, lastBlockNumber),
Nonce: c.balanceCache.ReadCachedNonce(address, lastBlockNumber),
}
log.Debug("uniqHeaders found for account", "address", address, "uniqHeaders.len", len(uniqHeaders))
err = c.db.ProcessBlocks(c.chainClient.ChainID, address, newFromByAddress[address], to, uniqHeaders)
if err != nil {
return err
@ -720,13 +718,12 @@ func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, from
func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *BlockDAO, db *Database,
chainClient *chain.ClientWithFallback, blocksLimitPerAccount int, blocksByAddress map[common.Address][]*big.Int,
transactionManager *TransactionManager, tokenManager *token.Manager) (map[common.Address][]Transfer, error) {
transactionManager *TransactionManager, tokenManager *token.Manager, feed *event.Feed) error {
log.Info("loadTransfers start", "accounts", accounts, "chain", chainClient.ChainID, "limit", blocksLimitPerAccount)
start := time.Now()
group := async.NewGroup(ctx)
commands := []*transfersCommand{}
for _, address := range accounts {
transfers := &transfersCommand{
db: db,
@ -742,25 +739,17 @@ func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *Blo
blockNums: blocksByAddress[address],
transactionManager: transactionManager,
tokenManager: tokenManager,
feed: feed,
}
commands = append(commands, transfers)
group.Add(transfers.Command())
}
select {
case <-ctx.Done():
return nil, ctx.Err()
return ctx.Err()
case <-group.WaitAsync():
transfersByAddress := map[common.Address][]Transfer{}
for _, command := range commands {
if len(command.fetchedTransfers) == 0 {
continue
}
transfersByAddress[command.address] = append(transfersByAddress[command.address], command.fetchedTransfers...)
}
log.Info("loadTransfers finished for account", "in", time.Since(start), "chain", chainClient.ChainID)
return transfersByAddress, nil
return nil
}
}

View File

@ -15,6 +15,53 @@ import (
"github.com/status-im/status-go/services/wallet/walletevent"
)
type findNewBlocksCommand struct {
*findBlocksCommand
}
func (c *findNewBlocksCommand) Command() async.Command {
return async.InfiniteCommand{
Interval: 13 * time.Second, // TODO - make it configurable based on chain block mining time
Runable: c.Run,
}.Run
}
func (c *findNewBlocksCommand) Run(parent context.Context) (err error) {
log.Debug("start findNewBlocksCommand", "account", c.account, "chain", c.chainClient.ChainID, "noLimit", c.noLimit)
headNum, err := getHeadBlockNumber(parent, c.chainClient)
if err != nil {
// c.error = err
return err // Might need to retry a couple of times
}
blockRange, err := loadBlockRangeInfo(c.chainClient.ChainID, c.account, c.blockRangeDAO)
if err != nil {
log.Error("findBlocksCommand loadBlockRangeInfo", "error", err)
// c.error = err
return err // Will keep spinning forever nomatter what
}
if blockRange != nil {
c.fromBlockNumber = new(big.Int).Add(blockRange.LastKnown, big.NewInt(1))
log.Debug("Launching new blocks command", "chainID", c.chainClient.ChainID, "account", c.account,
"from", c.fromBlockNumber, "headNum", headNum)
// In case interval between checks is set smaller than block mining time,
// we might need to wait for the next block to be mined
if c.fromBlockNumber.Cmp(headNum) > 0 {
return
}
c.toBlockNumber = headNum
_ = c.findBlocksCommand.Run(parent)
}
return nil
}
// TODO NewFindBlocksCommand
type findBlocksCommand struct {
account common.Address
@ -27,6 +74,7 @@ type findBlocksCommand struct {
transactionManager *TransactionManager
fromBlockNumber *big.Int
toBlockNumber *big.Int
blocksLoadedCh chan<- []*DBHeader
// Not to be set by the caller
resFromBlock *Block
@ -72,6 +120,8 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
// return err
break
}
c.blocksFound(headers)
}
err = c.upsertBlockRange(&BlockRange{c.startBlockNumber, c.resFromBlock.Number, to})
@ -93,6 +143,10 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
return nil
}
func (c *findBlocksCommand) blocksFound(headers []*DBHeader) {
c.blocksLoadedCh <- headers
}
func (c *findBlocksCommand) upsertBlockRange(blockRange *BlockRange) error {
log.Debug("upsert block range", "Start", blockRange.Start, "FirstKnown", blockRange.FirstKnown, "LastKnown", blockRange.LastKnown,
"chain", c.chainClient.ChainID, "account", c.account)
@ -120,13 +174,14 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to
// return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers
return nil, nil
}
log.Debug("findBlocksCommand checkRange", "startBlock", startBlock, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit)
log.Debug("findBlocksCommand checkRange", "chainID", c.chainClient.ChainID, "account", c.account,
"startBlock", startBlock, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit)
// There could be incoming ERC20 transfers which don't change the balance
// and nonce of ETH account, so we keep looking for them
erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to)
if err != nil {
log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err)
log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err, "account", c.account, "chain", c.chainClient.ChainID)
c.error = err
// return err
return nil, nil
@ -141,7 +196,8 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to
c.resFromBlock = newFromBlock
c.startBlockNumber = startBlock
log.Debug("end findBlocksCommand checkRange", "c.startBlock", c.startBlockNumber, "newFromBlock", newFromBlock.Number,
log.Debug("end findBlocksCommand checkRange", "chainID", c.chainClient.ChainID, "account", c.account,
"c.startBlock", c.startBlockNumber, "newFromBlock", newFromBlock.Number,
"toBlockNumber", to, "c.resFromBlock", c.resFromBlock.Number)
return
@ -194,7 +250,8 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCache *balanceCache,
fromBlock *Block, toBlockNumber *big.Int) (resultingFrom *Block, headers []*DBHeader,
startBlock *big.Int, err error) {
log.Debug("fast index started", "accounts", c.account, "from", fromBlock.Number, "to", toBlockNumber)
log.Debug("fast index started", "chainID", c.chainClient.ChainID, "account", c.account,
"from", fromBlock.Number, "to", toBlockNumber)
start := time.Now()
group := async.NewGroup(ctx)
@ -224,7 +281,8 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCache *balanceCache,
resultingFrom = &Block{Number: command.resultingFrom}
headers = command.foundHeaders
startBlock = command.startBlock
log.Debug("fast indexer finished", "in", time.Since(start), "startBlock", command.startBlock, "resultingFrom", resultingFrom.Number, "headers", len(headers))
log.Debug("fast indexer finished", "chainID", c.chainClient.ChainID, "account", c.account, "in", time.Since(start),
"startBlock", command.startBlock, "resultingFrom", resultingFrom.Number, "headers", len(headers))
return
}
}
@ -253,75 +311,44 @@ func (c *findBlocksCommand) fastIndexErc20(ctx context.Context, fromBlockNumber
return nil, ctx.Err()
case <-group.WaitAsync():
headers := erc20.foundHeaders
log.Debug("fast indexer Erc20 finished", "in", time.Since(start), "headers", len(headers))
log.Debug("fast indexer Erc20 finished", "chainID", c.chainClient.ChainID, "account", c.account,
"in", time.Since(start), "headers", len(headers))
return headers, nil
}
}
// TODO Think on how to reuse loadTransfersCommand, as it shares many members and some methods
// but does not need to return the transfers but only save them to DB, as there can be too many of them
// and the logic of `loadTransfersLoop` is different from `loadTransfers“
type loadAllTransfersCommand struct {
accounts []common.Address
db *Database
blockDAO *BlockDAO
chainClient *chain.ClientWithFallback
blocksByAddress map[common.Address][]*big.Int
transactionManager *TransactionManager
tokenManager *token.Manager
blocksLimit int
feed *event.Feed
}
func loadTransfersLoop(ctx context.Context, account common.Address, blockDAO *BlockDAO, db *Database,
chainClient *chain.ClientWithFallback, transactionManager *TransactionManager, tokenManager *token.Manager,
feed *event.Feed, blocksLoadedCh <-chan []*DBHeader) {
func (c *loadAllTransfersCommand) Command() async.Command {
return async.FiniteCommand{
Interval: 5 * time.Second,
Runable: c.Run,
}.Run
}
func (c *loadAllTransfersCommand) Run(parent context.Context) error {
start := time.Now()
group := async.NewGroup(parent)
for _, address := range c.accounts {
transfers := &transfersCommand{
db: c.db,
blockDAO: c.blockDAO,
chainClient: c.chainClient,
address: address,
eth: &ETHDownloader{
chainClient: c.chainClient,
accounts: []common.Address{address},
signer: types.NewLondonSigner(c.chainClient.ToBigInt()),
db: c.db,
},
blockNums: c.blocksByAddress[address],
blocksLimit: c.blocksLimit,
transactionManager: c.transactionManager,
tokenManager: c.tokenManager,
feed: c.feed,
}
group.Add(transfers.Command())
}
log.Debug("loadTransfersLoop start", "chain", chainClient.ChainID, "account", account)
select {
case <-parent.Done():
log.Info("loadTransfers transfersCommand error", "chain", c.chainClient.ChainID, "error", parent.Err())
return parent.Err()
case <-group.WaitAsync():
log.Debug("loadTransfers finished for account", "in", time.Since(start), "chain", c.chainClient.ChainID, "limit", c.blocksLimit)
}
case <-ctx.Done():
log.Info("loadTransfersLoop error", "chain", chainClient.ChainID, "account", account, "error", ctx.Err())
return
case dbHeaders := <-blocksLoadedCh:
log.Debug("loadTransfersOnDemand transfers received", "chain", chainClient.ChainID, "account", account, "headers", len(dbHeaders))
return nil
blockNums := make([]*big.Int, len(dbHeaders))
for i, dbHeader := range dbHeaders {
blockNums[i] = dbHeader.Number
}
blocksByAddress := map[common.Address][]*big.Int{account: blockNums}
go func() {
_ = loadTransfers(ctx, []common.Address{account}, blockDAO, db, chainClient, noBlockLimit,
blocksByAddress, transactionManager, tokenManager, feed)
}()
}
}
func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database,
func newLoadBlocksAndTransfersCommand(account common.Address, db *Database,
blockDAO *BlockDAO, chainClient *chain.ClientWithFallback, feed *event.Feed,
transactionManager *TransactionManager, tokenManager *token.Manager) *loadBlocksAndTransfersCommand {
return &loadBlocksAndTransfersCommand{
accounts: accounts,
account: account,
db: db,
blockRangeDAO: &BlockRangeSequentialDAO{db.client},
blockDAO: blockDAO,
@ -330,12 +357,12 @@ func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database,
errorsCount: 0,
transactionManager: transactionManager,
tokenManager: tokenManager,
transfersLoaded: make(map[common.Address]bool),
blocksLoadedCh: make(chan []*DBHeader),
}
}
type loadBlocksAndTransfersCommand struct {
accounts []common.Address
account common.Address
db *Database
blockRangeDAO *BlockRangeSequentialDAO
blockDAO *BlockDAO
@ -346,13 +373,14 @@ type loadBlocksAndTransfersCommand struct {
// nonArchivalRPCNode bool // TODO Make use of it
transactionManager *TransactionManager
tokenManager *token.Manager
blocksLoadedCh chan []*DBHeader
// Not to be set by the caller
transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime
transfersLoaded bool // For event RecentHistoryReady to be sent only once per account during app lifetime
}
func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
log.Debug("start load all transfers command", "chain", c.chainClient.ChainID)
log.Debug("start load all transfers command", "chain", c.chainClient.ChainID, "account", c.account)
ctx := parent
@ -362,143 +390,166 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
group := async.NewGroup(ctx)
headNum, err := getHeadBlockNumber(parent, c.chainClient)
if err != nil {
// c.error = err
return err // Might need to retry a couple of times
err := c.fetchTransfersForLoadedBlocks(group)
for err != nil {
return err
}
for _, address := range c.accounts {
blockRange, err := loadBlockRangeInfo(c.chainClient.ChainID, address, c.blockRangeDAO)
if err != nil {
log.Error("findBlocksCommand loadBlockRangeInfo", "error", err)
// c.error = err
return err // Will keep spinning forever nomatter what
}
c.startTransfersLoop(ctx)
allHistoryLoaded := areAllHistoryBlocksLoaded(blockRange)
toHistoryBlockNum := getToHistoryBlockNumber(headNum, blockRange, allHistoryLoaded)
if !allHistoryLoaded {
c.fetchHistoryBlocks(ctx, group, address, big.NewInt(0), toHistoryBlockNum)
} else {
if !c.transfersLoaded[address] {
transfersLoaded, err := c.areAllTransfersLoadedForAddress(address)
if err != nil {
return err
}
if transfersLoaded {
c.transfersLoaded[address] = true
c.notifyHistoryReady(address)
}
}
}
// If no block ranges are stored, all blocks will be fetched by fetchHistoryBlocks method
if blockRange != nil {
c.fetchNewBlocks(ctx, group, address, blockRange, headNum)
}
err = c.fetchHistoryBlocks(parent, group, c.blocksLoadedCh)
for err != nil {
group.Stop()
group.Wait()
return err
}
c.fetchTransfers(ctx, group)
c.startFetchingNewBlocks(group, c.account, c.blocksLoadedCh)
select {
case <-ctx.Done():
return ctx.Err()
case <-group.WaitAsync():
log.Debug("end load all transfers command", "chain", c.chainClient.ChainID)
log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.ChainID, "account", c.account)
return nil
}
}
func (c *loadBlocksAndTransfersCommand) Command() async.Command {
return async.InfiniteCommand{
Interval: 13 * time.Second, // Slightly more that block mining time
Interval: 5 * time.Second,
Runable: c.Run,
}.Run
}
func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group,
address common.Address, from *big.Int, to *big.Int) {
log.Debug("Launching history command", "account", address, "from", from, "to", to)
fbc := &findBlocksCommand{
account: address,
db: c.db,
blockRangeDAO: c.blockRangeDAO,
chainClient: c.chainClient,
balanceCache: c.balanceCache,
feed: c.feed,
noLimit: false,
fromBlockNumber: from,
toBlockNumber: to,
transactionManager: c.transactionManager,
}
group.Add(fbc.Command())
func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context) {
go loadTransfersLoop(ctx, c.account, c.blockDAO, c.db, c.chainClient, c.transactionManager, c.tokenManager,
c.feed, c.blocksLoadedCh)
}
func (c *loadBlocksAndTransfersCommand) fetchNewBlocks(ctx context.Context, group *async.Group,
address common.Address, blockRange *BlockRange, headNum *big.Int) {
func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, blocksLoadedCh chan []*DBHeader) error {
fromBlockNumber := new(big.Int).Add(blockRange.LastKnown, big.NewInt(1))
log.Debug("fetchHistoryBlocks start", "chainID", c.chainClient.ChainID, "account", c.account)
log.Debug("Launching new blocks command", "chainID", c.chainClient.ChainID, "account", address, "from", fromBlockNumber, "headNum", headNum)
// In case interval between checks is set smaller than block mining time,
// we might need to wait for the next block to be mined
if fromBlockNumber.Cmp(headNum) > 0 {
return
headNum, err := getHeadBlockNumber(ctx, c.chainClient)
if err != nil {
// c.error = err
return err // Might need to retry a couple of times
}
newBlocksCmd := &findBlocksCommand{
account: address,
db: c.db,
blockRangeDAO: c.blockRangeDAO,
chainClient: c.chainClient,
balanceCache: c.balanceCache,
feed: c.feed,
noLimit: false,
fromBlockNumber: fromBlockNumber,
toBlockNumber: headNum,
transactionManager: c.transactionManager,
blockRange, err := loadBlockRangeInfo(c.chainClient.ChainID, c.account, c.blockRangeDAO)
if err != nil {
log.Error("findBlocksCommand loadBlockRangeInfo", "error", err)
// c.error = err
return err // Will keep spinning forever nomatter what
}
allHistoryLoaded := areAllHistoryBlocksLoaded(blockRange)
to := getToHistoryBlockNumber(headNum, blockRange, allHistoryLoaded)
log.Debug("fetchHistoryBlocks", "chainID", c.chainClient.ChainID, "account", c.account, "to", to, "allHistoryLoaded", allHistoryLoaded)
if !allHistoryLoaded {
fbc := &findBlocksCommand{
account: c.account,
db: c.db,
blockRangeDAO: c.blockRangeDAO,
chainClient: c.chainClient,
balanceCache: c.balanceCache,
feed: c.feed,
noLimit: false,
fromBlockNumber: big.NewInt(0),
toBlockNumber: to,
transactionManager: c.transactionManager,
blocksLoadedCh: blocksLoadedCh,
}
group.Add(fbc.Command())
} else {
if !c.transfersLoaded {
transfersLoaded, err := c.areAllTransfersLoaded()
if err != nil {
return err
}
if transfersLoaded {
c.transfersLoaded = true
c.notifyHistoryReady()
}
}
}
log.Debug("fetchHistoryBlocks end", "chainID", c.chainClient.ChainID, "account", c.account)
return nil
}
func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(group *async.Group, address common.Address, blocksLoadedCh chan<- []*DBHeader) {
log.Debug("startFetchingNewBlocks", "chainID", c.chainClient.ChainID, "account", address)
newBlocksCmd := &findNewBlocksCommand{
findBlocksCommand: &findBlocksCommand{
account: address,
db: c.db,
blockRangeDAO: c.blockRangeDAO,
chainClient: c.chainClient,
balanceCache: c.balanceCache,
feed: c.feed,
noLimit: false,
transactionManager: c.transactionManager,
blocksLoadedCh: blocksLoadedCh,
},
}
group.Add(newBlocksCmd.Command())
}
func (c *loadBlocksAndTransfersCommand) fetchTransfers(ctx context.Context, group *async.Group) {
txCommand := &loadAllTransfersCommand{
accounts: c.accounts,
func (c *loadBlocksAndTransfersCommand) fetchTransfersForLoadedBlocks(group *async.Group) error {
log.Debug("fetchTransfers start", "chainID", c.chainClient.ChainID, "account", c.account)
blocks, err := c.blockDAO.GetBlocksToLoadByAddress(c.chainClient.ChainID, c.account, numberOfBlocksCheckedPerIteration)
if err != nil {
log.Error("loadBlocksAndTransfersCommand GetBlocksToLoadByAddress", "error", err)
return err
}
blocksMap := make(map[common.Address][]*big.Int)
blocksMap[c.account] = blocks
txCommand := &loadTransfersCommand{
accounts: []common.Address{c.account},
db: c.db,
blockDAO: c.blockDAO,
chainClient: c.chainClient,
transactionManager: c.transactionManager,
blocksLimit: noBlockLimit, // load transfers from all `unloaded` blocks
tokenManager: c.tokenManager,
blocksByAddress: blocksMap,
feed: c.feed,
}
group.Add(txCommand.Command())
return nil
}
func (c *loadBlocksAndTransfersCommand) notifyHistoryReady(address common.Address) {
func (c *loadBlocksAndTransfersCommand) notifyHistoryReady() {
if c.feed != nil {
c.feed.Send(walletevent.Event{
Type: EventRecentHistoryReady,
Accounts: []common.Address{address},
Accounts: []common.Address{c.account},
})
}
}
func (c *loadBlocksAndTransfersCommand) areAllTransfersLoadedForAddress(address common.Address) (bool, error) {
allBlocksLoaded, err := areAllHistoryBlocksLoadedForAddress(c.blockRangeDAO, c.chainClient.ChainID, address)
func (c *loadBlocksAndTransfersCommand) areAllTransfersLoaded() (bool, error) {
allBlocksLoaded, err := areAllHistoryBlocksLoadedForAddress(c.blockRangeDAO, c.chainClient.ChainID, c.account)
if err != nil {
log.Error("loadBlockAndTransfersCommand allHistoryBlocksLoaded", "error", err)
return false, err
}
if allBlocksLoaded {
firstHeader, err := c.blockDAO.GetFirstSavedBlock(c.chainClient.ChainID, address)
firstHeader, err := c.blockDAO.GetFirstSavedBlock(c.chainClient.ChainID, c.account)
if err != nil {
log.Error("loadBlocksAndTransfersCommand GetFirstSavedBlock", "error", err)
return false, err
@ -513,6 +564,8 @@ func (c *loadBlocksAndTransfersCommand) areAllTransfersLoadedForAddress(address
return false, nil
}
// TODO - make it a common method for every service that wants head block number, that will cache the latest block
// and updates it on timeout
func getHeadBlockNumber(parent context.Context, chainClient *chain.ClientWithFallback) (*big.Int, error) {
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
head, err := chainClient.HeaderByNumber(ctx, nil)

View File

@ -381,7 +381,7 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [
// time to get logs for 100000 blocks = 1.144686979s. with 249 events in the result set.
func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, from, to *big.Int) ([]*DBHeader, error) {
start := time.Now()
log.Debug("get erc20 transfers in range", "from", from, "to", to)
log.Debug("get erc20 transfers in range start", "chainID", d.client.ChainID, "from", from, "to", to)
headers := []*DBHeader{}
ctx := context.Background()
for _, address := range d.accounts {
@ -405,12 +405,21 @@ func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, fro
if len(logs) == 0 {
continue
}
rst, err := d.blocksFromLogs(parent, logs, address)
if err != nil {
return nil, err
}
headers = append(headers, rst...)
if len(rst) == 0 {
log.Warn("no headers found in logs for account", "chainID", d.client.ChainID, "address", address, "from", from, "to", to)
continue
} else {
headers = append(headers, rst...)
log.Debug("found erc20 transfers for account", "chainID", d.client.ChainID, "address", address,
"from", from, "to", to, "headers", len(headers))
}
}
log.Debug("found erc20 transfers between two blocks", "from", from, "to", to, "headers", len(headers), "took", time.Since(start))
log.Debug("get erc20 transfers in range end", "chainID", d.client.ChainID,
"from", from, "to", to, "headers", len(headers), "took", time.Since(start))
return headers, nil
}

View File

@ -44,9 +44,9 @@ type SequentialFetchStrategy struct {
}
func (s *SequentialFetchStrategy) newCommand(chainClient *chain.ClientWithFallback,
accounts []common.Address) async.Commander {
account common.Address) async.Commander {
return newLoadBlocksAndTransfersCommand(accounts, s.db, s.blockDAO, chainClient, s.feed,
return newLoadBlocksAndTransfersCommand(account, s.db, s.blockDAO, chainClient, s.feed,
s.transactionManager, s.tokenManager)
}
@ -67,8 +67,10 @@ func (s *SequentialFetchStrategy) start() error {
}
for _, chainClient := range s.chainClients {
ctl := s.newCommand(chainClient, s.accounts)
s.group.Add(ctl.Command())
for _, address := range s.accounts {
ctl := s.newCommand(chainClient, address)
s.group.Add(ctl.Command())
}
}
return nil