[wallet] Merge blocks ranges when possible

- old existing ranges are merged when wallet service is started
- a new range is merged with an existing one if possible

This will decrease the number of entries in blocks_range table as
currently it can grow indefinitely (@flexsurfer reported 23307 entries).

This change is also needed for further optimisations of RPC usage.
This commit is contained in:
Roman Volosovskyi 2021-02-05 16:20:26 +02:00
parent d862b042ae
commit 8268083008
No known key found for this signature in database
GPG Key ID: 0238A4B5ECEE70DE
5 changed files with 343 additions and 13 deletions

View File

@ -1 +1 @@
0.70.0
0.70.1

View File

@ -1261,6 +1261,11 @@ func (b *GethStatusBackend) startWallet(watchNewBlocks bool) error {
}
}
err = wallet.MergeBlocksRanges(allAddresses, b.statusNode.Config().NetworkID)
if err != nil {
return err
}
return wallet.StartReactor(
b.statusNode.RPCClient().Ethclient(),
allAddresses,

View File

@ -116,6 +116,7 @@ func (db Database) ProcessBlocks(account common.Address, from *big.Int, to *big.
var (
tx *sql.Tx
)
tx, err = db.db.Begin()
if err != nil {
return err
@ -133,7 +134,7 @@ func (db Database) ProcessBlocks(account common.Address, from *big.Int, to *big.
return
}
err = insertRange(tx, account, db.network, from, to)
err = upsertRange(tx, account, db.network, from, to)
if err != nil {
return
}
@ -726,8 +727,6 @@ func deleteHeaders(creator statementCreator, headers []*DBHeader) error {
return err
}
for _, h := range headers {
k := h.Hash
log.Debug("foo", "k", k)
_, err = delete.Exec(h.Hash)
if err != nil {
return err
@ -790,22 +789,196 @@ func insertBlocksWithTransactions(creator statementCreator, account common.Addre
return nil
}
func insertRange(creator statementCreator, account common.Address, network uint64, from *big.Int, to *big.Int) (err error) {
func upsertRange(creator statementCreator, account common.Address, network uint64, from *big.Int, to *big.Int) (err error) {
update, err := creator.Prepare(`UPDATE blocks_ranges
SET blk_to = ?
WHERE address = ?
AND network_id = ?
AND blk_to = ?`)
if err != nil {
return err
}
res, err := update.Exec((*SQLBigInt)(to), account, network, (*SQLBigInt)(from))
if err != nil {
return err
}
affected, err := res.RowsAffected()
if err != nil {
return err
}
if affected == 0 {
insert, err := creator.Prepare("INSERT INTO blocks_ranges (network_id, address, blk_from, blk_to) VALUES (?, ?, ?, ?)")
if err != nil {
return err
}
_, err = insert.Exec(network, account, (*SQLBigInt)(from), (*SQLBigInt)(to))
if err != nil {
return err
}
}
return
}
type BlocksRange struct {
from *big.Int
to *big.Int
}
func (db *Database) getOldRanges(account common.Address, network uint64) ([]*BlocksRange, error) {
query := `select blk_from, blk_to from blocks_ranges
where address = ?
and network_id = ?
order by blk_from`
rows, err := db.db.Query(query, account, db.network)
if err != nil {
return nil, err
}
defer rows.Close()
ranges := []*BlocksRange{}
for rows.Next() {
from := &big.Int{}
to := &big.Int{}
err = rows.Scan((*SQLBigInt)(from), (*SQLBigInt)(to))
if err != nil {
return nil, err
}
ranges = append(ranges, &BlocksRange{
from: from,
to: to,
})
}
return ranges, nil
}
func getNewRanges(ranges []*BlocksRange) ([]*BlocksRange, []*BlocksRange) {
initValue := big.NewInt(-1)
prevFrom := big.NewInt(-1)
prevTo := big.NewInt(-1)
hasMergedRanges := false
var newRanges []*BlocksRange
var deletedRanges []*BlocksRange
for idx, blocksRange := range ranges {
if prevTo.Cmp(initValue) == 0 {
prevTo = blocksRange.to
prevFrom = blocksRange.from
} else if prevTo.Cmp(blocksRange.from) >= 0 {
hasMergedRanges = true
deletedRanges = append(deletedRanges, ranges[idx-1])
if prevTo.Cmp(blocksRange.to) <= 0 {
prevTo = blocksRange.to
}
} else {
if hasMergedRanges {
deletedRanges = append(deletedRanges, ranges[idx-1])
newRanges = append(newRanges, &BlocksRange{
from: prevFrom,
to: prevTo,
})
}
log.Info("blocks ranges gap detected", "from", prevTo, "to", blocksRange.from)
hasMergedRanges = false
prevFrom = blocksRange.from
prevTo = blocksRange.to
}
}
if hasMergedRanges {
deletedRanges = append(deletedRanges, ranges[len(ranges)-1])
newRanges = append(newRanges, &BlocksRange{
from: prevFrom,
to: prevTo,
})
}
return newRanges, deletedRanges
}
func (db *Database) mergeRanges(account common.Address, network uint64) (err error) {
var (
tx *sql.Tx
)
ranges, err := db.getOldRanges(account, network)
if err != nil {
return err
}
log.Info("merge old ranges", "account", account, "network", network, "ranges", len(ranges))
if len(ranges) <= 1 {
return nil
}
tx, err = db.db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
_ = tx.Rollback()
}()
newRanges, deletedRanges := getNewRanges(ranges)
for _, rangeToDelete := range deletedRanges {
err = deleteRange(tx, account, network, rangeToDelete.from, rangeToDelete.to)
if err != nil {
return err
}
}
for _, newRange := range newRanges {
err = insertRange(tx, account, network, newRange.from, newRange.to)
if err != nil {
return err
}
}
return nil
}
func deleteRange(creator statementCreator, account common.Address, network uint64, from *big.Int, to *big.Int) error {
log.Info("delete blocks range", "account", account, "network", network, "from", from, "to", to)
delete, err := creator.Prepare(`DELETE FROM blocks_ranges
WHERE address = ?
AND network_id = ?
AND blk_from = ?
AND blk_to = ?`)
if err != nil {
log.Info("some error", "error", err)
return err
}
_, err = delete.Exec(account, network, (*SQLBigInt)(from), (*SQLBigInt)(to))
return err
}
func insertRange(creator statementCreator, account common.Address, network uint64, from *big.Int, to *big.Int) error {
log.Info("insert blocks range", "account", account, "network", network, "from", from, "to", to)
insert, err := creator.Prepare("INSERT INTO blocks_ranges (network_id, address, blk_from, blk_to) VALUES (?, ?, ?, ?)")
if err != nil {
return err
}
_, err = insert.Exec(network, account, (*SQLBigInt)(from), (*SQLBigInt)(to))
if err != nil {
return err
}
return
return err
}
func updateOrInsertTransfers(creator statementCreator, network uint64, transfers []Transfer) error {
update, err := creator.Prepare(`UPDATE transfers
update, err := creator.Prepare(`UPDATE transfers
SET tx = ?, sender = ?, receipt = ?, timestamp = ?, loaded = 1
WHERE address =? AND hash = ?`)
if err != nil {
@ -813,8 +986,8 @@ func updateOrInsertTransfers(creator statementCreator, network uint64, transfers
}
insert, err := creator.Prepare(`INSERT OR IGNORE INTO transfers
(network_id, hash, blk_hash, blk_number, timestamp, address, tx, sender, receipt, log, type, loaded)
VALUES
(network_id, hash, blk_hash, blk_number, timestamp, address, tx, sender, receipt, log, type, loaded)
VALUES
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1)`)
if err != nil {
return err

View File

@ -255,3 +255,143 @@ func TestPendingTransactions(t *testing.T) {
require.Equal(t, 0, len(rst))
}
func TestGetNewRanges(t *testing.T) {
ranges := []*BlocksRange{
&BlocksRange{
from: big.NewInt(0),
to: big.NewInt(10),
},
&BlocksRange{
from: big.NewInt(10),
to: big.NewInt(20),
},
}
n, d := getNewRanges(ranges)
require.Equal(t, 1, len(n))
newRange := n[0]
require.Equal(t, int64(0), newRange.from.Int64())
require.Equal(t, int64(20), newRange.to.Int64())
require.Equal(t, 2, len(d))
ranges = []*BlocksRange{
&BlocksRange{
from: big.NewInt(0),
to: big.NewInt(11),
},
&BlocksRange{
from: big.NewInt(10),
to: big.NewInt(20),
},
}
n, d = getNewRanges(ranges)
require.Equal(t, 1, len(n))
newRange = n[0]
require.Equal(t, int64(0), newRange.from.Int64())
require.Equal(t, int64(20), newRange.to.Int64())
require.Equal(t, 2, len(d))
ranges = []*BlocksRange{
&BlocksRange{
from: big.NewInt(0),
to: big.NewInt(20),
},
&BlocksRange{
from: big.NewInt(5),
to: big.NewInt(15),
},
}
n, d = getNewRanges(ranges)
require.Equal(t, 1, len(n))
newRange = n[0]
require.Equal(t, int64(0), newRange.from.Int64())
require.Equal(t, int64(20), newRange.to.Int64())
require.Equal(t, 2, len(d))
ranges = []*BlocksRange{
&BlocksRange{
from: big.NewInt(5),
to: big.NewInt(15),
},
&BlocksRange{
from: big.NewInt(5),
to: big.NewInt(20),
},
}
n, d = getNewRanges(ranges)
require.Equal(t, 1, len(n))
newRange = n[0]
require.Equal(t, int64(5), newRange.from.Int64())
require.Equal(t, int64(20), newRange.to.Int64())
require.Equal(t, 2, len(d))
ranges = []*BlocksRange{
&BlocksRange{
from: big.NewInt(5),
to: big.NewInt(10),
},
&BlocksRange{
from: big.NewInt(15),
to: big.NewInt(20),
},
}
n, d = getNewRanges(ranges)
require.Equal(t, 0, len(n))
require.Equal(t, 0, len(d))
ranges = []*BlocksRange{
&BlocksRange{
from: big.NewInt(0),
to: big.NewInt(10),
},
&BlocksRange{
from: big.NewInt(10),
to: big.NewInt(20),
},
&BlocksRange{
from: big.NewInt(30),
to: big.NewInt(40),
},
}
n, d = getNewRanges(ranges)
require.Equal(t, 1, len(n))
newRange = n[0]
require.Equal(t, int64(0), newRange.from.Int64())
require.Equal(t, int64(20), newRange.to.Int64())
require.Equal(t, 2, len(d))
ranges = []*BlocksRange{
&BlocksRange{
from: big.NewInt(0),
to: big.NewInt(10),
},
&BlocksRange{
from: big.NewInt(10),
to: big.NewInt(20),
},
&BlocksRange{
from: big.NewInt(30),
to: big.NewInt(40),
},
&BlocksRange{
from: big.NewInt(40),
to: big.NewInt(50),
},
}
n, d = getNewRanges(ranges)
require.Equal(t, 2, len(n))
newRange = n[0]
require.Equal(t, int64(0), newRange.from.Int64())
require.Equal(t, int64(20), newRange.to.Int64())
newRange = n[1]
require.Equal(t, int64(30), newRange.from.Int64())
require.Equal(t, int64(50), newRange.to.Int64())
require.Equal(t, 4, len(d))
}

View File

@ -56,6 +56,18 @@ func (s *Service) SetClient(client *ethclient.Client) {
s.client = client
}
// MergeBlocksRanges merge old blocks ranges if possible
func (s *Service) MergeBlocksRanges(accounts []common.Address, chain uint64) error {
for _, account := range accounts {
err := s.db.mergeRanges(account, chain)
if err != nil {
return err
}
}
return nil
}
// StartReactor separately because it requires known ethereum address, which will become available only after login.
func (s *Service) StartReactor(client *ethclient.Client, accounts []common.Address, chain *big.Int, watchNewBlocks bool) error {
reactor := NewReactor(s.db, s.feed, client, chain, watchNewBlocks)