parent
5972cd0fde
commit
6f753617de
|
@ -11,6 +11,7 @@ import (
|
||||||
|
|
||||||
type BlockRangeDAOer interface {
|
type BlockRangeDAOer interface {
|
||||||
getBlockRange(chainID uint64, address common.Address) (blockRange *ethTokensBlockRanges, exists bool, err error)
|
getBlockRange(chainID uint64, address common.Address) (blockRange *ethTokensBlockRanges, exists bool, err error)
|
||||||
|
getBlockRanges(chainID uint64, addresses []common.Address) (blockRanges map[common.Address]*ethTokensBlockRanges, err error)
|
||||||
upsertRange(chainID uint64, account common.Address, newBlockRange *ethTokensBlockRanges) (err error)
|
upsertRange(chainID uint64, account common.Address, newBlockRange *ethTokensBlockRanges) (err error)
|
||||||
updateTokenRange(chainID uint64, account common.Address, newBlockRange *BlockRange) (err error)
|
updateTokenRange(chainID uint64, account common.Address, newBlockRange *BlockRange) (err error)
|
||||||
upsertEthRange(chainID uint64, account common.Address, newBlockRange *BlockRange) (err error)
|
upsertEthRange(chainID uint64, account common.Address, newBlockRange *BlockRange) (err error)
|
||||||
|
@ -40,8 +41,48 @@ func newEthTokensBlockRanges() *ethTokensBlockRanges {
|
||||||
return ðTokensBlockRanges{eth: NewBlockRange(), tokens: NewBlockRange()}
|
return ðTokensBlockRanges{eth: NewBlockRange(), tokens: NewBlockRange()}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func scanRanges(rows *sql.Rows) (map[common.Address]*ethTokensBlockRanges, error) {
|
||||||
|
blockRanges := make(map[common.Address]*ethTokensBlockRanges)
|
||||||
|
for rows.Next() {
|
||||||
|
efk := &bigint.NilableSQLBigInt{}
|
||||||
|
elk := &bigint.NilableSQLBigInt{}
|
||||||
|
es := &bigint.NilableSQLBigInt{}
|
||||||
|
tfk := &bigint.NilableSQLBigInt{}
|
||||||
|
tlk := &bigint.NilableSQLBigInt{}
|
||||||
|
ts := &bigint.NilableSQLBigInt{}
|
||||||
|
addressB := []byte{}
|
||||||
|
blockRange := newEthTokensBlockRanges()
|
||||||
|
err := rows.Scan(&addressB, es, efk, elk, ts, tfk, tlk, &blockRange.balanceCheckHash)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
address := common.BytesToAddress(addressB)
|
||||||
|
blockRanges[address] = blockRange
|
||||||
|
|
||||||
|
if !es.IsNil() {
|
||||||
|
blockRanges[address].eth.Start = big.NewInt(es.Int64())
|
||||||
|
}
|
||||||
|
if !efk.IsNil() {
|
||||||
|
blockRanges[address].eth.FirstKnown = big.NewInt(efk.Int64())
|
||||||
|
}
|
||||||
|
if !elk.IsNil() {
|
||||||
|
blockRanges[address].eth.LastKnown = big.NewInt(elk.Int64())
|
||||||
|
}
|
||||||
|
if !ts.IsNil() {
|
||||||
|
blockRanges[address].tokens.Start = big.NewInt(ts.Int64())
|
||||||
|
}
|
||||||
|
if !tfk.IsNil() {
|
||||||
|
blockRanges[address].tokens.FirstKnown = big.NewInt(tfk.Int64())
|
||||||
|
}
|
||||||
|
if !tlk.IsNil() {
|
||||||
|
blockRanges[address].tokens.LastKnown = big.NewInt(tlk.Int64())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return blockRanges, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (b *BlockRangeSequentialDAO) getBlockRange(chainID uint64, address common.Address) (blockRange *ethTokensBlockRanges, exists bool, err error) {
|
func (b *BlockRangeSequentialDAO) getBlockRange(chainID uint64, address common.Address) (blockRange *ethTokensBlockRanges, exists bool, err error) {
|
||||||
query := `SELECT blk_start, blk_first, blk_last, token_blk_start, token_blk_first, token_blk_last, balance_check_hash FROM blocks_ranges_sequential
|
query := `SELECT address, blk_start, blk_first, blk_last, token_blk_start, token_blk_first, token_blk_last, balance_check_hash FROM blocks_ranges_sequential
|
||||||
WHERE address = ?
|
WHERE address = ?
|
||||||
AND network_id = ?`
|
AND network_id = ?`
|
||||||
|
|
||||||
|
@ -51,47 +92,47 @@ func (b *BlockRangeSequentialDAO) getBlockRange(chainID uint64, address common.A
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
blockRange = newEthTokensBlockRanges()
|
ranges, err := scanRanges(rows)
|
||||||
if rows.Next() {
|
if err != nil {
|
||||||
exists = true
|
return nil, false, err
|
||||||
efk := &bigint.NilableSQLBigInt{}
|
}
|
||||||
elk := &bigint.NilableSQLBigInt{}
|
|
||||||
es := &bigint.NilableSQLBigInt{}
|
|
||||||
tfk := &bigint.NilableSQLBigInt{}
|
|
||||||
tlk := &bigint.NilableSQLBigInt{}
|
|
||||||
ts := &bigint.NilableSQLBigInt{}
|
|
||||||
|
|
||||||
err = rows.Scan(es, efk, elk, ts, tfk, tlk, &blockRange.balanceCheckHash)
|
blockRange, exists = ranges[address]
|
||||||
|
if !exists {
|
||||||
if !es.IsNil() {
|
blockRange = newEthTokensBlockRanges()
|
||||||
blockRange.eth.Start = big.NewInt(es.Int64())
|
|
||||||
}
|
|
||||||
if !efk.IsNil() {
|
|
||||||
blockRange.eth.FirstKnown = big.NewInt(efk.Int64())
|
|
||||||
}
|
|
||||||
if !elk.IsNil() {
|
|
||||||
blockRange.eth.LastKnown = big.NewInt(elk.Int64())
|
|
||||||
}
|
|
||||||
if !ts.IsNil() {
|
|
||||||
blockRange.tokens.Start = big.NewInt(ts.Int64())
|
|
||||||
}
|
|
||||||
if !tfk.IsNil() {
|
|
||||||
blockRange.tokens.FirstKnown = big.NewInt(tfk.Int64())
|
|
||||||
}
|
|
||||||
if !tlk.IsNil() {
|
|
||||||
blockRange.tokens.LastKnown = big.NewInt(tlk.Int64())
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, exists, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return blockRange, exists, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return blockRange, exists, nil
|
return blockRange, exists, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *BlockRangeSequentialDAO) getBlockRanges(chainID uint64, addresses []common.Address) (blockRanges map[common.Address]*ethTokensBlockRanges, err error) {
|
||||||
|
blockRanges = make(map[common.Address]*ethTokensBlockRanges)
|
||||||
|
addressesPlaceholder := ""
|
||||||
|
for i := 0; i < len(addresses); i++ {
|
||||||
|
addressesPlaceholder += "?"
|
||||||
|
if i < len(addresses)-1 {
|
||||||
|
addressesPlaceholder += ","
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
query := "SELECT address, blk_start, blk_first, blk_last, token_blk_start, token_blk_first, token_blk_last, balance_check_hash FROM blocks_ranges_sequential WHERE address IN (" +
|
||||||
|
addressesPlaceholder + ") AND network_id = ?"
|
||||||
|
|
||||||
|
params := []interface{}{}
|
||||||
|
for _, address := range addresses {
|
||||||
|
params = append(params, address)
|
||||||
|
}
|
||||||
|
params = append(params, chainID)
|
||||||
|
|
||||||
|
rows, err := b.db.Query(query, params...)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
return scanRanges(rows)
|
||||||
|
}
|
||||||
|
|
||||||
func (b *BlockRangeSequentialDAO) deleteRange(account common.Address) error {
|
func (b *BlockRangeSequentialDAO) deleteRange(account common.Address) error {
|
||||||
log.Debug("delete blocks range", "account", account)
|
log.Debug("delete blocks range", "account", account)
|
||||||
delete, err := b.db.Prepare(`DELETE FROM blocks_ranges_sequential WHERE address = ?`)
|
delete, err := b.db.Prepare(`DELETE FROM blocks_ranges_sequential WHERE address = ?`)
|
||||||
|
|
|
@ -970,12 +970,33 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) (err error)
|
||||||
finiteGroup.Wait()
|
finiteGroup.Wait()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
fromNum := big.NewInt(0)
|
blockRanges, err := c.blockRangeDAO.getBlockRanges(c.chainClient.NetworkID(), c.accounts)
|
||||||
headNum, err := getHeadBlockNumber(ctx, c.chainClient)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
firstScan := false
|
||||||
|
var headNum *big.Int
|
||||||
|
for _, address := range c.accounts {
|
||||||
|
blockRange, ok := blockRanges[address]
|
||||||
|
if !ok || blockRange.tokens.LastKnown == nil {
|
||||||
|
firstScan = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if headNum == nil || blockRange.tokens.LastKnown.Cmp(headNum) < 0 {
|
||||||
|
headNum = blockRange.tokens.LastKnown
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fromNum := big.NewInt(0)
|
||||||
|
if firstScan {
|
||||||
|
headNum, err = getHeadBlockNumber(ctx, c.chainClient)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// It will start loadTransfersCommand which will run until all transfers from DB are loaded or any one failed to load
|
// It will start loadTransfersCommand which will run until all transfers from DB are loaded or any one failed to load
|
||||||
err = c.startFetchingTransfersForLoadedBlocks(finiteGroup)
|
err = c.startFetchingTransfersForLoadedBlocks(finiteGroup)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in New Issue