[wallet] Detect non archival RPC node

This commit is contained in:
Roman Volosovskyi 2020-12-30 17:46:47 +02:00
parent 2427b3c0a5
commit d8bccaff18
No known key found for this signature in database
GPG Key ID: 0238A4B5ECEE70DE
4 changed files with 64 additions and 19 deletions

View File

@ -1 +1 @@
0.68.7 0.68.8

View File

@ -56,7 +56,15 @@ func (api *API) GetTransfersByAddress(ctx context.Context, address common.Addres
from, err := findFirstRange(ctx, address, block, api.s.client) from, err := findFirstRange(ctx, address, block, api.s.client)
if err != nil { if err != nil {
return nil, err if nonArchivalNodeError(err) {
api.s.feed.Send(Event{
Type: EventNonArchivalNodeDetected,
})
from = big.NewInt(0).Sub(block, big.NewInt(100))
} else {
log.Error("first range error", "error", err)
return nil, err
}
} }
fromByAddress := map[common.Address]*big.Int{address: from} fromByAddress := map[common.Address]*big.Int{address: from}
toByAddress := map[common.Address]*big.Int{address: block} toByAddress := map[common.Address]*big.Int{address: block}

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"math/big" "math/big"
"strings"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -24,6 +25,7 @@ type ethHistoricalCommand struct {
balanceCache *balanceCache balanceCache *balanceCache
feed *event.Feed feed *event.Feed
foundHeaders []*DBHeader foundHeaders []*DBHeader
error error
noLimit bool noLimit bool
from, to, resultingFrom *big.Int from, to, resultingFrom *big.Int
@ -43,7 +45,8 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
from, headers, err := findBlocksWithEthTransfers(ctx, c.client, c.balanceCache, c.eth, c.address, c.from, c.to, c.noLimit) from, headers, err := findBlocksWithEthTransfers(ctx, c.client, c.balanceCache, c.eth, c.address, c.from, c.to, c.noLimit)
if err != nil { if err != nil {
return err c.error = err
return nil
} }
c.foundHeaders = headers c.foundHeaders = headers
@ -442,16 +445,17 @@ func (c *newBlocksTransfersCommand) getTransfers(parent context.Context, header
// - runs fast indexing for each account separately // - runs fast indexing for each account separately
// - starts listening to new blocks and watches for reorgs // - starts listening to new blocks and watches for reorgs
type controlCommand struct { type controlCommand struct {
accounts []common.Address accounts []common.Address
db *Database db *Database
eth *ETHTransferDownloader eth *ETHTransferDownloader
erc20 *ERC20TransfersDownloader erc20 *ERC20TransfersDownloader
chain *big.Int chain *big.Int
client *ethclient.Client client *ethclient.Client
feed *event.Feed feed *event.Feed
safetyDepth *big.Int safetyDepth *big.Int
watchNewBlocks bool watchNewBlocks bool
errorsCount int errorsCount int
nonArchivalRPCNode bool
} }
// run fast indexing for every accont up to canonical chain head minus safety depth. // run fast indexing for every accont up to canonical chain head minus safety depth.
@ -488,6 +492,9 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCache *b
resultingFromByAddress := map[common.Address]*big.Int{} resultingFromByAddress := map[common.Address]*big.Int{}
headers := map[common.Address][]*DBHeader{} headers := map[common.Address][]*DBHeader{}
for _, command := range commands { for _, command := range commands {
if command.error != nil {
return nil, nil, command.error
}
resultingFromByAddress[command.address] = command.resultingFrom resultingFromByAddress[command.address] = command.resultingFrom
headers[command.address] = command.foundHeaders headers[command.address] = command.foundHeaders
} }
@ -730,12 +737,16 @@ func (c *controlCommand) Run(parent context.Context) error {
return err return err
} }
fromMap, err := findFirstRanges(parent, accountsWithoutHistory, head.Number, c.client) fromMap := map[common.Address]*big.Int{}
if err != nil {
if c.NewError(err) { if !c.nonArchivalRPCNode {
return nil fromMap, err = findFirstRanges(parent, accountsWithoutHistory, head.Number, c.client)
if err != nil {
if c.NewError(err) {
return nil
}
return err
} }
return err
} }
target := new(big.Int).Sub(head.Number, c.safetyDepth) target := new(big.Int).Sub(head.Number, c.safetyDepth)
@ -751,6 +762,9 @@ func (c *controlCommand) Run(parent context.Context) error {
if !ok { if !ok {
from = fromMap[address] from = fromMap[address]
} }
if c.nonArchivalRPCNode {
from = big.NewInt(0).Sub(target, big.NewInt(100))
}
fromByAddress[address] = from fromByAddress[address] = from
toByAddress[address] = target toByAddress[address] = target
@ -776,6 +790,13 @@ func (c *controlCommand) Run(parent context.Context) error {
return err return err
} }
if cmnd.error != nil {
if c.NewError(cmnd.error) {
return nil
}
return cmnd.error
}
downloader := &ETHTransferDownloader{ downloader := &ETHTransferDownloader{
client: c.client, client: c.client,
accounts: c.accounts, accounts: c.accounts,
@ -852,9 +873,21 @@ func (c *controlCommand) Run(parent context.Context) error {
return err return err
} }
func nonArchivalNodeError(err error) bool {
return strings.Contains(err.Error(), "missing trie node") ||
strings.Contains(err.Error(), "project ID does not have access to archive state")
}
func (c *controlCommand) NewError(err error) bool { func (c *controlCommand) NewError(err error) bool {
c.errorsCount++ c.errorsCount++
log.Error("controlCommand error", "error", err, "counter", c.errorsCount) log.Error("controlCommand error", "error", err, "counter", c.errorsCount)
if nonArchivalNodeError(err) {
log.Info("Non archival node detected")
c.nonArchivalRPCNode = true
c.feed.Send(Event{
Type: EventNonArchivalNodeDetected,
})
}
if c.errorsCount >= 3 { if c.errorsCount >= 3 {
c.feed.Send(Event{ c.feed.Send(Event{
Type: EventFetchingHistoryError, Type: EventFetchingHistoryError,
@ -994,6 +1027,7 @@ type findAndCheckBlockRangeCommand struct {
toByAddress map[common.Address]*big.Int toByAddress map[common.Address]*big.Int
foundHeaders map[common.Address][]*DBHeader foundHeaders map[common.Address][]*DBHeader
noLimit bool noLimit bool
error error
} }
func (c *findAndCheckBlockRangeCommand) Command() Command { func (c *findAndCheckBlockRangeCommand) Command() Command {
@ -1007,7 +1041,8 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error)
log.Debug("start findAndCHeckBlockRangeCommand") log.Debug("start findAndCHeckBlockRangeCommand")
newFromByAddress, ethHeadersByAddress, err := c.fastIndex(parent, c.balanceCache, c.fromByAddress, c.toByAddress) newFromByAddress, ethHeadersByAddress, err := c.fastIndex(parent, c.balanceCache, c.fromByAddress, c.toByAddress)
if err != nil { if err != nil {
return err c.error = err
return nil
} }
if c.noLimit { if c.noLimit {
newFromByAddress = map[common.Address]*big.Int{} newFromByAddress = map[common.Address]*big.Int{}

View File

@ -23,6 +23,8 @@ const (
EventRecentHistoryReady EventType = "recent-history-ready" EventRecentHistoryReady EventType = "recent-history-ready"
// EventRecentHistoryError emitted when fetching of tx history failed // EventRecentHistoryError emitted when fetching of tx history failed
EventFetchingHistoryError EventType = "fetching-history-error" EventFetchingHistoryError EventType = "fetching-history-error"
// EventNonArchivalNodeDetected emitted when a connection to a non archival node is detected
EventNonArchivalNodeDetected EventType = "non-archival-node-detected"
) )
// Event is a type for wallet events. // Event is a type for wallet events.