[wallet] Reduce number of RPC requests

- Wallet service is not started on foreground event on status-go side
  anymore, it leaves a client side opportunity to decide whether new
  blocks should be watched.
- `watchNewBlocks` parameter is added to `StartWallet`.
- Some requests are removed/moved to the place where they are necessary.
This commit is contained in:
Roman Volosovskyi 2020-11-17 17:54:31 +02:00
parent a4195e5b5c
commit 002f9a5597
No known key found for this signature in database
GPG Key ID: 0238A4B5ECEE70DE
12 changed files with 141 additions and 115 deletions

View File

@ -1 +1 @@
0.63.12
0.64.0

View File

@ -882,26 +882,7 @@ func (b *GethStatusBackend) AppStateChange(state string) {
b.log.Info("App State changed", "new-state", s)
b.appState = s
if s == appStateForeground && !b.forceStopWallet {
wallet, err := b.statusNode.WalletService()
if err != nil {
b.log.Error("Retrieving of wallet service failed on app state change to active", "error", err)
return
}
if !wallet.IsStarted() {
err = wallet.Start(b.statusNode.Server())
if err != nil {
b.log.Error("Wallet service start failed on app state change to active", "error", err)
return
}
err = b.startWallet()
if err != nil {
b.log.Error("Wallet reactor start failed on app state change to active", "error", err)
return
}
}
} else if s == appStateBackground && !b.forceStopWallet {
if s == appStateBackground {
localNotifications, err := b.statusNode.LocalNotificationsService()
if err != nil {
b.log.Error("Retrieving of local notifications service failed on app state change", "error", err)
@ -947,7 +928,7 @@ func (b *GethStatusBackend) StopWallet() error {
return nil
}
func (b *GethStatusBackend) StartWallet() error {
func (b *GethStatusBackend) StartWallet(watchNewBlocks bool) error {
wallet, err := b.statusNode.WalletService()
if err != nil {
b.log.Error("Retrieving of wallet service failed on StartWallet", "error", err)
@ -960,7 +941,7 @@ func (b *GethStatusBackend) StartWallet() error {
return nil
}
err = b.startWallet()
err = b.startWallet(watchNewBlocks)
if err != nil {
b.log.Error("Wallet reactor start failed on StartWallet", "error", err)
return nil
@ -1187,7 +1168,7 @@ func (b *GethStatusBackend) injectAccountIntoServices() error {
return nil
}
func (b *GethStatusBackend) startWallet() error {
func (b *GethStatusBackend) startWallet(watchNewBlocks bool) error {
if !b.statusNode.Config().WalletConfig.Enabled {
return nil
}
@ -1197,7 +1178,11 @@ func (b *GethStatusBackend) startWallet() error {
return err
}
watchAddresses := b.accountManager.WatchAddresses()
accountsDB := accounts.NewDB(b.appDB)
watchAddresses, err := accountsDB.GetWalletAddresses()
if err != nil {
return err
}
mainAccountAddress, err := b.accountManager.MainAccountAddress()
if err != nil {
@ -1220,7 +1205,8 @@ func (b *GethStatusBackend) startWallet() error {
return wallet.StartReactor(
b.statusNode.RPCClient().Ethclient(),
allAddresses,
new(big.Int).SetUint64(b.statusNode.Config().NetworkID))
new(big.Int).SetUint64(b.statusNode.Config().NetworkID),
watchNewBlocks)
}
// InjectChatAccount selects the current chat account using chatKeyHex and injects the key into whisper.

View File

@ -518,8 +518,8 @@ func StopWallet() string {
}
// StartWallet
func StartWallet() string {
err := statusBackend.StartWallet()
func StartWallet(watchNewBlocks bool) string {
err := statusBackend.StartWallet(watchNewBlocks)
return makeJSONResponse(err)
}

View File

@ -54,11 +54,12 @@ type Notification struct {
// TransactionEvent - structure used to pass messages from wallet to bus
type TransactionEvent struct {
Type string `json:"type"`
BlockNumber *big.Int `json:"block-number"`
Accounts []common.Address `json:"accounts"`
NewTransactionsPerAccount map[common.Address]int `json:"new-transactions"`
ERC20 bool `json:"erc20"`
Type string `json:"type"`
BlockNumber *big.Int `json:"block-number"`
Accounts []common.Address `json:"accounts"`
NewTransactionsPerAccount map[common.Address]int `json:"new-transactions"`
ERC20 bool `json:"erc20"`
MaxKnownBlocks map[common.Address]*big.Int `json:"max-known-blocks"`
}
// MessageEvent - structure used to pass messages from chat to bus
@ -165,15 +166,17 @@ func (s *Service) transactionsHandler(payload TransactionEvent) {
limit := 20
if payload.BlockNumber != nil {
for _, address := range payload.Accounts {
log.Info("Handled transfer for address", "info", address)
transfers, err := s.walletDB.GetTransfersByAddressAndBlock(address, payload.BlockNumber, int64(limit))
if err != nil {
log.Error("Could not fetch transfers", "error", err)
}
if payload.BlockNumber.Cmp(payload.MaxKnownBlocks[address]) == 1 {
log.Info("Handled transfer for address", "info", address)
transfers, err := s.walletDB.GetTransfersByAddressAndBlock(address, payload.BlockNumber, int64(limit))
if err != nil {
log.Error("Could not fetch transfers", "error", err)
}
for _, transaction := range transfers {
n := s.buildTransactionNotification(transaction)
pushMessage(n)
for _, transaction := range transfers {
n := s.buildTransactionNotification(transaction)
pushMessage(n)
}
}
}
}
@ -215,6 +218,7 @@ func (s *Service) StartWalletWatcher() {
s.walletTransmitter.wg.Add(1)
maxKnownBlocks := map[common.Address]*big.Int{}
go func() {
defer s.walletTransmitter.wg.Done()
for {
@ -230,14 +234,21 @@ func (s *Service) StartWalletWatcher() {
}
return
case event := <-events:
if event.Type == wallet.EventNewBlock {
if event.Type == wallet.EventNewBlock && len(maxKnownBlocks) > 0 {
s.transmitter.publisher.Send(TransactionEvent{
Type: string(event.Type),
BlockNumber: event.BlockNumber,
Accounts: event.Accounts,
NewTransactionsPerAccount: event.NewTransactionsPerAccount,
ERC20: event.ERC20,
MaxKnownBlocks: maxKnownBlocks,
})
} else if event.Type == wallet.EventMaxKnownBlock {
for _, address := range event.Accounts {
if _, ok := maxKnownBlocks[address]; !ok {
maxKnownBlocks[address] = event.BlockNumber
}
}
}
}
}

View File

@ -105,6 +105,12 @@ func TestTransactionNotification(t *testing.T) {
require.NoError(t, walletDb.ProcessBlocks(header.Address, big.NewInt(1), big.NewInt(1), []*wallet.DBHeader{header}))
require.NoError(t, walletDb.ProcessTranfers(transfers, []*wallet.DBHeader{}))
feed.Send(wallet.Event{
Type: wallet.EventMaxKnownBlock,
BlockNumber: big.NewInt(0),
Accounts: []common.Address{header.Address},
})
feed.Send(wallet.Event{
Type: wallet.EventNewBlock,
BlockNumber: header.Number,

View File

@ -442,14 +442,15 @@ func (c *newBlocksTransfersCommand) getTransfers(parent context.Context, header
// - runs fast indexing for each account separately
// - starts listening to new blocks and watches for reorgs
type controlCommand struct {
accounts []common.Address
db *Database
eth *ETHTransferDownloader
erc20 *ERC20TransfersDownloader
chain *big.Int
client *ethclient.Client
feed *event.Feed
safetyDepth *big.Int
accounts []common.Address
db *Database
eth *ETHTransferDownloader
erc20 *ERC20TransfersDownloader
chain *big.Int
client *ethclient.Client
feed *event.Feed
safetyDepth *big.Int
watchNewBlocks bool
}
// run fast indexing for every accont up to canonical chain head minus safety depth.
@ -558,7 +559,7 @@ func loadTransfers(ctx context.Context, accounts []common.Address, db *Database,
}
for _, block := range blocks {
erc20 := &transfersCommand{
transfers := &transfersCommand{
db: db,
client: client,
address: address,
@ -570,8 +571,8 @@ func loadTransfers(ctx context.Context, accounts []common.Address, db *Database,
},
block: block,
}
commands = append(commands, erc20)
group.Add(erc20.Command())
commands = append(commands, transfers)
group.Add(transfers.Command())
}
}
select {
@ -731,12 +732,6 @@ func (c *controlCommand) Run(parent context.Context) error {
if target.Cmp(zero) <= 0 {
target = zero
}
ctx, cancel = context.WithTimeout(parent, 3*time.Second)
head, err = c.client.HeaderByNumber(ctx, target)
cancel()
if err != nil {
return err
}
fromByAddress := map[common.Address]*big.Int{}
toByAddress := map[common.Address]*big.Int{}
@ -748,7 +743,7 @@ func (c *controlCommand) Run(parent context.Context) error {
}
fromByAddress[address] = from
toByAddress[address] = head.Number
toByAddress[address] = target
}
bCache := newBalanceCache()
@ -779,30 +774,56 @@ func (c *controlCommand) Run(parent context.Context) error {
return err
}
for _, address := range c.accounts {
for _, header := range cmnd.foundHeaders[address] {
c.feed.Send(Event{
Type: EventNewBlock,
BlockNumber: header.Number,
Accounts: []common.Address{address},
NewTransactionsPerAccount: map[common.Address]int{address: 20},
})
}
}
c.feed.Send(Event{
Type: EventRecentHistoryReady,
Accounts: c.accounts,
BlockNumber: head.Number,
BlockNumber: target,
})
log.Info("watching new blocks", "start from", head.Number)
cmd := &newBlocksTransfersCommand{
db: c.db,
chain: c.chain,
client: c.client,
accounts: c.accounts,
eth: c.eth,
erc20: c.erc20,
feed: c.feed,
initialFrom: toDBHeader(head),
from: toDBHeader(head),
lastFetchedBlockTime: time.Now(),
}
if c.watchNewBlocks {
ctx, cancel = context.WithTimeout(parent, 3*time.Second)
head, err = c.client.HeaderByNumber(ctx, target)
cancel()
if err != nil {
return err
}
err = cmd.Command()(parent)
if err != nil {
log.Warn("error on running newBlocksTransfersCommand", "err", err)
return err
log.Info("watching new blocks", "start from", target)
cmd := &newBlocksTransfersCommand{
db: c.db,
chain: c.chain,
client: c.client,
accounts: c.accounts,
eth: c.eth,
erc20: c.erc20,
feed: c.feed,
initialFrom: toDBHeader(head),
from: toDBHeader(head),
lastFetchedBlockTime: time.Now(),
}
err = cmd.Command()(parent)
if err != nil {
log.Warn("error on running newBlocksTransfersCommand", "err", err)
return err
}
} else {
c.feed.Send(Event{
Type: EventNewBlock,
BlockNumber: target,
Accounts: []common.Address{},
})
}
log.Info("end control command")
@ -965,6 +986,7 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error)
}
foundHeaders := map[common.Address][]*DBHeader{}
maxBlockNumber := big.NewInt(0)
for _, address := range c.accounts {
ethHeaders := ethHeadersByAddress[address]
erc20Headers := erc20HeadersByAddress[address]
@ -972,6 +994,12 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error)
allHeaders := append(ethHeaders, erc20Headers...)
foundHeaders[address] = allHeaders
for _, header := range allHeaders {
if header.Number.Cmp(maxBlockNumber) == 1 {
maxBlockNumber = header.Number
}
}
log.Debug("saving headers", "len", len(allHeaders), "address")
err = c.db.ProcessBlocks(address, newFromByAddress[address], c.toByAddress[address], allHeaders)
if err != nil {
@ -979,6 +1007,12 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error)
}
}
c.feed.Send(Event{
Type: EventMaxKnownBlock,
BlockNumber: maxBlockNumber,
Accounts: c.accounts,
})
c.foundHeaders = foundHeaders
return

View File

@ -110,12 +110,6 @@ func checkRanges(parent context.Context, client reactorClient, cache BalanceCach
}
if lb.Cmp(hb) == 0 {
log.Debug("balances are equal", "from", from, "to", to)
// In case if balances are equal but non zero we want to check if
// eth_getTransactionCount return different values, because there
// still might be transactions
if lb.Cmp(zero) != 0 {
return nil
}
ln, err := client.NonceAt(ctx, account, from)
if err != nil {

View File

@ -11,7 +11,8 @@ type EventType string
const (
// EventNewBlock emitted when new block was added to the same canonical chan.
EventNewBlock EventType = "newblock"
EventNewBlock EventType = "newblock"
EventMaxKnownBlock EventType = "maxKnownBlock"
// EventReorg emitted when canonical chain was changed. In this case, BlockNumber will be an earliest added block.
EventReorg EventType = "reorg"
// EventNewHistory emitted if transfer from older block was added.

View File

@ -4,7 +4,6 @@ import (
"context"
"errors"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
@ -70,22 +69,14 @@ func (d *IterativeDownloader) Next(parent context.Context) ([]*DBHeader, *big.In
from = d.from
}
log.Info("load erc20 transfers in range", "from", from, "to", to)
headres, err := d.downloader.GetHeadersInRange(parent, from, to)
headers, err := d.downloader.GetHeadersInRange(parent, from, to)
if err != nil {
log.Error("failed to get transfer in between two bloks", "from", from, "to", to, "error", err)
return nil, nil, nil, err
}
// use integers instead of DBHeader
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
header, err := d.client.HeaderByNumber(ctx, from)
cancel()
if err != nil {
log.Error("failed to get header by number", "from", d.from, "to", to, "error", err)
return nil, nil, nil, err
}
d.previous, d.to = d.to, header.Number
return headres, d.from, to, nil
d.previous, d.to = d.to, from
return headers, d.from, to, nil
}
// Revert reverts last step progress. Should be used if application failed to process transfers.

View File

@ -32,7 +32,7 @@ func pollingPeriodByChain(chain *big.Int) time.Duration {
func reorgSafetyDepth(chain *big.Int) *big.Int {
switch chain.Int64() {
case int64(params.MainNetworkID):
return big.NewInt(5)
return big.NewInt(2)
case int64(params.RopstenNetworkID):
return big.NewInt(15)
default:
@ -63,21 +63,23 @@ type reactorClient interface {
}
// NewReactor creates instance of the Reactor.
func NewReactor(db *Database, feed *event.Feed, client *ethclient.Client, chain *big.Int) *Reactor {
func NewReactor(db *Database, feed *event.Feed, client *ethclient.Client, chain *big.Int, watchNewBlocks bool) *Reactor {
return &Reactor{
db: db,
client: client,
feed: feed,
chain: chain,
db: db,
client: client,
feed: feed,
chain: chain,
watchNewBlocks: watchNewBlocks,
}
}
// Reactor listens to new blocks and stores transfers into the database.
type Reactor struct {
client *ethclient.Client
db *Database
feed *event.Feed
chain *big.Int
client *ethclient.Client
db *Database
feed *event.Feed
chain *big.Int
watchNewBlocks bool
mu sync.Mutex
group *Group
@ -96,9 +98,10 @@ func (r *Reactor) newControlCommand(accounts []common.Address) *controlCommand {
signer: signer,
db: r.db,
},
erc20: NewERC20TransfersDownloader(r.client, accounts, signer),
feed: r.feed,
safetyDepth: reorgSafetyDepth(r.chain),
erc20: NewERC20TransfersDownloader(r.client, accounts, signer),
feed: r.feed,
safetyDepth: reorgSafetyDepth(r.chain),
watchNewBlocks: r.watchNewBlocks,
}
return ctl

View File

@ -52,8 +52,8 @@ func (s *Service) GetFeed() *event.Feed {
}
// StartReactor separately because it requires known ethereum address, which will become available only after login.
func (s *Service) StartReactor(client *ethclient.Client, accounts []common.Address, chain *big.Int) error {
reactor := NewReactor(s.db, s.feed, client, chain)
func (s *Service) StartReactor(client *ethclient.Client, accounts []common.Address, chain *big.Int, watchNewBlocks bool) error {
reactor := NewReactor(s.db, s.feed, client, chain, watchNewBlocks)
err := reactor.Start(accounts)
if err != nil {
return err

View File

@ -54,7 +54,7 @@ func (s *ReactorChangesSuite) SetupTest() {
s.backend, err = testchain.NewBackend()
s.Require().NoError(err)
s.feed = &event.Feed{}
s.reactor = NewReactor(s.db, &event.Feed{}, s.backend.Client, big.NewInt(1337))
s.reactor = NewReactor(s.db, &event.Feed{}, s.backend.Client, big.NewInt(1337), true)
account, err := crypto.GenerateKey()
s.Require().NoError(err)
s.first = crypto.PubkeyToAddress(account.PublicKey)
@ -123,13 +123,13 @@ func TestServiceStartStop(t *testing.T) {
account, err := crypto.GenerateKey()
require.NoError(t, err)
err = s.StartReactor(backend.Client, []common.Address{crypto.PubkeyToAddress(account.PublicKey)}, big.NewInt(1337))
err = s.StartReactor(backend.Client, []common.Address{crypto.PubkeyToAddress(account.PublicKey)}, big.NewInt(1337), true)
require.NoError(t, err)
require.NoError(t, s.Stop())
require.NoError(t, s.Start(nil))
err = s.StartReactor(backend.Client, []common.Address{crypto.PubkeyToAddress(account.PublicKey)}, big.NewInt(1337))
err = s.StartReactor(backend.Client, []common.Address{crypto.PubkeyToAddress(account.PublicKey)}, big.NewInt(1337), true)
require.NoError(t, err)
require.NoError(t, s.Stop())
}