[status-im/status-react#9927] Fast blocks sync after delay

- In order to avoid handling of the reorganized blocks we use an offset
from the latest known block when start listening to new blocks. Before
this commit the offset was 15 blocks for all networks. This offset is
too big for mainnet and causes noticeable delay of marking a transfer as
confirmed in Status (comparing to etherscan). So it was changed to be 5
blocks on mainnet and is still 15 blocks on other networks.
- Also before this commit all new blocks were handled one by one with
network specific interval (10s for mainnet), which means that in case of
lost internet connection or application suspension (happens on iOS)
receiving of new blocks would be paused and then resumed with the same
"speed" - 1 blocks per 10s. In case if that pause is big enough the
application would never catch up with the latest block in the network,
and this also causes the state of transfers to be delayed in the
application. In this commit in case if there was more than 40s delay
after receiving of the previous block the whole history in range between
the previous received block and ("latest"-reorgeSafetyDepth) block is
checked at once and app catches up with a recent state of the chain.
This commit is contained in:
Roman Volosovskyi 2020-01-29 11:08:42 +02:00
parent 8931b14c4e
commit c2f22f1fbc
No known key found for this signature in database
GPG Key ID: 0238A4B5ECEE70DE
7 changed files with 280 additions and 109 deletions

View File

@ -1 +1 @@
0.41.0
0.41.1

View File

@ -14,6 +14,7 @@ import (
)
var numberOfBlocksCheckedPerIteration = 40
var blocksDelayThreshhold = 40 * time.Second
type ethHistoricalCommand struct {
db *Database
@ -23,6 +24,7 @@ type ethHistoricalCommand struct {
balanceCache *balanceCache
feed *event.Feed
foundHeaders []*DBHeader
noLimit bool
from, to, resultingFrom *big.Int
}
@ -38,7 +40,7 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
start := time.Now()
totalRequests, cacheHits := c.balanceCache.getStats(c.address)
log.Info("balance cache before checking range", "total", totalRequests, "cached", totalRequests-cacheHits)
from, headers, err := findBlocksWithEthTransfers(ctx, c.client, c.balanceCache, c.eth, c.address, c.from, c.to)
from, headers, err := findBlocksWithEthTransfers(ctx, c.client, c.balanceCache, c.eth, c.address, c.from, c.to, c.noLimit)
if err != nil {
return err
@ -111,13 +113,14 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {
}
type newBlocksTransfersCommand struct {
db *Database
accounts []common.Address
chain *big.Int
erc20 *ERC20TransfersDownloader
eth *ETHTransferDownloader
client reactorClient
feed *event.Feed
db *Database
accounts []common.Address
chain *big.Int
erc20 *ERC20TransfersDownloader
eth *ETHTransferDownloader
client reactorClient
feed *event.Feed
lastFetchedBlockTime time.Time
initialFrom, from, to *DBHeader
}
@ -150,36 +153,101 @@ func (c *newBlocksTransfersCommand) Verify(parent context.Context) (err error) {
return nil
}
func (c *newBlocksTransfersCommand) getAllTransfers(parent context.Context, from, to uint64) ([]Transfer, map[common.Address][]*DBHeader, error) {
newHeadersByAddress := map[common.Address][]*DBHeader{}
all := []Transfer{}
for n := from; n <= to; n++ {
ctx, cancel := context.WithTimeout(parent, 10*time.Second)
header, err := c.client.HeaderByNumber(ctx, big.NewInt(int64(n)))
cancel()
if err != nil {
return nil, nil, err
func (c *newBlocksTransfersCommand) getAllTransfers(parent context.Context, from, to uint64) (map[common.Address][]Transfer, error) {
transfersByAddress := map[common.Address][]Transfer{}
if to-from > reorgSafetyDepth(c.chain).Uint64() {
fromByAddress := map[common.Address]*big.Int{}
toByAddress := map[common.Address]*big.Int{}
for _, account := range c.accounts {
fromByAddress[account] = new(big.Int).SetUint64(from)
toByAddress[account] = new(big.Int).SetUint64(to)
}
dbHeader := toDBHeader(header)
log.Info("reactor get transfers", "block", dbHeader.Hash, "number", dbHeader.Number)
transfers, err := c.getTransfers(parent, dbHeader)
if err != nil {
log.Error("failed to get transfers", "header", dbHeader.Hash, "error", err)
return nil, nil, err
balanceCache := newBalanceCache()
blocksCommand := &findAndCheckBlockRangeCommand{
accounts: c.accounts,
db: c.db,
chain: c.chain,
client: c.eth.client,
balanceCache: balanceCache,
feed: c.feed,
fromByAddress: fromByAddress,
toByAddress: toByAddress,
noLimit: true,
}
if len(transfers) > 0 {
for _, transfer := range transfers {
headers, ok := newHeadersByAddress[transfer.Address]
if !ok {
headers = []*DBHeader{}
}
newHeadersByAddress[transfer.Address] = append(headers, dbHeader)
if err := blocksCommand.Command()(parent); err != nil {
return nil, err
}
for address, headers := range blocksCommand.foundHeaders {
blocks := make([]*big.Int, len(headers))
for i, header := range headers {
blocks[i] = header.Number
}
txCommand := &loadTransfersCommand{
accounts: []common.Address{address},
db: c.db,
chain: c.chain,
client: c.erc20.client,
blocksByAddress: map[common.Address][]*big.Int{address: blocks},
}
err := txCommand.Command()(parent)
if err != nil {
return nil, err
}
transfersByAddress[address] = txCommand.foundTransfersByAddress[address]
}
} else {
all := []Transfer{}
newHeadersByAddress := map[common.Address][]*DBHeader{}
for n := from; n <= to; n++ {
ctx, cancel := context.WithTimeout(parent, 10*time.Second)
header, err := c.client.HeaderByNumber(ctx, big.NewInt(int64(n)))
cancel()
if err != nil {
return nil, err
}
dbHeader := toDBHeader(header)
log.Info("reactor get transfers", "block", dbHeader.Hash, "number", dbHeader.Number)
transfers, err := c.getTransfers(parent, dbHeader)
if err != nil {
log.Error("failed to get transfers", "header", dbHeader.Hash, "error", err)
return nil, err
}
if len(transfers) > 0 {
for _, transfer := range transfers {
headers, ok := newHeadersByAddress[transfer.Address]
if !ok {
headers = []*DBHeader{}
}
transfers, ok := transfersByAddress[transfer.Address]
if !ok {
transfers = []Transfer{}
}
transfersByAddress[transfer.Address] = append(transfers, transfer)
newHeadersByAddress[transfer.Address] = append(headers, dbHeader)
}
}
all = append(all, transfers...)
}
err := c.saveHeaders(parent, newHeadersByAddress)
if err != nil {
return nil, err
}
err = c.db.ProcessTranfers(all, nil)
if err != nil {
log.Error("failed to persist transfers", "error", err)
return nil, err
}
all = append(all, transfers...)
}
return all, newHeadersByAddress, nil
return transfersByAddress, nil
}
func (c *newBlocksTransfersCommand) saveHeaders(parent context.Context, newHeadersByAddress map[common.Address][]*DBHeader) (err error) {
@ -197,6 +265,32 @@ func (c *newBlocksTransfersCommand) saveHeaders(parent context.Context, newHeade
return nil
}
func (c *newBlocksTransfersCommand) checkDelay(parent context.Context, nextHeader *types.Header) (*types.Header, error) {
if time.Since(c.lastFetchedBlockTime) > blocksDelayThreshhold {
log.Info("There was a delay before loading next block", "time since previous successful fetching", time.Since(c.lastFetchedBlockTime))
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
latestHeader, err := c.client.HeaderByNumber(ctx, nil)
cancel()
if err != nil {
log.Warn("failed to get latest block", "number", nextHeader.Number, "error", err)
return nil, err
}
diff := new(big.Int).Sub(latestHeader.Number, nextHeader.Number)
if diff.Cmp(reorgSafetyDepth(c.chain)) >= 0 {
num := new(big.Int).Sub(latestHeader.Number, reorgSafetyDepth(c.chain))
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
nextHeader, err = c.client.HeaderByNumber(ctx, num)
cancel()
if err != nil {
log.Warn("failed to get next block", "number", num, "error", err)
return nil, err
}
}
}
return nextHeader, nil
}
func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) {
if c.from == nil {
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
@ -210,28 +304,35 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) {
}
num := new(big.Int).Add(c.from.Number, one)
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
latest, err := c.client.HeaderByNumber(ctx, num)
nextHeader, err := c.client.HeaderByNumber(ctx, num)
cancel()
if err != nil {
log.Warn("failed to get latest block", "number", num, "error", err)
log.Warn("failed to get next block", "number", num, "error", err)
return err
}
log.Info("reactor received new block", "header", num)
ctx, cancel = context.WithTimeout(parent, 10*time.Second)
latestHeader, removed, latestValidSavedBlock, reorgSpotted, err := c.onNewBlock(ctx, c.from, latest)
cancel()
nextHeader, err = c.checkDelay(parent, nextHeader)
if err != nil {
log.Error("failed to process new header", "header", latest, "error", err)
return err
}
if latestHeader == nil && len(removed) == 0 {
log.Info("new block already in the database", "block", latest.Number)
return nil
ctx, cancel = context.WithTimeout(parent, 10*time.Second)
latestHeader, removed, latestValidSavedBlock, reorgSpotted, err := c.onNewBlock(ctx, c.from, nextHeader)
cancel()
if err != nil {
log.Error("failed to process new header", "header", nextHeader, "error", err)
return err
}
err = c.db.ProcessTranfers(nil, removed)
if err != nil {
return err
}
latestHeader.Loaded = true
fromN := latest.Number.Uint64()
fromN := nextHeader.Number.Uint64()
if reorgSpotted {
if latestValidSavedBlock != nil {
@ -242,38 +343,30 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) {
}
}
toN := latestHeader.Number.Uint64()
all, newHeadersByAddress, err := c.getAllTransfers(parent, fromN, toN)
all, err := c.getAllTransfers(parent, fromN, toN)
if err != nil {
return err
}
err = c.saveHeaders(parent, newHeadersByAddress)
if err != nil {
return err
}
err = c.db.ProcessTranfers(all, removed)
if err != nil {
log.Error("failed to persist transfers", "error", err)
return err
}
c.from = toDBHeader(latest)
c.from = toDBHeader(nextHeader)
c.lastFetchedBlockTime = time.Now()
if len(removed) != 0 {
lth := len(removed)
c.feed.Send(Event{
Type: EventReorg,
BlockNumber: removed[lth-1].Number,
Accounts: uniqueAccountsFromTransfers(all),
Accounts: uniqueAccountsFromHeaders(removed),
})
}
log.Info("before sending new block event", "latest", latestHeader != nil, "removed", len(removed), "len", len(uniqueAccountsFromTransfers(all)))
if latestHeader != nil && len(removed) == 0 {
c.feed.Send(Event{
Type: EventNewBlock,
BlockNumber: latestHeader.Number,
Accounts: uniqueAccountsFromTransfers(all),
})
}
c.feed.Send(Event{
Type: EventNewBlock,
BlockNumber: latestHeader.Number,
Accounts: uniqueAccountsFromTransfers(all),
NewTransactionsPerAccount: transfersPerAccount(all),
})
return nil
}
@ -378,9 +471,10 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCache *b
signer: types.NewEIP155Signer(c.chain),
db: c.db,
},
feed: c.feed,
from: fromByAddress[address],
to: toByAddress[address],
feed: c.feed,
from: fromByAddress[address],
to: toByAddress[address],
noLimit: c.noLimit,
}
commands[i] = eth
group.Add(eth.Command())
@ -451,10 +545,11 @@ func getTransfersByBlocks(ctx context.Context, db *Database, downloader *ETHTran
return allTransfers, nil
}
func loadTransfers(ctx context.Context, accounts []common.Address, db *Database, client *ethclient.Client, chain *big.Int, limit int, blocksByAddress map[common.Address][]*big.Int) error {
func loadTransfers(ctx context.Context, accounts []common.Address, db *Database, client *ethclient.Client, chain *big.Int, limit int, blocksByAddress map[common.Address][]*big.Int) (map[common.Address][]Transfer, error) {
start := time.Now()
group := NewGroup(ctx)
commands := []*transfersCommand{}
for _, address := range accounts {
blocks, ok := blocksByAddress[address]
@ -475,19 +570,35 @@ func loadTransfers(ctx context.Context, accounts []common.Address, db *Database,
},
block: block,
}
commands = append(commands, erc20)
group.Add(erc20.Command())
}
}
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
case <-group.WaitAsync():
transfersByAddress := map[common.Address][]Transfer{}
for _, command := range commands {
if len(command.fetchedTransfers) == 0 {
continue
}
transfers, ok := transfersByAddress[command.address]
if !ok {
transfers = []Transfer{}
}
for _, transfer := range command.fetchedTransfers {
transfersByAddress[command.address] = append(transfers, transfer)
}
}
log.Info("loadTransfers finished", "in", time.Since(start))
return nil
return transfersByAddress, nil
}
}
func (c *controlCommand) LoadTransfers(ctx context.Context, downloader *ETHTransferDownloader, limit int) error {
func (c *controlCommand) LoadTransfers(ctx context.Context, downloader *ETHTransferDownloader, limit int) (map[common.Address][]Transfer, error) {
return loadTransfers(ctx, c.accounts, c.db, c.client, c.chain, limit, make(map[common.Address][]*big.Int))
}
@ -663,7 +774,7 @@ func (c *controlCommand) Run(parent context.Context) error {
signer: types.NewEIP155Signer(c.chain),
db: c.db,
}
err = c.LoadTransfers(parent, downloader, 40)
_, err = c.LoadTransfers(parent, downloader, 40)
if err != nil {
return err
}
@ -676,15 +787,16 @@ func (c *controlCommand) Run(parent context.Context) error {
log.Info("watching new blocks", "start from", head.Number)
cmd := &newBlocksTransfersCommand{
db: c.db,
chain: c.chain,
client: c.client,
accounts: c.accounts,
eth: c.eth,
erc20: c.erc20,
feed: c.feed,
initialFrom: toDBHeader(head),
from: toDBHeader(head),
db: c.db,
chain: c.chain,
client: c.client,
accounts: c.accounts,
eth: c.eth,
erc20: c.erc20,
feed: c.feed,
initialFrom: toDBHeader(head),
from: toDBHeader(head),
lastFetchedBlockTime: time.Now(),
}
err = cmd.Command()(parent)
@ -704,26 +816,54 @@ func (c *controlCommand) Command() Command {
}.Run
}
func uniqueAccountsFromTransfers(transfers []Transfer) []common.Address {
func uniqueAccountsFromTransfers(allTransfers map[common.Address][]Transfer) []common.Address {
accounts := []common.Address{}
unique := map[common.Address]struct{}{}
for i := range transfers {
_, exist := unique[transfers[i].Address]
for address, transfers := range allTransfers {
if len(transfers) == 0 {
continue
}
_, exist := unique[address]
if exist {
continue
}
unique[transfers[i].Address] = struct{}{}
accounts = append(accounts, transfers[i].Address)
unique[address] = struct{}{}
accounts = append(accounts, address)
}
return accounts
}
func transfersPerAccount(allTransfers map[common.Address][]Transfer) map[common.Address]int {
res := map[common.Address]int{}
for address, transfers := range allTransfers {
res[address] = len(transfers)
}
return res
}
func uniqueAccountsFromHeaders(headers []*DBHeader) []common.Address {
accounts := []common.Address{}
unique := map[common.Address]struct{}{}
for i := range headers {
_, exist := unique[headers[i].Address]
if exist {
continue
}
unique[headers[i].Address] = struct{}{}
accounts = append(accounts, headers[i].Address)
}
return accounts
}
type transfersCommand struct {
db *Database
eth *ETHTransferDownloader
block *big.Int
address common.Address
client reactorClient
db *Database
eth *ETHTransferDownloader
block *big.Int
address common.Address
client reactorClient
fetchedTransfers []Transfer
}
func (c *transfersCommand) Command() Command {
@ -746,16 +886,18 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) {
return err
}
c.fetchedTransfers = allTransfers
log.Debug("transfers loaded", "address", c.address, "len", len(allTransfers))
return nil
}
type loadTransfersCommand struct {
accounts []common.Address
db *Database
chain *big.Int
client *ethclient.Client
blocksByAddress map[common.Address][]*big.Int
accounts []common.Address
db *Database
chain *big.Int
client *ethclient.Client
blocksByAddress map[common.Address][]*big.Int
foundTransfersByAddress map[common.Address][]Transfer
}
func (c *loadTransfersCommand) Command() Command {
@ -765,7 +907,7 @@ func (c *loadTransfersCommand) Command() Command {
}.Run
}
func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, downloader *ETHTransferDownloader, limit int, blocksByAddress map[common.Address][]*big.Int) error {
func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, downloader *ETHTransferDownloader, limit int, blocksByAddress map[common.Address][]*big.Int) (map[common.Address][]Transfer, error) {
return loadTransfers(ctx, c.accounts, c.db, c.client, c.chain, limit, blocksByAddress)
}
@ -776,10 +918,11 @@ func (c *loadTransfersCommand) Run(parent context.Context) (err error) {
signer: types.NewEIP155Signer(c.chain),
db: c.db,
}
err = c.LoadTransfers(parent, downloader, 40, c.blocksByAddress)
transfersByAddress, err := c.LoadTransfers(parent, downloader, 40, c.blocksByAddress)
if err != nil {
return err
}
c.foundTransfersByAddress = transfersByAddress
return
}
@ -793,6 +936,8 @@ type findAndCheckBlockRangeCommand struct {
feed *event.Feed
fromByAddress map[common.Address]*big.Int
toByAddress map[common.Address]*big.Int
foundHeaders map[common.Address][]*DBHeader
noLimit bool
}
func (c *findAndCheckBlockRangeCommand) Command() Command {
@ -808,16 +953,24 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error)
if err != nil {
return err
}
if c.noLimit {
newFromByAddress = map[common.Address]*big.Int{}
for _, address := range c.accounts {
newFromByAddress[address] = c.fromByAddress[address]
}
}
erc20HeadersByAddress, err := c.fastIndexErc20(parent, newFromByAddress, c.toByAddress)
if err != nil {
return err
}
foundHeaders := map[common.Address][]*DBHeader{}
for _, address := range c.accounts {
ethHeaders := ethHeadersByAddress[address]
erc20Headers := erc20HeadersByAddress[address]
allHeaders := append(ethHeaders, erc20Headers...)
foundHeaders[address] = allHeaders
log.Debug("saving headers", "len", len(allHeaders), "address")
err = c.db.ProcessBlocks(address, newFromByAddress[address], c.toByAddress[address], allHeaders)
@ -826,5 +979,7 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error)
}
}
c.foundHeaders = foundHeaders
return
}

View File

@ -54,6 +54,7 @@ func (s *NewBlocksSuite) SetupTest() {
},
feed: s.feed,
client: s.backend.Client,
chain: big.NewInt(1777),
}
}
@ -153,7 +154,11 @@ func (s *NewBlocksSuite) TestReorg() {
s.Require().EqualError(s.runCmdUntilError(ctx), "not found")
close(events)
expected := []Event{{Type: EventReorg, BlockNumber: big.NewInt(21)}, {Type: EventNewBlock, BlockNumber: big.NewInt(25)}}
expected := []Event{
{Type: EventReorg, BlockNumber: big.NewInt(21)},
{Type: EventNewBlock, BlockNumber: big.NewInt(24)},
{Type: EventNewBlock, BlockNumber: big.NewInt(25)},
}
i := 0
for ev := range events {
s.Require().Equal(expected[i].Type, ev.Type)
@ -161,7 +166,7 @@ func (s *NewBlocksSuite) TestReorg() {
i++
}
transfers, err = s.db.GetTransfers(big.NewInt(0), nil)
transfers, err = s.db.GetTransfers(nil, nil)
s.Require().NoError(err)
s.Require().Len(transfers, 10)
}

View File

@ -166,7 +166,7 @@ func checkRanges(parent context.Context, client reactorClient, cache BalanceCach
return c.GetRanges(), c.GetHeaders(), nil
}
func findBlocksWithEthTransfers(parent context.Context, client reactorClient, cache BalanceCache, downloader TransferDownloader, account common.Address, low, high *big.Int) (from *big.Int, headers []*DBHeader, err error) {
func findBlocksWithEthTransfers(parent context.Context, client reactorClient, cache BalanceCache, downloader TransferDownloader, account common.Address, low, high *big.Int, noLimit bool) (from *big.Int, headers []*DBHeader, err error) {
ranges := [][]*big.Int{{low, high}}
minBlock := big.NewInt(low.Int64())
headers = []*DBHeader{}
@ -184,7 +184,7 @@ func findBlocksWithEthTransfers(parent context.Context, client reactorClient, ca
if len(newRanges) > 0 {
log.Debug("found new ranges", "account", account, "lvl", lvl, "new ranges len", len(newRanges))
}
if len(newRanges) > 60 {
if len(newRanges) > 60 && !noLimit {
sort.SliceStable(newRanges, func(i, j int) bool {
return newRanges[i][0].Cmp(newRanges[j][0]) == 1
})

View File

@ -129,7 +129,7 @@ func TestConcurrentEthDownloader(t *testing.T) {
concurrent := NewConcurrentDownloader(ctx)
_, headers, _ := findBlocksWithEthTransfers(
ctx, tc.options.balances, newBalanceCache(), tc.options.batches,
common.Address{}, zero, tc.options.last)
common.Address{}, zero, tc.options.last, false)
concurrent.Wait()
require.NoError(t, concurrent.Error())
rst := concurrent.Get()

View File

@ -24,8 +24,9 @@ const (
// Event is a type for wallet events.
type Event struct {
Type EventType `json:"type"`
BlockNumber *big.Int `json:"blockNumber"`
Accounts []common.Address `json:"accounts"`
ERC20 bool `json:"erc20"`
Type EventType `json:"type"`
BlockNumber *big.Int `json:"blockNumber"`
Accounts []common.Address `json:"accounts"`
NewTransactionsPerAccount map[common.Address]int `json:"newTransactions"`
ERC20 bool `json:"erc20"`
}

View File

@ -23,14 +23,24 @@ func pollingPeriodByChain(chain *big.Int) time.Duration {
case int64(params.MainNetworkID):
return 10 * time.Second
case int64(params.RopstenNetworkID):
return 2 * time.Second
return 4 * time.Second
default:
return 500 * time.Millisecond
}
}
func reorgSafetyDepth(chain *big.Int) *big.Int {
switch chain.Int64() {
case int64(params.MainNetworkID):
return big.NewInt(5)
case int64(params.RopstenNetworkID):
return big.NewInt(15)
default:
return big.NewInt(15)
}
}
var (
reorgSafetyDepth = big.NewInt(15)
erc20BatchSize = big.NewInt(100000)
errAlreadyRunning = errors.New("already running")
)
@ -88,7 +98,7 @@ func (r *Reactor) newControlCommand(accounts []common.Address) *controlCommand {
},
erc20: NewERC20TransfersDownloader(r.client, accounts, signer),
feed: r.feed,
safetyDepth: reorgSafetyDepth,
safetyDepth: reorgSafetyDepth(r.chain),
}
return ctl