diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index 35c4ce4d1..6b78743c8 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -137,7 +137,7 @@ func getErc20BatchSize(chainID uint64) *big.Int { } func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { - log.Info("wallet historical downloader for erc20 transfers start", "address", c.address, + log.Info("wallet historical downloader for erc20 transfers start", "chainID", c.chainClient.ChainID, "address", c.address, "from", c.from, "to", c.to) start := time.Now() @@ -158,8 +158,8 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { } c.foundHeaders = append(c.foundHeaders, headers...) } - log.Info("wallet historical downloader for erc20 transfers finished", "address", c.address, - "from", c.from, "to", c.to, "time", time.Since(start)) + log.Info("wallet historical downloader for erc20 transfers finished", "chainID", c.chainClient.ChainID, "address", c.address, + "from", c.from, "to", c.to, "time", time.Since(start), "headers", len(c.foundHeaders)) return nil } @@ -181,8 +181,8 @@ type controlCommand struct { tokenManager *token.Manager } -func (c *controlCommand) LoadTransfers(ctx context.Context, limit int) (map[common.Address][]Transfer, error) { - return loadTransfers(ctx, c.accounts, c.blockDAO, c.db, c.chainClient, limit, make(map[common.Address][]*big.Int), c.transactionManager, c.tokenManager) +func (c *controlCommand) LoadTransfers(ctx context.Context, limit int) error { + return loadTransfers(ctx, c.accounts, c.blockDAO, c.db, c.chainClient, limit, make(map[common.Address][]*big.Int), c.transactionManager, c.tokenManager, c.feed) } func (c *controlCommand) Run(parent context.Context) error { @@ -274,7 +274,7 @@ func (c *controlCommand) Run(parent context.Context) error { return cmnd.error } - _, err = c.LoadTransfers(parent, numberOfBlocksCheckedPerIteration) + err = c.LoadTransfers(parent, numberOfBlocksCheckedPerIteration) if err != nil { if c.NewError(err) { return nil @@ -321,9 +321,9 @@ func nonArchivalNodeError(err error) bool { func (c *controlCommand) NewError(err error) bool { c.errorsCount++ - log.Error("controlCommand error", "error", err, "counter", c.errorsCount) + log.Error("controlCommand error", "chainID", c.chainClient.ChainID, "error", err, "counter", c.errorsCount) if nonArchivalNodeError(err) { - log.Info("Non archival node detected") + log.Info("Non archival node detected", "chainID", c.chainClient.ChainID) c.nonArchivalRPCNode = true c.feed.Send(walletevent.Event{ Type: EventNonArchivalNodeDetected, @@ -524,15 +524,15 @@ func (c *transfersCommand) notifyOfNewTransfers(transfers []Transfer) { } type loadTransfersCommand struct { - accounts []common.Address - db *Database - blockDAO *BlockDAO - chainClient *chain.ClientWithFallback - blocksByAddress map[common.Address][]*big.Int - foundTransfersByAddress map[common.Address][]Transfer - transactionManager *TransactionManager - blocksLimit int - tokenManager *token.Manager + accounts []common.Address + db *Database + blockDAO *BlockDAO + chainClient *chain.ClientWithFallback + blocksByAddress map[common.Address][]*big.Int + transactionManager *TransactionManager + blocksLimit int + tokenManager *token.Manager + feed *event.Feed } func (c *loadTransfersCommand) Command() async.Command { @@ -542,17 +542,12 @@ func (c *loadTransfersCommand) Command() async.Command { }.Run } -func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, limit int, blocksByAddress map[common.Address][]*big.Int) (map[common.Address][]Transfer, error) { - return loadTransfers(ctx, c.accounts, c.blockDAO, c.db, c.chainClient, limit, blocksByAddress, c.transactionManager, c.tokenManager) +func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, limit int, blocksByAddress map[common.Address][]*big.Int) error { + return loadTransfers(ctx, c.accounts, c.blockDAO, c.db, c.chainClient, limit, blocksByAddress, c.transactionManager, c.tokenManager, c.feed) } func (c *loadTransfersCommand) Run(parent context.Context) (err error) { - transfersByAddress, err := c.LoadTransfers(parent, c.blocksLimit, c.blocksByAddress) - if err != nil { - return err - } - c.foundTransfersByAddress = transfersByAddress - + err = c.LoadTransfers(parent, c.blocksLimit, c.blocksByAddress) return } @@ -603,6 +598,8 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) error { erc20Headers := erc20HeadersByAddress[address] allHeaders := append(ethHeaders, erc20Headers...) + log.Debug("allHeaders found for account", "address", address, "allHeaders.len", len(allHeaders)) + // Ensure only 1 DBHeader per block hash. uniqHeaders := []*DBHeader{} if len(allHeaders) > 0 { @@ -627,6 +624,7 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) error { Balance: c.balanceCache.ReadCachedBalance(address, lastBlockNumber), Nonce: c.balanceCache.ReadCachedNonce(address, lastBlockNumber), } + log.Debug("uniqHeaders found for account", "address", address, "uniqHeaders.len", len(uniqHeaders)) err = c.db.ProcessBlocks(c.chainClient.ChainID, address, newFromByAddress[address], to, uniqHeaders) if err != nil { return err @@ -720,13 +718,12 @@ func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, from func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *BlockDAO, db *Database, chainClient *chain.ClientWithFallback, blocksLimitPerAccount int, blocksByAddress map[common.Address][]*big.Int, - transactionManager *TransactionManager, tokenManager *token.Manager) (map[common.Address][]Transfer, error) { + transactionManager *TransactionManager, tokenManager *token.Manager, feed *event.Feed) error { log.Info("loadTransfers start", "accounts", accounts, "chain", chainClient.ChainID, "limit", blocksLimitPerAccount) start := time.Now() group := async.NewGroup(ctx) - commands := []*transfersCommand{} for _, address := range accounts { transfers := &transfersCommand{ db: db, @@ -742,25 +739,17 @@ func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *Blo blockNums: blocksByAddress[address], transactionManager: transactionManager, tokenManager: tokenManager, + feed: feed, } - commands = append(commands, transfers) group.Add(transfers.Command()) } select { case <-ctx.Done(): - return nil, ctx.Err() + return ctx.Err() case <-group.WaitAsync(): - transfersByAddress := map[common.Address][]Transfer{} - for _, command := range commands { - if len(command.fetchedTransfers) == 0 { - continue - } - - transfersByAddress[command.address] = append(transfersByAddress[command.address], command.fetchedTransfers...) - } log.Info("loadTransfers finished for account", "in", time.Since(start), "chain", chainClient.ChainID) - return transfersByAddress, nil + return nil } } diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index 33105b0cc..5e9c3b362 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -15,6 +15,53 @@ import ( "github.com/status-im/status-go/services/wallet/walletevent" ) +type findNewBlocksCommand struct { + *findBlocksCommand +} + +func (c *findNewBlocksCommand) Command() async.Command { + return async.InfiniteCommand{ + Interval: 13 * time.Second, // TODO - make it configurable based on chain block mining time + Runable: c.Run, + }.Run +} + +func (c *findNewBlocksCommand) Run(parent context.Context) (err error) { + log.Debug("start findNewBlocksCommand", "account", c.account, "chain", c.chainClient.ChainID, "noLimit", c.noLimit) + + headNum, err := getHeadBlockNumber(parent, c.chainClient) + if err != nil { + // c.error = err + return err // Might need to retry a couple of times + } + + blockRange, err := loadBlockRangeInfo(c.chainClient.ChainID, c.account, c.blockRangeDAO) + if err != nil { + log.Error("findBlocksCommand loadBlockRangeInfo", "error", err) + // c.error = err + return err // Will keep spinning forever nomatter what + } + + if blockRange != nil { + c.fromBlockNumber = new(big.Int).Add(blockRange.LastKnown, big.NewInt(1)) + + log.Debug("Launching new blocks command", "chainID", c.chainClient.ChainID, "account", c.account, + "from", c.fromBlockNumber, "headNum", headNum) + + // In case interval between checks is set smaller than block mining time, + // we might need to wait for the next block to be mined + if c.fromBlockNumber.Cmp(headNum) > 0 { + return + } + + c.toBlockNumber = headNum + + _ = c.findBlocksCommand.Run(parent) + } + + return nil +} + // TODO NewFindBlocksCommand type findBlocksCommand struct { account common.Address @@ -27,6 +74,7 @@ type findBlocksCommand struct { transactionManager *TransactionManager fromBlockNumber *big.Int toBlockNumber *big.Int + blocksLoadedCh chan<- []*DBHeader // Not to be set by the caller resFromBlock *Block @@ -72,6 +120,8 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) { // return err break } + + c.blocksFound(headers) } err = c.upsertBlockRange(&BlockRange{c.startBlockNumber, c.resFromBlock.Number, to}) @@ -93,6 +143,10 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) { return nil } +func (c *findBlocksCommand) blocksFound(headers []*DBHeader) { + c.blocksLoadedCh <- headers +} + func (c *findBlocksCommand) upsertBlockRange(blockRange *BlockRange) error { log.Debug("upsert block range", "Start", blockRange.Start, "FirstKnown", blockRange.FirstKnown, "LastKnown", blockRange.LastKnown, "chain", c.chainClient.ChainID, "account", c.account) @@ -120,13 +174,14 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to // return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers return nil, nil } - log.Debug("findBlocksCommand checkRange", "startBlock", startBlock, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit) + log.Debug("findBlocksCommand checkRange", "chainID", c.chainClient.ChainID, "account", c.account, + "startBlock", startBlock, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit) // There could be incoming ERC20 transfers which don't change the balance // and nonce of ETH account, so we keep looking for them erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to) if err != nil { - log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err) + log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err, "account", c.account, "chain", c.chainClient.ChainID) c.error = err // return err return nil, nil @@ -141,7 +196,8 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to c.resFromBlock = newFromBlock c.startBlockNumber = startBlock - log.Debug("end findBlocksCommand checkRange", "c.startBlock", c.startBlockNumber, "newFromBlock", newFromBlock.Number, + log.Debug("end findBlocksCommand checkRange", "chainID", c.chainClient.ChainID, "account", c.account, + "c.startBlock", c.startBlockNumber, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "c.resFromBlock", c.resFromBlock.Number) return @@ -194,7 +250,8 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCache *balanceCache, fromBlock *Block, toBlockNumber *big.Int) (resultingFrom *Block, headers []*DBHeader, startBlock *big.Int, err error) { - log.Debug("fast index started", "accounts", c.account, "from", fromBlock.Number, "to", toBlockNumber) + log.Debug("fast index started", "chainID", c.chainClient.ChainID, "account", c.account, + "from", fromBlock.Number, "to", toBlockNumber) start := time.Now() group := async.NewGroup(ctx) @@ -224,7 +281,8 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCache *balanceCache, resultingFrom = &Block{Number: command.resultingFrom} headers = command.foundHeaders startBlock = command.startBlock - log.Debug("fast indexer finished", "in", time.Since(start), "startBlock", command.startBlock, "resultingFrom", resultingFrom.Number, "headers", len(headers)) + log.Debug("fast indexer finished", "chainID", c.chainClient.ChainID, "account", c.account, "in", time.Since(start), + "startBlock", command.startBlock, "resultingFrom", resultingFrom.Number, "headers", len(headers)) return } } @@ -253,75 +311,44 @@ func (c *findBlocksCommand) fastIndexErc20(ctx context.Context, fromBlockNumber return nil, ctx.Err() case <-group.WaitAsync(): headers := erc20.foundHeaders - log.Debug("fast indexer Erc20 finished", "in", time.Since(start), "headers", len(headers)) + log.Debug("fast indexer Erc20 finished", "chainID", c.chainClient.ChainID, "account", c.account, + "in", time.Since(start), "headers", len(headers)) return headers, nil } } -// TODO Think on how to reuse loadTransfersCommand, as it shares many members and some methods -// but does not need to return the transfers but only save them to DB, as there can be too many of them -// and the logic of `loadTransfersLoop` is different from `loadTransfers“ -type loadAllTransfersCommand struct { - accounts []common.Address - db *Database - blockDAO *BlockDAO - chainClient *chain.ClientWithFallback - blocksByAddress map[common.Address][]*big.Int - transactionManager *TransactionManager - tokenManager *token.Manager - blocksLimit int - feed *event.Feed -} +func loadTransfersLoop(ctx context.Context, account common.Address, blockDAO *BlockDAO, db *Database, + chainClient *chain.ClientWithFallback, transactionManager *TransactionManager, tokenManager *token.Manager, + feed *event.Feed, blocksLoadedCh <-chan []*DBHeader) { -func (c *loadAllTransfersCommand) Command() async.Command { - return async.FiniteCommand{ - Interval: 5 * time.Second, - Runable: c.Run, - }.Run -} - -func (c *loadAllTransfersCommand) Run(parent context.Context) error { - start := time.Now() - group := async.NewGroup(parent) - - for _, address := range c.accounts { - transfers := &transfersCommand{ - db: c.db, - blockDAO: c.blockDAO, - chainClient: c.chainClient, - address: address, - eth: ÐDownloader{ - chainClient: c.chainClient, - accounts: []common.Address{address}, - signer: types.NewLondonSigner(c.chainClient.ToBigInt()), - db: c.db, - }, - blockNums: c.blocksByAddress[address], - blocksLimit: c.blocksLimit, - transactionManager: c.transactionManager, - tokenManager: c.tokenManager, - feed: c.feed, - } - group.Add(transfers.Command()) - } + log.Debug("loadTransfersLoop start", "chain", chainClient.ChainID, "account", account) select { - case <-parent.Done(): - log.Info("loadTransfers transfersCommand error", "chain", c.chainClient.ChainID, "error", parent.Err()) - return parent.Err() - case <-group.WaitAsync(): - log.Debug("loadTransfers finished for account", "in", time.Since(start), "chain", c.chainClient.ChainID, "limit", c.blocksLimit) - } + case <-ctx.Done(): + log.Info("loadTransfersLoop error", "chain", chainClient.ChainID, "account", account, "error", ctx.Err()) + return + case dbHeaders := <-blocksLoadedCh: + log.Debug("loadTransfersOnDemand transfers received", "chain", chainClient.ChainID, "account", account, "headers", len(dbHeaders)) - return nil + blockNums := make([]*big.Int, len(dbHeaders)) + for i, dbHeader := range dbHeaders { + blockNums[i] = dbHeader.Number + } + + blocksByAddress := map[common.Address][]*big.Int{account: blockNums} + go func() { + _ = loadTransfers(ctx, []common.Address{account}, blockDAO, db, chainClient, noBlockLimit, + blocksByAddress, transactionManager, tokenManager, feed) + }() + } } -func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, +func newLoadBlocksAndTransfersCommand(account common.Address, db *Database, blockDAO *BlockDAO, chainClient *chain.ClientWithFallback, feed *event.Feed, transactionManager *TransactionManager, tokenManager *token.Manager) *loadBlocksAndTransfersCommand { return &loadBlocksAndTransfersCommand{ - accounts: accounts, + account: account, db: db, blockRangeDAO: &BlockRangeSequentialDAO{db.client}, blockDAO: blockDAO, @@ -330,12 +357,12 @@ func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, errorsCount: 0, transactionManager: transactionManager, tokenManager: tokenManager, - transfersLoaded: make(map[common.Address]bool), + blocksLoadedCh: make(chan []*DBHeader), } } type loadBlocksAndTransfersCommand struct { - accounts []common.Address + account common.Address db *Database blockRangeDAO *BlockRangeSequentialDAO blockDAO *BlockDAO @@ -346,13 +373,14 @@ type loadBlocksAndTransfersCommand struct { // nonArchivalRPCNode bool // TODO Make use of it transactionManager *TransactionManager tokenManager *token.Manager + blocksLoadedCh chan []*DBHeader // Not to be set by the caller - transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime + transfersLoaded bool // For event RecentHistoryReady to be sent only once per account during app lifetime } func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error { - log.Debug("start load all transfers command", "chain", c.chainClient.ChainID) + log.Debug("start load all transfers command", "chain", c.chainClient.ChainID, "account", c.account) ctx := parent @@ -362,143 +390,166 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error { group := async.NewGroup(ctx) - headNum, err := getHeadBlockNumber(parent, c.chainClient) - if err != nil { - // c.error = err - return err // Might need to retry a couple of times + err := c.fetchTransfersForLoadedBlocks(group) + for err != nil { + return err } - for _, address := range c.accounts { - blockRange, err := loadBlockRangeInfo(c.chainClient.ChainID, address, c.blockRangeDAO) - if err != nil { - log.Error("findBlocksCommand loadBlockRangeInfo", "error", err) - // c.error = err - return err // Will keep spinning forever nomatter what - } + c.startTransfersLoop(ctx) - allHistoryLoaded := areAllHistoryBlocksLoaded(blockRange) - toHistoryBlockNum := getToHistoryBlockNumber(headNum, blockRange, allHistoryLoaded) - - if !allHistoryLoaded { - c.fetchHistoryBlocks(ctx, group, address, big.NewInt(0), toHistoryBlockNum) - } else { - if !c.transfersLoaded[address] { - transfersLoaded, err := c.areAllTransfersLoadedForAddress(address) - if err != nil { - return err - } - - if transfersLoaded { - c.transfersLoaded[address] = true - c.notifyHistoryReady(address) - } - } - } - - // If no block ranges are stored, all blocks will be fetched by fetchHistoryBlocks method - if blockRange != nil { - c.fetchNewBlocks(ctx, group, address, blockRange, headNum) - } + err = c.fetchHistoryBlocks(parent, group, c.blocksLoadedCh) + for err != nil { + group.Stop() + group.Wait() + return err } - c.fetchTransfers(ctx, group) + c.startFetchingNewBlocks(group, c.account, c.blocksLoadedCh) select { case <-ctx.Done(): return ctx.Err() case <-group.WaitAsync(): - log.Debug("end load all transfers command", "chain", c.chainClient.ChainID) + log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.ChainID, "account", c.account) return nil } } func (c *loadBlocksAndTransfersCommand) Command() async.Command { return async.InfiniteCommand{ - Interval: 13 * time.Second, // Slightly more that block mining time + Interval: 5 * time.Second, Runable: c.Run, }.Run } -func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, - address common.Address, from *big.Int, to *big.Int) { - - log.Debug("Launching history command", "account", address, "from", from, "to", to) - - fbc := &findBlocksCommand{ - account: address, - db: c.db, - blockRangeDAO: c.blockRangeDAO, - chainClient: c.chainClient, - balanceCache: c.balanceCache, - feed: c.feed, - noLimit: false, - fromBlockNumber: from, - toBlockNumber: to, - transactionManager: c.transactionManager, - } - group.Add(fbc.Command()) +func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context) { + go loadTransfersLoop(ctx, c.account, c.blockDAO, c.db, c.chainClient, c.transactionManager, c.tokenManager, + c.feed, c.blocksLoadedCh) } -func (c *loadBlocksAndTransfersCommand) fetchNewBlocks(ctx context.Context, group *async.Group, - address common.Address, blockRange *BlockRange, headNum *big.Int) { +func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, blocksLoadedCh chan []*DBHeader) error { - fromBlockNumber := new(big.Int).Add(blockRange.LastKnown, big.NewInt(1)) + log.Debug("fetchHistoryBlocks start", "chainID", c.chainClient.ChainID, "account", c.account) - log.Debug("Launching new blocks command", "chainID", c.chainClient.ChainID, "account", address, "from", fromBlockNumber, "headNum", headNum) - - // In case interval between checks is set smaller than block mining time, - // we might need to wait for the next block to be mined - if fromBlockNumber.Cmp(headNum) > 0 { - return + headNum, err := getHeadBlockNumber(ctx, c.chainClient) + if err != nil { + // c.error = err + return err // Might need to retry a couple of times } - newBlocksCmd := &findBlocksCommand{ - account: address, - db: c.db, - blockRangeDAO: c.blockRangeDAO, - chainClient: c.chainClient, - balanceCache: c.balanceCache, - feed: c.feed, - noLimit: false, - fromBlockNumber: fromBlockNumber, - toBlockNumber: headNum, - transactionManager: c.transactionManager, + blockRange, err := loadBlockRangeInfo(c.chainClient.ChainID, c.account, c.blockRangeDAO) + if err != nil { + log.Error("findBlocksCommand loadBlockRangeInfo", "error", err) + // c.error = err + return err // Will keep spinning forever nomatter what + } + + allHistoryLoaded := areAllHistoryBlocksLoaded(blockRange) + to := getToHistoryBlockNumber(headNum, blockRange, allHistoryLoaded) + + log.Debug("fetchHistoryBlocks", "chainID", c.chainClient.ChainID, "account", c.account, "to", to, "allHistoryLoaded", allHistoryLoaded) + + if !allHistoryLoaded { + fbc := &findBlocksCommand{ + account: c.account, + db: c.db, + blockRangeDAO: c.blockRangeDAO, + chainClient: c.chainClient, + balanceCache: c.balanceCache, + feed: c.feed, + noLimit: false, + fromBlockNumber: big.NewInt(0), + toBlockNumber: to, + transactionManager: c.transactionManager, + blocksLoadedCh: blocksLoadedCh, + } + group.Add(fbc.Command()) + } else { + if !c.transfersLoaded { + transfersLoaded, err := c.areAllTransfersLoaded() + if err != nil { + return err + } + + if transfersLoaded { + c.transfersLoaded = true + c.notifyHistoryReady() + } + } + } + + log.Debug("fetchHistoryBlocks end", "chainID", c.chainClient.ChainID, "account", c.account) + + return nil +} + +func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(group *async.Group, address common.Address, blocksLoadedCh chan<- []*DBHeader) { + + log.Debug("startFetchingNewBlocks", "chainID", c.chainClient.ChainID, "account", address) + + newBlocksCmd := &findNewBlocksCommand{ + findBlocksCommand: &findBlocksCommand{ + account: address, + db: c.db, + blockRangeDAO: c.blockRangeDAO, + chainClient: c.chainClient, + balanceCache: c.balanceCache, + feed: c.feed, + noLimit: false, + transactionManager: c.transactionManager, + blocksLoadedCh: blocksLoadedCh, + }, } group.Add(newBlocksCmd.Command()) } -func (c *loadBlocksAndTransfersCommand) fetchTransfers(ctx context.Context, group *async.Group) { - txCommand := &loadAllTransfersCommand{ - accounts: c.accounts, +func (c *loadBlocksAndTransfersCommand) fetchTransfersForLoadedBlocks(group *async.Group) error { + + log.Debug("fetchTransfers start", "chainID", c.chainClient.ChainID, "account", c.account) + + blocks, err := c.blockDAO.GetBlocksToLoadByAddress(c.chainClient.ChainID, c.account, numberOfBlocksCheckedPerIteration) + if err != nil { + log.Error("loadBlocksAndTransfersCommand GetBlocksToLoadByAddress", "error", err) + return err + } + + blocksMap := make(map[common.Address][]*big.Int) + blocksMap[c.account] = blocks + + txCommand := &loadTransfersCommand{ + accounts: []common.Address{c.account}, db: c.db, blockDAO: c.blockDAO, chainClient: c.chainClient, transactionManager: c.transactionManager, - blocksLimit: noBlockLimit, // load transfers from all `unloaded` blocks + tokenManager: c.tokenManager, + blocksByAddress: blocksMap, feed: c.feed, } group.Add(txCommand.Command()) + + return nil } -func (c *loadBlocksAndTransfersCommand) notifyHistoryReady(address common.Address) { +func (c *loadBlocksAndTransfersCommand) notifyHistoryReady() { if c.feed != nil { c.feed.Send(walletevent.Event{ Type: EventRecentHistoryReady, - Accounts: []common.Address{address}, + Accounts: []common.Address{c.account}, }) } } -func (c *loadBlocksAndTransfersCommand) areAllTransfersLoadedForAddress(address common.Address) (bool, error) { - allBlocksLoaded, err := areAllHistoryBlocksLoadedForAddress(c.blockRangeDAO, c.chainClient.ChainID, address) +func (c *loadBlocksAndTransfersCommand) areAllTransfersLoaded() (bool, error) { + allBlocksLoaded, err := areAllHistoryBlocksLoadedForAddress(c.blockRangeDAO, c.chainClient.ChainID, c.account) if err != nil { log.Error("loadBlockAndTransfersCommand allHistoryBlocksLoaded", "error", err) return false, err } if allBlocksLoaded { - firstHeader, err := c.blockDAO.GetFirstSavedBlock(c.chainClient.ChainID, address) + firstHeader, err := c.blockDAO.GetFirstSavedBlock(c.chainClient.ChainID, c.account) if err != nil { log.Error("loadBlocksAndTransfersCommand GetFirstSavedBlock", "error", err) return false, err @@ -513,6 +564,8 @@ func (c *loadBlocksAndTransfersCommand) areAllTransfersLoadedForAddress(address return false, nil } +// TODO - make it a common method for every service that wants head block number, that will cache the latest block +// and updates it on timeout func getHeadBlockNumber(parent context.Context, chainClient *chain.ClientWithFallback) (*big.Int, error) { ctx, cancel := context.WithTimeout(parent, 3*time.Second) head, err := chainClient.HeaderByNumber(ctx, nil) diff --git a/services/wallet/transfer/downloader.go b/services/wallet/transfer/downloader.go index b74824bf7..42c0db378 100644 --- a/services/wallet/transfer/downloader.go +++ b/services/wallet/transfer/downloader.go @@ -381,7 +381,7 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [ // time to get logs for 100000 blocks = 1.144686979s. with 249 events in the result set. func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, from, to *big.Int) ([]*DBHeader, error) { start := time.Now() - log.Debug("get erc20 transfers in range", "from", from, "to", to) + log.Debug("get erc20 transfers in range start", "chainID", d.client.ChainID, "from", from, "to", to) headers := []*DBHeader{} ctx := context.Background() for _, address := range d.accounts { @@ -405,12 +405,21 @@ func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, fro if len(logs) == 0 { continue } + rst, err := d.blocksFromLogs(parent, logs, address) if err != nil { return nil, err } - headers = append(headers, rst...) + if len(rst) == 0 { + log.Warn("no headers found in logs for account", "chainID", d.client.ChainID, "address", address, "from", from, "to", to) + continue + } else { + headers = append(headers, rst...) + log.Debug("found erc20 transfers for account", "chainID", d.client.ChainID, "address", address, + "from", from, "to", to, "headers", len(headers)) + } } - log.Debug("found erc20 transfers between two blocks", "from", from, "to", to, "headers", len(headers), "took", time.Since(start)) + log.Debug("get erc20 transfers in range end", "chainID", d.client.ChainID, + "from", from, "to", to, "headers", len(headers), "took", time.Since(start)) return headers, nil } diff --git a/services/wallet/transfer/sequential_fetch_strategy.go b/services/wallet/transfer/sequential_fetch_strategy.go index dc6db6eaa..42a36db84 100644 --- a/services/wallet/transfer/sequential_fetch_strategy.go +++ b/services/wallet/transfer/sequential_fetch_strategy.go @@ -44,9 +44,9 @@ type SequentialFetchStrategy struct { } func (s *SequentialFetchStrategy) newCommand(chainClient *chain.ClientWithFallback, - accounts []common.Address) async.Commander { + account common.Address) async.Commander { - return newLoadBlocksAndTransfersCommand(accounts, s.db, s.blockDAO, chainClient, s.feed, + return newLoadBlocksAndTransfersCommand(account, s.db, s.blockDAO, chainClient, s.feed, s.transactionManager, s.tokenManager) } @@ -67,8 +67,10 @@ func (s *SequentialFetchStrategy) start() error { } for _, chainClient := range s.chainClients { - ctl := s.newCommand(chainClient, s.accounts) - s.group.Add(ctl.Command()) + for _, address := range s.accounts { + ctl := s.newCommand(chainClient, address) + s.group.Add(ctl.Command()) + } } return nil