[wallet] RPC usage stats

This commit is contained in:
Roman Volosovskyi 2021-03-08 13:18:43 +02:00
parent e29cca667a
commit 3a408135d8
No known key found for this signature in database
GPG Key ID: 0238A4B5ECEE70DE
15 changed files with 235 additions and 42 deletions

View File

@ -1 +1 @@
0.73.2 0.73.3

View File

@ -39,6 +39,7 @@ import (
"github.com/status-im/status-go/services/permissions" "github.com/status-im/status-go/services/permissions"
"github.com/status-im/status-go/services/personal" "github.com/status-im/status-go/services/personal"
"github.com/status-im/status-go/services/rpcfilters" "github.com/status-im/status-go/services/rpcfilters"
"github.com/status-im/status-go/services/rpcstats"
"github.com/status-im/status-go/services/subscriptions" "github.com/status-im/status-go/services/subscriptions"
"github.com/status-im/status-go/services/typeddata" "github.com/status-im/status-go/services/typeddata"
"github.com/status-im/status-go/services/wallet" "github.com/status-im/status-go/services/wallet"
@ -547,6 +548,12 @@ func (b *GethStatusBackend) subscriptionService() gethnode.ServiceConstructor {
} }
} }
func (b *GethStatusBackend) rpcStatsService() gethnode.ServiceConstructor {
return func(*gethnode.ServiceContext) (gethnode.Service, error) {
return rpcstats.New(), nil
}
}
func (b *GethStatusBackend) accountsService(accountsFeed *event.Feed) gethnode.ServiceConstructor { func (b *GethStatusBackend) accountsService(accountsFeed *event.Feed) gethnode.ServiceConstructor {
return func(*gethnode.ServiceContext) (gethnode.Service, error) { return func(*gethnode.ServiceContext) (gethnode.Service, error) {
return accountssvc.NewService(accounts.NewDB(b.appDB), b.multiaccountsDB, b.accountManager.Manager, accountsFeed), nil return accountssvc.NewService(accounts.NewDB(b.appDB), b.multiaccountsDB, b.accountManager.Manager, accountsFeed), nil
@ -606,6 +613,7 @@ func (b *GethStatusBackend) startNode(config *params.NodeConfig) (err error) {
services := []gethnode.ServiceConstructor{} services := []gethnode.ServiceConstructor{}
services = appendIf(config.UpstreamConfig.Enabled, services, b.rpcFiltersService()) services = appendIf(config.UpstreamConfig.Enabled, services, b.rpcFiltersService())
services = append(services, b.subscriptionService()) services = append(services, b.subscriptionService())
services = append(services, b.rpcStatsService())
services = appendIf(b.appDB != nil && b.multiaccountsDB != nil, services, b.accountsService(accountsFeed)) services = appendIf(b.appDB != nil && b.multiaccountsDB != nil, services, b.accountsService(accountsFeed))
services = appendIf(config.BrowsersConfig.Enabled, services, b.browsersService()) services = appendIf(config.BrowsersConfig.Enabled, services, b.browsersService())
services = appendIf(config.PermissionsConfig.Enabled, services, b.permissionsService()) services = appendIf(config.PermissionsConfig.Enabled, services, b.permissionsService())

View File

@ -14,6 +14,7 @@ import (
gethrpc "github.com/ethereum/go-ethereum/rpc" gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/rpcstats"
) )
const ( const (
@ -124,6 +125,7 @@ func (c *Client) Call(result interface{}, method string, args ...interface{}) er
// It uses custom routing scheme for calls. // It uses custom routing scheme for calls.
// If there are any local handlers registered for this call, they will handle it. // If there are any local handlers registered for this call, they will handle it.
func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
rpcstats.CountCall(method)
if c.router.routeBlocked(method) { if c.router.routeBlocked(method) {
return ErrMethodNotFound return ErrMethodNotFound
} }

34
services/rpcstats/api.go Normal file
View File

@ -0,0 +1,34 @@
package rpcstats
import (
"context"
)
// PublicAPI represents a set of APIs from the namespace.
type PublicAPI struct {
s *Service
}
// NewAPI creates an instance of the API.
func NewAPI(s *Service) *PublicAPI {
return &PublicAPI{s: s}
}
// Reset resets RPC usage stats
func (api *PublicAPI) Reset(context context.Context) {
resetStats()
}
type RPCStats struct {
Total uint `json:"total"`
CounterPerMethod map[string]uint `json:"methods"`
}
// GetStats retrun RPC usage stats
func (api *PublicAPI) GetStats(context context.Context) (RPCStats, error) {
total, perMethod := getStats()
return RPCStats{
Total: total,
CounterPerMethod: perMethod,
}, nil
}

View File

@ -0,0 +1,44 @@
package rpcstats
import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
)
// Service represents our own implementation of status status operations.
type Service struct{}
// New returns a new Service.
func New() *Service {
return &Service{}
}
// APIs returns a list of new APIs.
func (s *Service) APIs() []rpc.API {
return []rpc.API{
{
Namespace: "rpcstats",
Version: "1.0",
Service: NewAPI(s),
Public: true,
},
}
}
// Protocols returns list of p2p protocols.
func (s *Service) Protocols() []p2p.Protocol {
return nil
}
// Start is run when a service is started.
// It does nothing in this case but is required by `node.Service` interface.
func (s *Service) Start(server *p2p.Server) error {
resetStats()
return nil
}
// Stop is run when a service is stopped.
// It does nothing in this case but is required by `node.Service` interface.
func (s *Service) Stop() error {
return nil
}

View File

@ -0,0 +1,48 @@
package rpcstats
import (
"sync"
)
type RPCUsageStats struct {
total uint
counterPerMethod map[string]uint
rw sync.RWMutex
}
var stats *RPCUsageStats
func getInstance() *RPCUsageStats {
if stats == nil {
stats = &RPCUsageStats{
total: 0,
counterPerMethod: map[string]uint{},
}
}
return stats
}
func getStats() (uint, map[string]uint) {
stats := getInstance()
stats.rw.RLock()
defer stats.rw.RUnlock()
return stats.total, stats.counterPerMethod
}
func resetStats() {
stats := getInstance()
stats.rw.Lock()
defer stats.rw.Unlock()
stats.total = 0
stats.counterPerMethod = map[string]uint{}
}
func CountCall(method string) {
stats := getInstance()
stats.rw.Lock()
defer stats.rw.Unlock()
stats.total++
stats.counterPerMethod[method]++
}

View File

@ -9,7 +9,6 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/ierc20" "github.com/status-im/status-go/services/wallet/ierc20"
@ -18,7 +17,7 @@ import (
var requestTimeout = 20 * time.Second var requestTimeout = 20 * time.Second
// GetTokensBalances takes list of accounts and tokens and returns mapping of token balances for each account. // GetTokensBalances takes list of accounts and tokens and returns mapping of token balances for each account.
func GetTokensBalances(parent context.Context, client *ethclient.Client, accounts, tokens []common.Address) (map[common.Address]map[common.Address]*hexutil.Big, error) { func GetTokensBalances(parent context.Context, client *walletClient, accounts, tokens []common.Address) (map[common.Address]map[common.Address]*hexutil.Big, error) {
var ( var (
group = NewAtomicGroup(parent) group = NewAtomicGroup(parent)
mu sync.Mutex mu sync.Mutex

View File

@ -29,7 +29,7 @@ type BalancesSuite struct {
tokens []common.Address tokens []common.Address
accounts []common.Address accounts []common.Address
client *ethclient.Client client *walletClient
faucet *ecdsa.PrivateKey faucet *ecdsa.PrivateKey
} }
@ -45,7 +45,7 @@ func (s *BalancesSuite) SetupTest() {
client, err := node.Attach() client, err := node.Attach()
s.Require().NoError(err) s.Require().NoError(err)
s.client = ethclient.NewClient(client) s.client = &walletClient{ethclient.NewClient(client)}
s.tokens = make([]common.Address, 3) s.tokens = make([]common.Address, 3)
s.accounts = make([]common.Address, 5) s.accounts = make([]common.Address, 5)
@ -55,7 +55,7 @@ func (s *BalancesSuite) SetupTest() {
s.accounts[i] = crypto.PubkeyToAddress(key.PublicKey) s.accounts[i] = crypto.PubkeyToAddress(key.PublicKey)
} }
for i := range s.tokens { for i := range s.tokens {
token, tx, _, err := erc20.DeployERC20Transfer(bind.NewKeyedTransactor(s.faucet), s.client) token, tx, _, err := erc20.DeployERC20Transfer(bind.NewKeyedTransactor(s.faucet), s.client.client)
s.Require().NoError(err) s.Require().NoError(err)
_, err = bind.WaitMined(context.Background(), s.client, tx) _, err = bind.WaitMined(context.Background(), s.client, tx)
s.Require().NoError(err) s.Require().NoError(err)
@ -70,7 +70,7 @@ func (s *BalancesSuite) TestBalanceEqualPerToken() {
expected[account] = map[common.Address]*hexutil.Big{} expected[account] = map[common.Address]*hexutil.Big{}
for i, token := range s.tokens { for i, token := range s.tokens {
balance := new(big.Int).Add(base, big.NewInt(int64(i))) balance := new(big.Int).Add(base, big.NewInt(int64(i)))
transactor, err := erc20.NewERC20Transfer(token, s.client) transactor, err := erc20.NewERC20Transfer(token, s.client.client)
s.Require().NoError(err) s.Require().NoError(err)
tx, err := transactor.Transfer(bind.NewKeyedTransactor(s.faucet), account, balance) tx, err := transactor.Transfer(bind.NewKeyedTransactor(s.faucet), account, balance)
s.Require().NoError(err) s.Require().NoError(err)
@ -91,7 +91,7 @@ func (s *BalancesSuite) TestBalanceEqualPerAccount() {
expected[account] = map[common.Address]*hexutil.Big{} expected[account] = map[common.Address]*hexutil.Big{}
for _, token := range s.tokens { for _, token := range s.tokens {
balance := new(big.Int).Add(base, big.NewInt(int64(i))) balance := new(big.Int).Add(base, big.NewInt(int64(i)))
transactor, err := erc20.NewERC20Transfer(token, s.client) transactor, err := erc20.NewERC20Transfer(token, s.client.client)
s.Require().NoError(err) s.Require().NoError(err)
tx, err := transactor.Transfer(bind.NewKeyedTransactor(s.faucet), account, balance) tx, err := transactor.Transfer(bind.NewKeyedTransactor(s.faucet), account, balance)
s.Require().NoError(err) s.Require().NoError(err)

View File

@ -9,7 +9,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
@ -21,7 +20,7 @@ type ethHistoricalCommand struct {
db *Database db *Database
eth TransferDownloader eth TransferDownloader
address common.Address address common.Address
client reactorClient client *walletClient
balanceCache *balanceCache balanceCache *balanceCache
feed *event.Feed feed *event.Feed
foundHeaders []*DBHeader foundHeaders []*DBHeader
@ -69,7 +68,7 @@ type erc20HistoricalCommand struct {
db *Database db *Database
erc20 BatchDownloader erc20 BatchDownloader
address common.Address address common.Address
client reactorClient client *walletClient
feed *event.Feed feed *event.Feed
iterator *IterativeDownloader iterator *IterativeDownloader
@ -121,7 +120,7 @@ type newBlocksTransfersCommand struct {
chain *big.Int chain *big.Int
erc20 *ERC20TransfersDownloader erc20 *ERC20TransfersDownloader
eth *ETHTransferDownloader eth *ETHTransferDownloader
client reactorClient client *walletClient
feed *event.Feed feed *event.Feed
lastFetchedBlockTime time.Time lastFetchedBlockTime time.Time
@ -171,7 +170,7 @@ func (c *newBlocksTransfersCommand) getAllTransfers(parent context.Context, from
accounts: c.accounts, accounts: c.accounts,
db: c.db, db: c.db,
chain: c.chain, chain: c.chain,
client: c.eth.client, client: c.client,
balanceCache: balanceCache, balanceCache: balanceCache,
feed: c.feed, feed: c.feed,
fromByAddress: fromByAddress, fromByAddress: fromByAddress,
@ -450,7 +449,7 @@ type controlCommand struct {
eth *ETHTransferDownloader eth *ETHTransferDownloader
erc20 *ERC20TransfersDownloader erc20 *ERC20TransfersDownloader
chain *big.Int chain *big.Int
client *ethclient.Client client *walletClient
feed *event.Feed feed *event.Feed
safetyDepth *big.Int safetyDepth *big.Int
watchNewBlocks bool watchNewBlocks bool
@ -554,7 +553,7 @@ func getTransfersByBlocks(ctx context.Context, db *Database, downloader *ETHTran
return allTransfers, nil return allTransfers, nil
} }
func loadTransfers(ctx context.Context, accounts []common.Address, db *Database, client *ethclient.Client, chain *big.Int, limit int, blocksByAddress map[common.Address][]*big.Int) (map[common.Address][]Transfer, error) { func loadTransfers(ctx context.Context, accounts []common.Address, db *Database, client *walletClient, chain *big.Int, limit int, blocksByAddress map[common.Address][]*big.Int) (map[common.Address][]Transfer, error) {
start := time.Now() start := time.Now()
group := NewGroup(ctx) group := NewGroup(ctx)
@ -643,7 +642,7 @@ func (c *controlCommand) verifyLastSynced(parent context.Context, last *DBHeader
return cmd.Command()(parent) return cmd.Command()(parent)
} }
*/ */
func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client *ethclient.Client) (*big.Int, error) { func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client *walletClient) (*big.Int, error) {
from := big.NewInt(0) from := big.NewInt(0)
to := initialTo to := initialTo
goal := uint64(20) goal := uint64(20)
@ -699,7 +698,7 @@ func findFirstRange(c context.Context, account common.Address, initialTo *big.In
return from, nil return from, nil
} }
func findFirstRanges(c context.Context, accounts []common.Address, initialTo *big.Int, client *ethclient.Client) (map[common.Address]*big.Int, error) { func findFirstRanges(c context.Context, accounts []common.Address, initialTo *big.Int, client *walletClient) (map[common.Address]*big.Int, error) {
res := map[common.Address]*big.Int{} res := map[common.Address]*big.Int{}
for _, address := range accounts { for _, address := range accounts {
@ -955,7 +954,7 @@ type transfersCommand struct {
eth *ETHTransferDownloader eth *ETHTransferDownloader
block *big.Int block *big.Int
address common.Address address common.Address
client reactorClient client *walletClient
fetchedTransfers []Transfer fetchedTransfers []Transfer
} }
@ -988,7 +987,7 @@ type loadTransfersCommand struct {
accounts []common.Address accounts []common.Address
db *Database db *Database
chain *big.Int chain *big.Int
client *ethclient.Client client *walletClient
blocksByAddress map[common.Address][]*big.Int blocksByAddress map[common.Address][]*big.Int
foundTransfersByAddress map[common.Address][]Transfer foundTransfersByAddress map[common.Address][]Transfer
} }
@ -1024,7 +1023,7 @@ type findAndCheckBlockRangeCommand struct {
accounts []common.Address accounts []common.Address
db *Database db *Database
chain *big.Int chain *big.Int
client *ethclient.Client client *walletClient
balanceCache *balanceCache balanceCache *balanceCache
feed *event.Feed feed *event.Feed
fromByAddress map[common.Address]*big.Int fromByAddress map[common.Address]*big.Int

View File

@ -42,18 +42,19 @@ func (s *NewBlocksSuite) SetupTest() {
s.Require().NoError(err) s.Require().NoError(err)
s.address = crypto.PubkeyToAddress(account.PublicKey) s.address = crypto.PubkeyToAddress(account.PublicKey)
s.feed = &event.Feed{} s.feed = &event.Feed{}
client := &walletClient{client: s.backend.Client}
s.cmd = &newBlocksTransfersCommand{ s.cmd = &newBlocksTransfersCommand{
db: s.db, db: s.db,
accounts: []common.Address{s.address}, accounts: []common.Address{s.address},
erc20: NewERC20TransfersDownloader(s.backend.Client, []common.Address{s.address}, s.backend.Signer), erc20: NewERC20TransfersDownloader(client, []common.Address{s.address}, s.backend.Signer),
eth: &ETHTransferDownloader{ eth: &ETHTransferDownloader{
client: s.backend.Client, client: client,
signer: s.backend.Signer, signer: s.backend.Signer,
db: s.db, db: s.db,
accounts: []common.Address{s.address}, accounts: []common.Address{s.address},
}, },
feed: s.feed, feed: s.feed,
client: s.backend.Client, client: client,
chain: big.NewInt(1777), chain: big.NewInt(1777),
} }
} }
@ -183,17 +184,18 @@ func (s *NewBlocksSuite) downloadHistorical() {
s.Require().Equal(40, n) s.Require().Equal(40, n)
s.Require().NoError(err) s.Require().NoError(err)
client := &walletClient{client: s.backend.Client}
eth := &ethHistoricalCommand{ eth := &ethHistoricalCommand{
db: s.db, db: s.db,
balanceCache: newBalanceCache(), balanceCache: newBalanceCache(),
eth: &ETHTransferDownloader{ eth: &ETHTransferDownloader{
client: s.backend.Client, client: client,
signer: s.backend.Signer, signer: s.backend.Signer,
accounts: []common.Address{s.address}, accounts: []common.Address{s.address},
}, },
feed: s.feed, feed: s.feed,
address: s.address, address: s.address,
client: s.backend.Client, client: client,
from: big.NewInt(0), from: big.NewInt(0),
to: s.backend.Ethereum.BlockChain().CurrentBlock().Number(), to: s.backend.Ethereum.BlockChain().CurrentBlock().Number(),
} }

View File

@ -85,7 +85,7 @@ type TransferDownloader interface {
GetTransfersByNumber(context.Context, *big.Int) ([]Transfer, error) GetTransfersByNumber(context.Context, *big.Int) ([]Transfer, error)
} }
func checkRanges(parent context.Context, client reactorClient, cache BalanceCache, downloader TransferDownloader, account common.Address, ranges [][]*big.Int) ([][]*big.Int, []*DBHeader, error) { func checkRanges(parent context.Context, client BalanceReader, cache BalanceCache, downloader TransferDownloader, account common.Address, ranges [][]*big.Int) ([][]*big.Int, []*DBHeader, error) {
ctx, cancel := context.WithTimeout(parent, 30*time.Second) ctx, cancel := context.WithTimeout(parent, 30*time.Second)
defer cancel() defer cancel()
@ -166,7 +166,7 @@ func checkRanges(parent context.Context, client reactorClient, cache BalanceCach
return c.GetRanges(), c.GetHeaders(), nil return c.GetRanges(), c.GetHeaders(), nil
} }
func findBlocksWithEthTransfers(parent context.Context, client reactorClient, cache BalanceCache, downloader TransferDownloader, account common.Address, low, high *big.Int, noLimit bool) (from *big.Int, headers []*DBHeader, err error) { func findBlocksWithEthTransfers(parent context.Context, client BalanceReader, cache BalanceCache, downloader TransferDownloader, account common.Address, low, high *big.Int, noLimit bool) (from *big.Int, headers []*DBHeader, err error) {
ranges := [][]*big.Int{{low, high}} ranges := [][]*big.Int{{low, high}}
minBlock := big.NewInt(low.Int64()) minBlock := big.NewInt(low.Int64())
headers = []*DBHeader{} headers = []*DBHeader{}

View File

@ -11,7 +11,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
@ -51,7 +50,7 @@ type Transfer struct {
// ETHTransferDownloader downloads regular eth transfers. // ETHTransferDownloader downloads regular eth transfers.
type ETHTransferDownloader struct { type ETHTransferDownloader struct {
client *ethclient.Client client *walletClient
accounts []common.Address accounts []common.Address
signer types.Signer signer types.Signer
db *Database db *Database
@ -143,7 +142,7 @@ func (d *ETHTransferDownloader) getTransfersInBlock(ctx context.Context, blk *ty
} }
// NewERC20TransfersDownloader returns new instance. // NewERC20TransfersDownloader returns new instance.
func NewERC20TransfersDownloader(client *ethclient.Client, accounts []common.Address, signer types.Signer) *ERC20TransfersDownloader { func NewERC20TransfersDownloader(client *walletClient, accounts []common.Address, signer types.Signer) *ERC20TransfersDownloader {
signature := crypto.Keccak256Hash([]byte(erc20TransferEventSignature)) signature := crypto.Keccak256Hash([]byte(erc20TransferEventSignature))
return &ERC20TransfersDownloader{ return &ERC20TransfersDownloader{
client: client, client: client,
@ -155,7 +154,7 @@ func NewERC20TransfersDownloader(client *ethclient.Client, accounts []common.Add
// ERC20TransfersDownloader is a downloader for erc20 tokens transfers. // ERC20TransfersDownloader is a downloader for erc20 tokens transfers.
type ERC20TransfersDownloader struct { type ERC20TransfersDownloader struct {
client *ethclient.Client client *walletClient
accounts []common.Address accounts []common.Address
// hash of the Transfer event signature // hash of the Transfer event signature

View File

@ -56,7 +56,7 @@ func (s *ETHTransferSuite) SetupTest() {
s.dbStop = stop s.dbStop = stop
s.downloader = &ETHTransferDownloader{ s.downloader = &ETHTransferDownloader{
signer: s.signer, signer: s.signer,
client: s.ethclient, client: &walletClient{client: s.ethclient},
db: db, db: db,
accounts: []common.Address{ accounts: []common.Address{
crypto.PubkeyToAddress(s.identity.PublicKey), crypto.PubkeyToAddress(s.identity.PublicKey),
@ -188,7 +188,7 @@ func (s *ERC20TransferSuite) SetupTest() {
client, err := node.Attach() client, err := node.Attach()
s.Require().NoError(err) s.Require().NoError(err)
s.ethclient = ethclient.NewClient(client) s.ethclient = ethclient.NewClient(client)
s.downloader = NewERC20TransfersDownloader(s.ethclient, []common.Address{crypto.PubkeyToAddress(s.identity.PublicKey)}, s.signer) s.downloader = NewERC20TransfersDownloader(&walletClient{client: s.ethclient}, []common.Address{crypto.PubkeyToAddress(s.identity.PublicKey)}, s.signer)
var ( var (
tx *types.Transaction tx *types.Transaction

View File

@ -7,12 +7,14 @@ import (
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/rpcstats"
) )
// pow block on main chain is mined once per ~14 seconds // pow block on main chain is mined once per ~14 seconds
@ -55,11 +57,66 @@ type HeaderReader interface {
type BalanceReader interface { type BalanceReader interface {
BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error)
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
} }
type reactorClient interface { type walletClient struct {
HeaderReader client *ethclient.Client
BalanceReader }
func (rc *walletClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
rpcstats.CountCall("eth_getBlockByHash")
return rc.client.HeaderByHash(ctx, hash)
}
func (rc *walletClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
rpcstats.CountCall("eth_getBlockByNumber")
return rc.client.HeaderByNumber(ctx, number)
}
func (rc *walletClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) {
rpcstats.CountCall("eth_getBlockByHash")
return rc.client.BlockByHash(ctx, hash)
}
func (rc *walletClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
rpcstats.CountCall("eth_getBlockByNumber")
return rc.client.BlockByNumber(ctx, number)
}
func (rc *walletClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
rpcstats.CountCall("eth_getBalance")
return rc.client.BalanceAt(ctx, account, blockNumber)
}
func (rc *walletClient) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
rpcstats.CountCall("eth_getTransactionCount")
return rc.client.NonceAt(ctx, account, blockNumber)
}
func (rc *walletClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
rpcstats.CountCall("eth_getTransactionReceipt")
return rc.client.TransactionReceipt(ctx, txHash)
}
func (rc *walletClient) TransactionByHash(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error) {
rpcstats.CountCall("eth_getTransactionByHash")
return rc.client.TransactionByHash(ctx, hash)
}
func (rc *walletClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
rpcstats.CountCall("eth_getLogs")
return rc.client.FilterLogs(ctx, q)
}
func (rc *walletClient) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) {
rpcstats.CountCall("eth_getCode")
return rc.client.CodeAt(ctx, contract, blockNumber)
}
func (rc *walletClient) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) {
rpcstats.CountCall("eth_call")
return rc.client.CallContract(ctx, call, blockNumber)
} }
// NewReactor creates instance of the Reactor. // NewReactor creates instance of the Reactor.
@ -87,18 +144,19 @@ type Reactor struct {
func (r *Reactor) newControlCommand(accounts []common.Address) *controlCommand { func (r *Reactor) newControlCommand(accounts []common.Address) *controlCommand {
signer := types.NewEIP155Signer(r.chain) signer := types.NewEIP155Signer(r.chain)
client := &walletClient{client: r.client}
ctl := &controlCommand{ ctl := &controlCommand{
db: r.db, db: r.db,
chain: r.chain, chain: r.chain,
client: r.client, client: client,
accounts: accounts, accounts: accounts,
eth: &ETHTransferDownloader{ eth: &ETHTransferDownloader{
client: r.client, client: client,
accounts: accounts, accounts: accounts,
signer: signer, signer: signer,
db: r.db, db: r.db,
}, },
erc20: NewERC20TransfersDownloader(r.client, accounts, signer), erc20: NewERC20TransfersDownloader(client, accounts, signer),
feed: r.feed, feed: r.feed,
safetyDepth: reorgSafetyDepth(r.chain), safetyDepth: reorgSafetyDepth(r.chain),
watchNewBlocks: r.watchNewBlocks, watchNewBlocks: r.watchNewBlocks,

View File

@ -34,7 +34,7 @@ type Service struct {
db *Database db *Database
reactor *Reactor reactor *Reactor
signals *SignalsTransmitter signals *SignalsTransmitter
client *ethclient.Client client *walletClient
cryptoOnRampManager *CryptoOnRampManager cryptoOnRampManager *CryptoOnRampManager
started bool started bool
@ -55,7 +55,7 @@ func (s *Service) GetFeed() *event.Feed {
// SetClient sets ethclient // SetClient sets ethclient
func (s *Service) SetClient(client *ethclient.Client) { func (s *Service) SetClient(client *ethclient.Client) {
s.client = client s.client = &walletClient{client: client}
} }
// MergeBlocksRanges merge old blocks ranges if possible // MergeBlocksRanges merge old blocks ranges if possible
@ -78,7 +78,7 @@ func (s *Service) StartReactor(client *ethclient.Client, accounts []common.Addre
return err return err
} }
s.reactor = reactor s.reactor = reactor
s.client = client s.SetClient(client)
s.group.Add(func(ctx context.Context) error { s.group.Add(func(ctx context.Context) error {
return WatchAccountsChanges(ctx, s.accountsFeed, accounts, reactor) return WatchAccountsChanges(ctx, s.accountsFeed, accounts, reactor)
}) })