diff --git a/services/wallet/transfer/block_ranges_sequential_dao.go b/services/wallet/transfer/block_ranges_sequential_dao.go index e1997289e..5036ca59a 100644 --- a/services/wallet/transfer/block_ranges_sequential_dao.go +++ b/services/wallet/transfer/block_ranges_sequential_dao.go @@ -11,6 +11,7 @@ import ( type BlockRangeDAOer interface { getBlockRange(chainID uint64, address common.Address) (blockRange *ethTokensBlockRanges, exists bool, err error) + getBlockRanges(chainID uint64, addresses []common.Address) (blockRanges map[common.Address]*ethTokensBlockRanges, err error) upsertRange(chainID uint64, account common.Address, newBlockRange *ethTokensBlockRanges) (err error) updateTokenRange(chainID uint64, account common.Address, newBlockRange *BlockRange) (err error) upsertEthRange(chainID uint64, account common.Address, newBlockRange *BlockRange) (err error) @@ -40,8 +41,48 @@ func newEthTokensBlockRanges() *ethTokensBlockRanges { return ðTokensBlockRanges{eth: NewBlockRange(), tokens: NewBlockRange()} } +func scanRanges(rows *sql.Rows) (map[common.Address]*ethTokensBlockRanges, error) { + blockRanges := make(map[common.Address]*ethTokensBlockRanges) + for rows.Next() { + efk := &bigint.NilableSQLBigInt{} + elk := &bigint.NilableSQLBigInt{} + es := &bigint.NilableSQLBigInt{} + tfk := &bigint.NilableSQLBigInt{} + tlk := &bigint.NilableSQLBigInt{} + ts := &bigint.NilableSQLBigInt{} + addressB := []byte{} + blockRange := newEthTokensBlockRanges() + err := rows.Scan(&addressB, es, efk, elk, ts, tfk, tlk, &blockRange.balanceCheckHash) + if err != nil { + return nil, err + } + address := common.BytesToAddress(addressB) + blockRanges[address] = blockRange + + if !es.IsNil() { + blockRanges[address].eth.Start = big.NewInt(es.Int64()) + } + if !efk.IsNil() { + blockRanges[address].eth.FirstKnown = big.NewInt(efk.Int64()) + } + if !elk.IsNil() { + blockRanges[address].eth.LastKnown = big.NewInt(elk.Int64()) + } + if !ts.IsNil() { + blockRanges[address].tokens.Start = big.NewInt(ts.Int64()) + } + if !tfk.IsNil() { + blockRanges[address].tokens.FirstKnown = big.NewInt(tfk.Int64()) + } + if !tlk.IsNil() { + blockRanges[address].tokens.LastKnown = big.NewInt(tlk.Int64()) + } + } + return blockRanges, nil +} + func (b *BlockRangeSequentialDAO) getBlockRange(chainID uint64, address common.Address) (blockRange *ethTokensBlockRanges, exists bool, err error) { - query := `SELECT blk_start, blk_first, blk_last, token_blk_start, token_blk_first, token_blk_last, balance_check_hash FROM blocks_ranges_sequential + query := `SELECT address, blk_start, blk_first, blk_last, token_blk_start, token_blk_first, token_blk_last, balance_check_hash FROM blocks_ranges_sequential WHERE address = ? AND network_id = ?` @@ -51,47 +92,47 @@ func (b *BlockRangeSequentialDAO) getBlockRange(chainID uint64, address common.A } defer rows.Close() - blockRange = newEthTokensBlockRanges() - if rows.Next() { - exists = true - efk := &bigint.NilableSQLBigInt{} - elk := &bigint.NilableSQLBigInt{} - es := &bigint.NilableSQLBigInt{} - tfk := &bigint.NilableSQLBigInt{} - tlk := &bigint.NilableSQLBigInt{} - ts := &bigint.NilableSQLBigInt{} + ranges, err := scanRanges(rows) + if err != nil { + return nil, false, err + } - err = rows.Scan(es, efk, elk, ts, tfk, tlk, &blockRange.balanceCheckHash) - - if !es.IsNil() { - blockRange.eth.Start = big.NewInt(es.Int64()) - } - if !efk.IsNil() { - blockRange.eth.FirstKnown = big.NewInt(efk.Int64()) - } - if !elk.IsNil() { - blockRange.eth.LastKnown = big.NewInt(elk.Int64()) - } - if !ts.IsNil() { - blockRange.tokens.Start = big.NewInt(ts.Int64()) - } - if !tfk.IsNil() { - blockRange.tokens.FirstKnown = big.NewInt(tfk.Int64()) - } - if !tlk.IsNil() { - blockRange.tokens.LastKnown = big.NewInt(tlk.Int64()) - } - - if err != nil { - return nil, exists, err - } - - return blockRange, exists, nil + blockRange, exists = ranges[address] + if !exists { + blockRange = newEthTokensBlockRanges() } return blockRange, exists, nil } +func (b *BlockRangeSequentialDAO) getBlockRanges(chainID uint64, addresses []common.Address) (blockRanges map[common.Address]*ethTokensBlockRanges, err error) { + blockRanges = make(map[common.Address]*ethTokensBlockRanges) + addressesPlaceholder := "" + for i := 0; i < len(addresses); i++ { + addressesPlaceholder += "?" + if i < len(addresses)-1 { + addressesPlaceholder += "," + } + } + + query := "SELECT address, blk_start, blk_first, blk_last, token_blk_start, token_blk_first, token_blk_last, balance_check_hash FROM blocks_ranges_sequential WHERE address IN (" + + addressesPlaceholder + ") AND network_id = ?" + + params := []interface{}{} + for _, address := range addresses { + params = append(params, address) + } + params = append(params, chainID) + + rows, err := b.db.Query(query, params...) + if err != nil { + return + } + defer rows.Close() + + return scanRanges(rows) +} + func (b *BlockRangeSequentialDAO) deleteRange(account common.Address) error { log.Debug("delete blocks range", "account", account) delete, err := b.db.Prepare(`DELETE FROM blocks_ranges_sequential WHERE address = ?`) diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index b3ffbc903..785965196 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -970,12 +970,33 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) (err error) finiteGroup.Wait() }() - fromNum := big.NewInt(0) - headNum, err := getHeadBlockNumber(ctx, c.chainClient) + blockRanges, err := c.blockRangeDAO.getBlockRanges(c.chainClient.NetworkID(), c.accounts) if err != nil { return err } + firstScan := false + var headNum *big.Int + for _, address := range c.accounts { + blockRange, ok := blockRanges[address] + if !ok || blockRange.tokens.LastKnown == nil { + firstScan = true + break + } + + if headNum == nil || blockRange.tokens.LastKnown.Cmp(headNum) < 0 { + headNum = blockRange.tokens.LastKnown + } + } + + fromNum := big.NewInt(0) + if firstScan { + headNum, err = getHeadBlockNumber(ctx, c.chainClient) + if err != nil { + return err + } + } + // It will start loadTransfersCommand which will run until all transfers from DB are loaded or any one failed to load err = c.startFetchingTransfersForLoadedBlocks(finiteGroup) if err != nil {