Watch new accounts aftter they were saved to accounts table (#1569)

* Watch new accounts once they are saved in accounts table

* Add test that reactor can be restarted and watch new accounts
This commit is contained in:
Dmitry Shulyak 2019-08-28 10:49:03 +03:00 committed by GitHub
parent cf3dc0664c
commit 0165b028c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 223 additions and 37 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
gethnode "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
@ -287,9 +288,9 @@ func (b *StatusBackend) subscriptionService() gethnode.ServiceConstructor {
}
}
func (b *StatusBackend) accountsService() gethnode.ServiceConstructor {
func (b *StatusBackend) accountsService(accountsFeed *event.Feed) gethnode.ServiceConstructor {
return func(*gethnode.ServiceContext) (gethnode.Service, error) {
return accountssvc.NewService(accounts.NewDB(b.appDB), b.multiaccountsDB, b.accountManager), nil
return accountssvc.NewService(accounts.NewDB(b.appDB), b.multiaccountsDB, b.accountManager, accountsFeed), nil
}
}
@ -305,9 +306,9 @@ func (b *StatusBackend) permissionsService() gethnode.ServiceConstructor {
}
}
func (b *StatusBackend) walletService(network uint64) gethnode.ServiceConstructor {
func (b *StatusBackend) walletService(network uint64, accountsFeed *event.Feed) gethnode.ServiceConstructor {
return func(*gethnode.ServiceContext) (gethnode.Service, error) {
return wallet.NewService(wallet.NewDB(b.appDB, network)), nil
return wallet.NewService(wallet.NewDB(b.appDB, network), accountsFeed), nil
}
}
@ -322,13 +323,14 @@ func (b *StatusBackend) startNode(config *params.NodeConfig) (err error) {
if err := config.Validate(); err != nil {
return err
}
accountsFeed := &event.Feed{}
services := []gethnode.ServiceConstructor{}
services = appendIf(config.UpstreamConfig.Enabled, services, b.rpcFiltersService())
services = append(services, b.subscriptionService())
services = appendIf(b.appDB != nil && b.multiaccountsDB != nil, services, b.accountsService())
services = appendIf(b.appDB != nil && b.multiaccountsDB != nil, services, b.accountsService(accountsFeed))
services = appendIf(config.BrowsersConfig.Enabled, services, b.browsersService())
services = appendIf(config.PermissionsConfig.Enabled, services, b.permissionsService())
services = appendIf(config.WalletConfig.Enabled, services, b.walletService(config.NetworkID))
services = appendIf(config.WalletConfig.Enabled, services, b.walletService(config.NetworkID, accountsFeed))
manager := b.accountManager.GetManager()
if manager == nil {

View File

@ -3,20 +3,27 @@ package accounts
import (
"context"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/multiaccounts/accounts"
)
func NewAccountsAPI(db *accounts.Database) *API {
return &API{db}
func NewAccountsAPI(db *accounts.Database, feed *event.Feed) *API {
return &API{db, feed}
}
// API is class with methods available over RPC.
type API struct {
db *accounts.Database
db *accounts.Database
feed *event.Feed
}
func (api *API) SaveAccounts(ctx context.Context, accounts []accounts.Account) error {
return api.db.SaveAccounts(accounts)
err := api.db.SaveAccounts(accounts)
if err != nil {
return err
}
api.feed.Send(accounts)
return nil
}
func (api *API) GetAccounts(ctx context.Context) ([]accounts.Account, error) {

View File

@ -1,6 +1,7 @@
package accounts
import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/account"
@ -9,8 +10,8 @@ import (
)
// NewService initializes service instance.
func NewService(db *accounts.Database, mdb *multiaccounts.Database, manager *account.Manager) *Service {
return &Service{db, mdb, manager}
func NewService(db *accounts.Database, mdb *multiaccounts.Database, manager *account.Manager, feed *event.Feed) *Service {
return &Service{db, mdb, manager, feed}
}
// Service is a browsers service.
@ -18,6 +19,7 @@ type Service struct {
db *accounts.Database
mdb *multiaccounts.Database
manager *account.Manager
feed *event.Feed
}
// Start a service.
@ -41,7 +43,7 @@ func (s *Service) APIs() []rpc.API {
{
Namespace: "accounts",
Version: "0.1.0",
Service: NewAccountsAPI(s.db),
Service: NewAccountsAPI(s.db, s.feed),
},
{
Namespace: "multiaccounts",

View File

@ -15,6 +15,10 @@ type FiniteCommand struct {
}
func (c FiniteCommand) Run(ctx context.Context) error {
err := c.Runable(ctx)
if err == nil {
return nil
}
ticker := time.NewTicker(c.Interval)
for {
select {
@ -36,6 +40,7 @@ type InfiniteCommand struct {
}
func (c InfiniteCommand) Run(ctx context.Context) error {
_ = c.Runable(ctx)
ticker := time.NewTicker(c.Interval)
for {
select {

View File

@ -47,6 +47,9 @@ type TransferDownloader interface {
func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, downloader TransferDownloader, account common.Address, low, high *big.Int) {
c.Add(func(ctx context.Context) error {
if low.Cmp(high) >= 0 {
return nil
}
log.Debug("eth transfers comparing blocks", "low", low, "high", high)
lb, err := client.BalanceAt(ctx, account, low)
if err != nil {

View File

@ -50,30 +50,28 @@ type reactorClient interface {
}
// NewReactor creates instance of the Reactor.
func NewReactor(db *Database, feed *event.Feed, client *ethclient.Client, accounts []common.Address, chain *big.Int) *Reactor {
func NewReactor(db *Database, feed *event.Feed, client *ethclient.Client, chain *big.Int) *Reactor {
return &Reactor{
db: db,
client: client,
feed: feed,
accounts: accounts,
chain: chain,
db: db,
client: client,
feed: feed,
chain: chain,
}
}
// Reactor listens to new blocks and stores transfers into the database.
type Reactor struct {
client *ethclient.Client
db *Database
feed *event.Feed
accounts []common.Address
chain *big.Int
client *ethclient.Client
db *Database
feed *event.Feed
chain *big.Int
mu sync.Mutex
group *Group
}
// Start runs reactor loop in background.
func (r *Reactor) Start() error {
func (r *Reactor) Start(accounts []common.Address) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.group != nil {
@ -81,20 +79,17 @@ func (r *Reactor) Start() error {
}
r.group = NewGroup(context.Background())
signer := types.NewEIP155Signer(r.chain)
// TODO(dshulyak) to support adding accounts in runtime implement keyed group
// and export private api to start downloaders from accounts
// private api should have access only to reactor
ctl := &controlCommand{
db: r.db,
chain: r.chain,
client: r.client,
accounts: r.accounts,
accounts: accounts,
eth: &ETHTransferDownloader{
client: r.client,
accounts: r.accounts,
accounts: accounts,
signer: signer,
},
erc20: NewERC20TransfersDownloader(r.client, r.accounts, signer),
erc20: NewERC20TransfersDownloader(r.client, accounts, signer),
feed: r.feed,
safetyDepth: reorgSafetyDepth,
}

View File

@ -1,6 +1,7 @@
package wallet
import (
"context"
"math/big"
"github.com/ethereum/go-ethereum/common"
@ -9,15 +10,17 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/multiaccounts/accounts"
)
// NewService initializes service instance.
func NewService(db *Database) *Service {
func NewService(db *Database, accountsFeed *event.Feed) *Service {
feed := &event.Feed{}
return &Service{
db: db,
feed: feed,
signals: &SignalsTransmitter{publisher: feed},
db: db,
feed: feed,
signals: &SignalsTransmitter{publisher: feed},
accountsFeed: accountsFeed,
}
}
@ -28,22 +31,29 @@ type Service struct {
reactor *Reactor
signals *SignalsTransmitter
client *ethclient.Client
group *Group
accountsFeed *event.Feed
}
// Start signals transmitter.
func (s *Service) Start(*p2p.Server) error {
s.group = NewGroup(context.Background())
return s.signals.Start()
}
// StartReactor separately because it requires known ethereum address, which will become available only after login.
func (s *Service) StartReactor(client *ethclient.Client, accounts []common.Address, chain *big.Int) error {
reactor := NewReactor(s.db, s.feed, client, accounts, chain)
err := reactor.Start()
reactor := NewReactor(s.db, s.feed, client, chain)
err := reactor.Start(accounts)
if err != nil {
return err
}
s.reactor = reactor
s.client = client
s.group.Add(func(ctx context.Context) error {
return WatchAccountsChanges(ctx, s.accountsFeed, accounts, reactor)
})
return nil
}
@ -53,6 +63,8 @@ func (s *Service) StopReactor() error {
return nil
}
s.reactor.Stop()
s.group.Stop()
s.group.Wait()
return nil
}
@ -61,6 +73,11 @@ func (s *Service) Stop() error {
log.Info("wallet will be stopped")
err := s.StopReactor()
s.signals.Stop()
if s.group != nil {
s.group.Stop()
s.group.Wait()
s.group = nil
}
log.Info("wallet stopped")
return err
}
@ -81,3 +98,53 @@ func (s *Service) APIs() []rpc.API {
func (s *Service) Protocols() []p2p.Protocol {
return nil
}
// WatchAccountsChanges subsribes to a feed and watches for changes in accounts list. If there are new or removed accounts
// reactor will be restarted.
func WatchAccountsChanges(ctx context.Context, feed *event.Feed, initial []common.Address, reactor *Reactor) error {
accounts := make(chan []accounts.Account, 1) // it may block if the rate of updates will be significantly higher
sub := feed.Subscribe(accounts)
defer sub.Unsubscribe()
listen := make(map[common.Address]struct{}, len(initial))
for _, address := range initial {
listen[address] = struct{}{}
}
for {
select {
case <-ctx.Done():
return nil
case err := <-sub.Err():
if err != nil {
log.Error("accounts watcher subscription failed", "error", err)
}
case n := <-accounts:
log.Debug("wallet received updated list of accoutns", "accounts", n)
restart := false
for _, acc := range n {
_, exist := listen[acc.Address]
if !exist {
listen[acc.Address] = struct{}{}
restart = true
}
}
if !restart {
continue
}
listenList := mapToList(listen)
log.Debug("list of accounts was changed from a previous version. reactor will be restarted", "new", listenList)
reactor.Stop()
err := reactor.Start(listenList) // error is raised only if reactor is already running
if err != nil {
log.Error("failed to restart reactor with new accounts", "error", err)
}
}
}
}
func mapToList(m map[common.Address]struct{}) []common.Address {
rst := make([]common.Address, 0, len(m))
for address := range m {
rst = append(rst, address)
}
return rst
}

View File

@ -0,0 +1,105 @@
package wallet
import (
"context"
"fmt"
"math/big"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/t/devtests/testchain"
"github.com/status-im/status-go/t/utils"
"github.com/stretchr/testify/suite"
)
func TestReactorChanges(t *testing.T) {
suite.Run(t, new(ReactorChangesSuite))
}
type ReactorChangesSuite struct {
suite.Suite
backend *testchain.Backend
reactor *Reactor
db *Database
dbStop func()
feed *event.Feed
first, second common.Address
}
func (s *ReactorChangesSuite) txToAddress(nonce uint64, address common.Address) *types.Transaction {
tx := types.NewTransaction(nonce, address, big.NewInt(1e17), 21000, big.NewInt(1), nil)
tx, err := types.SignTx(tx, s.backend.Signer, s.backend.Faucet)
s.Require().NoError(err)
return tx
}
func (s *ReactorChangesSuite) SetupTest() {
var err error
db, stop := setupTestDB(s.Suite.T())
s.db = db
s.dbStop = stop
s.backend, err = testchain.NewBackend()
s.Require().NoError(err)
s.feed = &event.Feed{}
s.reactor = NewReactor(s.db, &event.Feed{}, s.backend.Client, big.NewInt(1337))
account, err := crypto.GenerateKey()
s.Require().NoError(err)
s.first = crypto.PubkeyToAddress(account.PublicKey)
account, err = crypto.GenerateKey()
s.Require().NoError(err)
s.second = crypto.PubkeyToAddress(account.PublicKey)
nonce := uint64(0)
blocks := s.backend.GenerateBlocks(1, 0, func(n int, gen *core.BlockGen) {
gen.AddTx(s.txToAddress(nonce, s.first))
nonce++
gen.AddTx(s.txToAddress(nonce, s.second))
nonce++
})
_, err = s.backend.Ethereum.BlockChain().InsertChain(blocks)
s.Require().NoError(err)
}
func (s *ReactorChangesSuite) TestWatchNewAccounts() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
group := NewGroup(ctx)
group.Add(func(ctx context.Context) error {
return WatchAccountsChanges(ctx, s.feed, []common.Address{s.first}, s.reactor)
})
s.Require().NoError(s.reactor.Start([]common.Address{s.first}))
s.Require().NoError(utils.Eventually(func() error {
transfers, err := s.db.GetTransfersByAddress(s.first, big.NewInt(0), nil)
if err != nil {
return err
}
if len(transfers) != 1 {
return fmt.Errorf("expect to get 1 transfer for first address %x, got %d", s.first, len(transfers))
}
transfers, err = s.db.GetTransfersByAddress(s.second, big.NewInt(0), nil)
if err != nil {
return err
}
if len(transfers) != 0 {
return fmt.Errorf("expect not to get any transfer for second address %x", s.second)
}
return nil
}, 5*time.Second, 500*time.Millisecond))
s.feed.Send([]accounts.Account{{Address: s.first}, {Address: s.second}})
s.Require().NoError(utils.Eventually(func() error {
transfers, err := s.db.GetTransfersByAddress(s.second, big.NewInt(0), nil)
if err != nil {
return err
}
if len(transfers) == 0 {
return fmt.Errorf("expect 1 transfer for second address %x, got %d", s.second, len(transfers))
}
return nil
}, 5*time.Second, 500*time.Millisecond))
}