[wallet] cleanup

This commit is contained in:
Roman Volosovskyi 2021-04-01 12:04:47 +03:00
parent 1724ecffa1
commit f3ee49c110
No known key found for this signature in database
GPG Key ID: 0238A4B5ECEE70DE
13 changed files with 210 additions and 952 deletions

View File

@ -89,7 +89,6 @@ type GethStatusBackend struct {
selectedAccountKeyID string
log log.Logger
allowAllRPC bool // used only for tests, disables api method restrictions
forceStopWallet bool
}
// NewGethStatusBackend create a new GethStatusBackend instance
@ -968,50 +967,6 @@ func (b *GethStatusBackend) AppStateChange(state string) {
// and normal mode if the app is in foreground.
}
func (b *GethStatusBackend) StopWallet() error {
wallet, err := b.statusNode.WalletService()
if err != nil {
b.log.Error("Retrieving of wallet service failed on StopWallet", "error", err)
return nil
}
if wallet.IsStarted() {
err = wallet.Stop()
if err != nil {
b.log.Error("Wallet service stop failed on StopWallet", "error", err)
return nil
}
}
b.forceStopWallet = true
return nil
}
func (b *GethStatusBackend) StartWallet(watchNewBlocks bool) error {
wallet, err := b.statusNode.WalletService()
if err != nil {
b.log.Error("Retrieving of wallet service failed on StartWallet", "error", err)
return nil
}
if !wallet.IsStarted() {
err = wallet.Start(b.statusNode.Server())
if err != nil {
b.log.Error("Wallet service start failed on StartWallet", "error", err)
return nil
}
err = b.startWallet(watchNewBlocks)
if err != nil {
b.log.Error("Wallet reactor start failed on StartWallet", "error", err)
return nil
}
}
b.forceStopWallet = false
return nil
}
func (b *GethStatusBackend) StopLocalNotifications() error {
localPN, err := b.statusNode.LocalNotificationsService()
if err != nil {
@ -1044,11 +999,6 @@ func (b *GethStatusBackend) StartLocalNotifications() error {
return nil
}
if !wallet.IsStarted() {
b.log.Error("Can't start local notifications service without wallet service")
return nil
}
if !localPN.IsStarted() {
err = localPN.Start(b.statusNode.Server())
@ -1244,52 +1194,6 @@ func (b *GethStatusBackend) injectAccountsIntoServices() error {
return nil
}
func (b *GethStatusBackend) startWallet(watchNewBlocks bool) error {
if !b.statusNode.Config().WalletConfig.Enabled {
return nil
}
wallet, err := b.statusNode.WalletService()
if err != nil {
return err
}
accountsDB := accounts.NewDB(b.appDB)
watchAddresses, err := accountsDB.GetWalletAddresses()
if err != nil {
return err
}
mainAccountAddress, err := b.accountManager.MainAccountAddress()
if err != nil {
return err
}
uniqAddressesMap := map[common.Address]struct{}{}
allAddresses := []common.Address{}
mainAddress := common.Address(mainAccountAddress)
uniqAddressesMap[mainAddress] = struct{}{}
allAddresses = append(allAddresses, mainAddress)
for _, addr := range watchAddresses {
address := common.Address(addr)
if _, ok := uniqAddressesMap[address]; !ok {
uniqAddressesMap[address] = struct{}{}
allAddresses = append(allAddresses, address)
}
}
err = wallet.MergeBlocksRanges(allAddresses, b.statusNode.Config().NetworkID)
if err != nil {
return err
}
return wallet.StartReactor(
b.statusNode.RPCClient().Ethclient(),
allAddresses,
new(big.Int).SetUint64(b.statusNode.Config().NetworkID),
watchNewBlocks)
}
func appendIf(condition bool, services []gethnode.ServiceConstructor, service gethnode.ServiceConstructor) []gethnode.ServiceConstructor {
if !condition {
return services

View File

@ -511,18 +511,6 @@ func AppStateChange(state string) {
statusBackend.AppStateChange(state)
}
// StopWallet
func StopWallet() string {
err := statusBackend.StopWallet()
return makeJSONResponse(err)
}
// StartWallet
func StartWallet(watchNewBlocks bool) string {
err := statusBackend.StartWallet(watchNewBlocks)
return makeJSONResponse(err)
}
// StartLocalNotifications
func StartLocalNotifications() string {
err := statusBackend.StartLocalNotifications()

View File

@ -113,13 +113,13 @@ func TestTransactionNotification(t *testing.T) {
require.NoError(t, walletDb.ProcessTranfers(transfers, []*wallet.DBHeader{}))
feed.Send(wallet.Event{
Type: wallet.EventMaxKnownBlock,
Type: wallet.EventRecentHistoryReady,
BlockNumber: big.NewInt(0),
Accounts: []common.Address{header.Address},
})
feed.Send(wallet.Event{
Type: wallet.EventNewBlock,
Type: wallet.EventNewTransfers,
BlockNumber: header.Number,
Accounts: []common.Address{header.Address},
})

View File

@ -26,12 +26,10 @@ const (
// TransactionEvent - structure used to pass messages from wallet to bus
type TransactionEvent struct {
Type string `json:"type"`
BlockNumber *big.Int `json:"block-number"`
Accounts []common.Address `json:"accounts"`
NewTransactionsPerAccount map[common.Address]int `json:"new-transactions"`
ERC20 bool `json:"erc20"`
MaxKnownBlocks map[common.Address]*big.Int `json:"max-known-blocks"`
Type string `json:"type"`
BlockNumber *big.Int `json:"block-number"`
Accounts []common.Address `json:"accounts"`
MaxKnownBlocks map[common.Address]*big.Int `json:"max-known-blocks"`
}
type transactionBody struct {
@ -181,7 +179,7 @@ func (s *Service) StartWalletWatcher() {
}
return
case event := <-events:
if event.Type == wallet.EventNewBlock && len(maxKnownBlocks) > 0 {
if event.Type == wallet.EventNewTransfers && len(maxKnownBlocks) > 0 {
newBlocks := false
for _, address := range event.Accounts {
if _, ok := maxKnownBlocks[address]; !ok {
@ -194,15 +192,13 @@ func (s *Service) StartWalletWatcher() {
}
if newBlocks && s.WatchingEnabled {
s.transmitter.publisher.Send(TransactionEvent{
Type: string(event.Type),
BlockNumber: event.BlockNumber,
Accounts: event.Accounts,
NewTransactionsPerAccount: event.NewTransactionsPerAccount,
ERC20: event.ERC20,
MaxKnownBlocks: maxKnownBlocks,
Type: string(event.Type),
BlockNumber: event.BlockNumber,
Accounts: event.Accounts,
MaxKnownBlocks: maxKnownBlocks,
})
}
} else if event.Type == wallet.EventMaxKnownBlock {
} else if event.Type == wallet.EventRecentHistoryReady {
for _, address := range event.Accounts {
if _, ok := maxKnownBlocks[address]; !ok {
maxKnownBlocks[address] = event.BlockNumber

View File

@ -1,8 +1,6 @@
Wallet
==========
# Wallet service API
Wallet service starts a loop that watches for new transfers (eth and erc20).
To correctly start the service two values need to be changed in the config:
Wallet service provides RPC API for checking transfers history and other methods related to wallet functionality. To enable service two values need to be changed in the config:
1. Set Enable to true in WalletConfig
@ -22,34 +20,120 @@ To correctly start the service two values need to be changed in the config:
}
```
API
----------
## API
#### wallet_getTransfersByAddress
### wallet_getTransfersByAddress
Returns available transfers in a given range.
##### Parameters
#### Parameters
- `address`: `HEX` - ethereum address encoded in hex
- `toBlock`: `BIGINT` - end of the range. if nil query will return last transfers.
- `limit`: `BIGINT` - limit of returned transfers.
- `fetchMore`: `BOOLEAN` - if `true`, there are less than `limit` fetched transfers in the database, and zero block is not reached yet, history will be scanned for more transfers. If `false` only transfers which are already fetched to the app's database will be returned.
##### Examples
#### Examples
```json
{"jsonrpc":"2.0","id":7,"method":"wallet_getTransfersByAddress","params":["0xb81a6845649fa8c042dfaceb3f7a684873406993","0x0","0x5"]}
{
"jsonrpc":"2.0",
"id":7,
"method":"wallet_getTransfersByAddress",
"params":[
"0xb81a6845649fa8c042dfaceb3f7a684873406993",
"0x0",
"0x5",
true
]
}
```
##### Returns
#### Returns
Objects in the same format.
```json
[
{
"id":"0xb1a8adeaa0e6727bf01d6d8431b6238bdefa915e19ae7e8ceb16886c9f5e",
"type":"eth",
"address":"0xd65f3cb52605a54a833ae118fb13",
"blockNumber":"0xb7190",
"blockhash":"0x8d98aa2297fe322d0093b24372e2ead98414959093b479baf670",
"timestamp":"0x6048ec6",
"gasPrice":"0x346308a00",
"gasLimit":"0x508",
"gasUsed":"0x520",
"nonce":"0x13",
"txStatus":"0x1",
"input":"0x",
"txHash":"0x1adeaa0e672d7e67bf01d8431b6238bdef15e19ae7e8ceb16886c",
"value":"0x1",
"from":"0x2f865fb5dfdf0dfdf54a833ae118fb1363aaasd",
"to":"0xaaaaaaf3cb52605a54a833ae118fb1363a123123",
"contract":"0x0000000000000000000000000000000000000000",
"NetworkID":1
},...
]
```
#### wallet_getTokensBalances
### wallet_setInitialBlocksRange
Sets `zero block - latest block` range as scanned for an account. It is used when a new multiaccount is generated to avoid scanning transfers history.
#### Example
```json
{"jsonrpc":"2.0","id":7,"method":"wallet_setInitialBlocksRange","params":[]}
```
### wallet_watchTransaction
Starts watching for transaction confirmation/rejection. If the transaction is not confirmed/rejected within 10 minutes, the call times out with an error.
#### Parameters
- `tx-id`: `HEX` - transaction hash
#### Example
```json
{
"jsonrpc":"2.0",
"id":7,
"method":"wallet_watchTransaction",
"params":[
"0xaaaaaaaa11111112222233333333"
]
}
```
### wallet_checkRecentHistory
#### Parameters
- `addresses`: `[]HEX` - array of addresses to be checked
#### Example
```json
{
"jsonrpc":"2.0",
"id":1,
"method":"wallet_checkRecentHistory",
"params":[
[
"0x23458d65f3cB52605a54AaA833ae118fb1111aaa",
"0x24568B4166D11aaa1194097C60Cdc714F7e11111"
]
]
}
```
### wallet_getTokensBalances
Returns tokens balances mapping for every account. See section below for the response example.
##### Parameters
#### Parameters
- `accounts` `HEX` - list of ethereum addresses encoded in hex
- `tokens` `HEX` - list of ethereum addresses encoded in hex
@ -75,62 +159,65 @@ First level keys accounts, second level keys are tokens.
}
```
Signals
## Signals
-------
Two signals can be emitted:
1. `newblock` signal
Emitted when transfers from new block were added to the database. In this case block number if the number of this new block.
Client expected to request transfers starting from received block.
All events are of the same format:
```json
{
"type": "wallet",
"event": {
"type": "newblock",
"type": "event-type",
"blockNumber": 0,
"accounts": [
"0x42c8f505b4006d417dd4e0ba0e880692986adbd8",
"0x3129mdasmeo132128391fml1130410k312312mll"
]
],
"message": "something might be here"
}
}
```
2. `reorg` signal.
1. `new-transfers`
Emitted when part of blocks were removed. Starting from a given block number all transfers were removed.
Client expected to request new transfers from received block and replace transfers that were received previously.
Emitted when transfers are detected. In this case the block number is the block number of the latest found transfer.
The client is expected to request transfers starting from the received block.
```json
{
"type": "wallet",
"event": {
"type": "reorg",
"blockNumber": 0,
"accounts": [
"0x42c8f505b4006d417dd4e0ba0e880692986adbd8"
]
}
}
```
2. `recent-history-fetching`
3. `history` signal
Emitted when history scanning is started.
Emmited when historical transfers were downloaded. Block number will refer the first block where historical transfers
were found.
3. `recent-history-ready`
```json
{
"type": "wallet",
"event": {
"type": "history",
"blockNumber": 0,
"accounts": [
"0x42c8f505b4006d417dd4e0ba0e880692986adbd8"
]
}
}
```
Emitted when history scanning is ended.
4. `fetching-history-error`
Emitted when history can't be fetched because of some error. The error's description can be found in the `message` field.
5. `non-archival-node-detected`
Emitted when the application is connected to a non-archival node.
## Flows
### Account creation
When a new multiaccount is created, the corresponding address will not contain any transactions. Thus there is no point in checking history — it will be empty.
1. Call `wallet_setInitialBlocksRange`
2. Call `wallet_checkRecentHistory`
3. On `recent-history-ready` request transactions via `wallet_getTransfersByAddress`
4. Repeat `wallet_checkRecentHistory` in N minutes (currently 20 minutes in `status-react` for upstream RPC node. If a custom node is used interval can be arbitrary)
### Logging into application
1. Call `wallet_checkRecentHistory`
2. On `recent-history-ready` request transactions via `wallet_getTransfersByAddress`
3. Repeat `wallet_checkRecentHistory` in N minutes (currently 20 minutes in `status-react` for upstream RPC node. If a custom node is used interval can be arbitrary)
### Watching transaction
1. Call `wallet_watchTransaction`
2. On success call `wallet_checkRecentHistory`
3. On `recent-history-ready` request transactions via `wallet_getTransfersByAddress`

View File

@ -222,3 +222,19 @@ func (api *API) WatchTransaction(ctx context.Context, transactionHash common.Has
return watchTxCommand.Command()(commandContext)
}
func (api *API) CheckRecentHistory(ctx context.Context, addresses []common.Address) error {
if len(addresses) == 0 {
log.Info("no addresses provided")
return nil
}
err := api.s.MergeBlocksRanges(addresses, api.s.db.network)
if err != nil {
return err
}
return api.s.StartReactor(
api.s.client.client,
addresses,
new(big.Int).SetUint64(api.s.db.network))
}

View File

@ -2,7 +2,6 @@ package wallet
import (
"context"
"errors"
"math/big"
"strings"
"time"
@ -14,7 +13,6 @@ import (
)
var numberOfBlocksCheckedPerIteration = 40
var blocksDelayThreshhold = 40 * time.Second
type ethHistoricalCommand struct {
db *Database
@ -117,331 +115,6 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {
return nil
}
type newBlocksTransfersCommand struct {
db *Database
accounts []common.Address
chain *big.Int
erc20 *ERC20TransfersDownloader
eth *ETHTransferDownloader
client *walletClient
feed *event.Feed
lastFetchedBlockTime time.Time
initialFrom, from, to *DBHeader
}
func (c *newBlocksTransfersCommand) Command() Command {
// if both blocks are specified we will use this command to verify that lastly synced blocks are still
// in canonical chain
if c.to != nil && c.from != nil {
return FiniteCommand{
Interval: 5 * time.Second,
Runable: c.Verify,
}.Run
}
return InfiniteCommand{
Interval: pollingPeriodByChain(c.chain),
Runable: c.Run,
}.Run
}
func (c *newBlocksTransfersCommand) Verify(parent context.Context) (err error) {
if c.to == nil || c.from == nil {
return errors.New("`from` and `to` blocks must be specified")
}
for c.from.Number.Cmp(c.to.Number) != 0 {
err = c.Run(parent)
if err != nil {
return err
}
}
return nil
}
func (c *newBlocksTransfersCommand) getAllTransfers(parent context.Context, from, to uint64) (map[common.Address][]Transfer, error) {
transfersByAddress := map[common.Address][]Transfer{}
if to-from > reorgSafetyDepth(c.chain).Uint64() {
fromByAddress := map[common.Address]*LastKnownBlock{}
toByAddress := map[common.Address]*big.Int{}
for _, account := range c.accounts {
fromByAddress[account] = &LastKnownBlock{Number: new(big.Int).SetUint64(from)}
toByAddress[account] = new(big.Int).SetUint64(to)
}
balanceCache := newBalanceCache()
blocksCommand := &findAndCheckBlockRangeCommand{
accounts: c.accounts,
db: c.db,
chain: c.chain,
client: c.client,
balanceCache: balanceCache,
feed: c.feed,
fromByAddress: fromByAddress,
toByAddress: toByAddress,
noLimit: true,
}
if err := blocksCommand.Command()(parent); err != nil {
return nil, err
}
for address, headers := range blocksCommand.foundHeaders {
blocks := make([]*big.Int, len(headers))
for i, header := range headers {
blocks[i] = header.Number
}
txCommand := &loadTransfersCommand{
accounts: []common.Address{address},
db: c.db,
chain: c.chain,
client: c.erc20.client,
blocksByAddress: map[common.Address][]*big.Int{address: blocks},
}
err := txCommand.Command()(parent)
if err != nil {
return nil, err
}
transfersByAddress[address] = txCommand.foundTransfersByAddress[address]
}
} else {
all := []Transfer{}
newHeadersByAddress := map[common.Address][]*DBHeader{}
for n := from; n <= to; n++ {
ctx, cancel := context.WithTimeout(parent, 10*time.Second)
header, err := c.client.HeaderByNumber(ctx, big.NewInt(int64(n)))
cancel()
if err != nil {
return nil, err
}
dbHeader := toDBHeader(header)
log.Info("reactor get transfers", "block", dbHeader.Hash, "number", dbHeader.Number)
transfers, err := c.getTransfers(parent, dbHeader)
if err != nil {
log.Error("failed to get transfers", "header", dbHeader.Hash, "error", err)
return nil, err
}
if len(transfers) > 0 {
for _, transfer := range transfers {
headers, ok := newHeadersByAddress[transfer.Address]
if !ok {
headers = []*DBHeader{}
}
transfers, ok := transfersByAddress[transfer.Address]
if !ok {
transfers = []Transfer{}
}
transfersByAddress[transfer.Address] = append(transfers, transfer)
newHeadersByAddress[transfer.Address] = append(headers, dbHeader)
}
}
all = append(all, transfers...)
}
err := c.saveHeaders(parent, newHeadersByAddress)
if err != nil {
return nil, err
}
err = c.db.ProcessTranfers(all, nil)
if err != nil {
log.Error("failed to persist transfers", "error", err)
return nil, err
}
}
return transfersByAddress, nil
}
func (c *newBlocksTransfersCommand) saveHeaders(parent context.Context, newHeadersByAddress map[common.Address][]*DBHeader) (err error) {
for _, address := range c.accounts {
headers, ok := newHeadersByAddress[address]
if ok {
err = c.db.SaveBlocks(address, headers)
if err != nil {
log.Error("failed to persist blocks", "error", err)
return err
}
}
}
return nil
}
func (c *newBlocksTransfersCommand) checkDelay(parent context.Context, nextHeader *types.Header) (*types.Header, error) {
if time.Since(c.lastFetchedBlockTime) > blocksDelayThreshhold {
log.Info("There was a delay before loading next block", "time since previous successful fetching", time.Since(c.lastFetchedBlockTime))
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
latestHeader, err := c.client.HeaderByNumber(ctx, nil)
cancel()
if err != nil {
log.Warn("failed to get latest block", "number", nextHeader.Number, "error", err)
return nil, err
}
diff := new(big.Int).Sub(latestHeader.Number, nextHeader.Number)
if diff.Cmp(reorgSafetyDepth(c.chain)) >= 0 {
num := new(big.Int).Sub(latestHeader.Number, reorgSafetyDepth(c.chain))
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
nextHeader, err = c.client.HeaderByNumber(ctx, num)
cancel()
if err != nil {
log.Warn("failed to get next block", "number", num, "error", err)
return nil, err
}
}
}
return nextHeader, nil
}
func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) {
if c.from == nil {
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
from, err := c.client.HeaderByNumber(ctx, nil)
cancel()
if err != nil {
log.Error("failed to get last known header", "error", err)
return err
}
c.from = toDBHeader(from)
}
num := new(big.Int).Add(c.from.Number, one)
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
nextHeader, err := c.client.HeaderByNumber(ctx, num)
cancel()
if err != nil {
log.Warn("failed to get next block", "number", num, "error", err)
return err
}
log.Info("reactor received new block", "header", num)
nextHeader, err = c.checkDelay(parent, nextHeader)
if err != nil {
return err
}
ctx, cancel = context.WithTimeout(parent, 10*time.Second)
latestHeader, removed, latestValidSavedBlock, reorgSpotted, err := c.onNewBlock(ctx, c.from, nextHeader)
cancel()
if err != nil || latestHeader == nil {
log.Error("failed to process new header", "header", nextHeader, "error", err)
return err
}
err = c.db.ProcessTranfers(nil, removed)
if err != nil {
return err
}
latestHeader.Loaded = true
fromN := nextHeader.Number.Uint64()
if reorgSpotted {
if latestValidSavedBlock != nil {
fromN = latestValidSavedBlock.Number.Uint64()
}
if c.initialFrom != nil {
fromN = c.initialFrom.Number.Uint64()
}
}
toN := latestHeader.Number.Uint64()
all, err := c.getAllTransfers(parent, fromN, toN)
if err != nil {
return err
}
c.from = toDBHeader(nextHeader)
c.lastFetchedBlockTime = time.Now()
if len(removed) != 0 {
lth := len(removed)
c.feed.Send(Event{
Type: EventReorg,
BlockNumber: removed[lth-1].Number,
Accounts: uniqueAccountsFromHeaders(removed),
})
}
log.Info("before sending new block event", "removed", len(removed), "len", len(uniqueAccountsFromTransfers(all)))
c.feed.Send(Event{
Type: EventNewBlock,
BlockNumber: latestHeader.Number,
Accounts: uniqueAccountsFromTransfers(all),
NewTransactionsPerAccount: transfersPerAccount(all),
})
return nil
}
func (c *newBlocksTransfersCommand) onNewBlock(ctx context.Context, from *DBHeader, latest *types.Header) (lastestHeader *DBHeader, removed []*DBHeader, lastSavedValidHeader *DBHeader, reorgSpotted bool, err error) {
if from.Hash == latest.ParentHash {
// parent matching from node in the cache. on the same chain.
return toHead(latest), nil, nil, false, nil
}
lastSavedBlock, err := c.db.GetLastSavedBlock()
if err != nil {
return nil, nil, nil, false, err
}
if lastSavedBlock == nil {
return toHead(latest), nil, nil, true, nil
}
header, err := c.client.HeaderByNumber(ctx, lastSavedBlock.Number)
if err != nil {
return nil, nil, nil, false, err
}
if header.Hash() == lastSavedBlock.Hash {
return toHead(latest), nil, lastSavedBlock, true, nil
}
log.Debug("wallet reactor spotted reorg", "last header in db", from.Hash, "new parent", latest.ParentHash)
for lastSavedBlock != nil {
removed = append(removed, lastSavedBlock)
lastSavedBlock, err = c.db.GetLastSavedBlockBefore(lastSavedBlock.Number)
if err != nil {
return nil, nil, nil, false, err
}
if lastSavedBlock == nil {
continue
}
header, err := c.client.HeaderByNumber(ctx, lastSavedBlock.Number)
if err != nil {
return nil, nil, nil, false, err
}
// the last saved block is still valid
if header.Hash() == lastSavedBlock.Hash {
return toHead(latest), nil, lastSavedBlock, true, nil
}
}
return toHead(latest), removed, lastSavedBlock, true, nil
}
func (c *newBlocksTransfersCommand) getTransfers(parent context.Context, header *DBHeader) ([]Transfer, error) {
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
ethT, err := c.eth.GetTransfers(ctx, header)
cancel()
if err != nil {
return nil, err
}
ctx, cancel = context.WithTimeout(parent, 5*time.Second)
erc20T, err := c.erc20.GetTransfers(ctx, header)
cancel()
if err != nil {
return nil, err
}
return append(ethT, erc20T...), nil
}
// controlCommand implements the following procedure (the following parts are executed sequentially):
// - verifies that the last header that was synced is still in the canonical chain
// - runs fast indexing for each account separately
@ -454,8 +127,6 @@ type controlCommand struct {
chain *big.Int
client *walletClient
feed *event.Feed
safetyDepth *big.Int
watchNewBlocks bool
errorsCount int
nonArchivalRPCNode bool
}
@ -614,38 +285,6 @@ func (c *controlCommand) LoadTransfers(ctx context.Context, downloader *ETHTrans
return loadTransfers(ctx, c.accounts, c.db, c.client, c.chain, limit, make(map[common.Address][]*big.Int))
}
/*
// verifyLastSynced verifies that last header that was added to the database is still in the canonical chain.
// it is done by downloading configured number of parents for the last header in the db.
func (c *controlCommand) verifyLastSynced(parent context.Context, last *DBHeader, head *types.Header) error {
log.Debug("verifying that previous header is still in canonical chan", "from", last.Number, "chain head", head.Number)
if new(big.Int).Sub(head.Number, last.Number).Cmp(c.safetyDepth) <= 0 {
log.Debug("no need to verify. last block is close enough to chain head")
return nil
}
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
header, err := c.client.HeaderByNumber(ctx, new(big.Int).Add(last.Number, c.safetyDepth))
cancel()
if err != nil {
return err
}
log.Info("spawn reorg verifier", "from", last.Number, "to", header.Number)
// TODO(dshulyak) make a standalone command that
// doesn't manage transfers and has an upper limit
cmd := &newBlocksTransfersCommand{
db: c.db,
chain: c.chain,
client: c.client,
eth: c.eth,
erc20: c.erc20,
feed: c.feed,
from: last,
to: toDBHeader(header),
}
return cmd.Command()(parent)
}
*/
func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client *walletClient) (*big.Int, error) {
from := big.NewInt(0)
to := initialTo
@ -756,11 +395,7 @@ func (c *controlCommand) Run(parent context.Context) error {
}
}
target := new(big.Int).Sub(head.Number, c.safetyDepth)
if target.Cmp(zero) <= 0 {
target = zero
}
target := head.Number
fromByAddress := map[common.Address]*LastKnownBlock{}
toByAddress := map[common.Address]*big.Int{}
@ -819,15 +454,24 @@ func (c *controlCommand) Run(parent context.Context) error {
return err
}
events := map[common.Address]Event{}
for _, address := range c.accounts {
for _, header := range cmnd.foundHeaders[address] {
c.feed.Send(Event{
Type: EventNewBlock,
BlockNumber: header.Number,
Accounts: []common.Address{address},
NewTransactionsPerAccount: map[common.Address]int{address: 20},
})
event := Event{
Type: EventNewTransfers,
Accounts: []common.Address{address},
}
for _, header := range cmnd.foundHeaders[address] {
if event.BlockNumber == nil || header.Number.Cmp(event.BlockNumber) == 1 {
event.BlockNumber = header.Number
}
}
if event.BlockNumber != nil {
events[address] = event
}
}
for _, event := range events {
c.feed.Send(event)
}
c.feed.Send(Event{
@ -836,47 +480,6 @@ func (c *controlCommand) Run(parent context.Context) error {
BlockNumber: target,
})
if c.watchNewBlocks {
ctx, cancel = context.WithTimeout(parent, 3*time.Second)
head, err = c.client.HeaderByNumber(ctx, target)
cancel()
if err != nil {
if c.NewError(err) {
return nil
}
return err
}
log.Info("watching new blocks", "start from", target)
cmd := &newBlocksTransfersCommand{
db: c.db,
chain: c.chain,
client: c.client,
accounts: c.accounts,
eth: c.eth,
erc20: c.erc20,
feed: c.feed,
initialFrom: toDBHeader(head),
from: toDBHeader(head),
lastFetchedBlockTime: time.Now(),
}
err = cmd.Command()(parent)
if err != nil {
if c.NewError(err) {
return nil
}
log.Warn("error on running newBlocksTransfersCommand", "err", err)
return err
}
} else {
c.feed.Send(Event{
Type: EventNewBlock,
BlockNumber: target,
Accounts: []common.Address{},
})
}
log.Info("end control command")
return err
}
@ -913,47 +516,6 @@ func (c *controlCommand) Command() Command {
}.Run
}
func uniqueAccountsFromTransfers(allTransfers map[common.Address][]Transfer) []common.Address {
accounts := []common.Address{}
unique := map[common.Address]struct{}{}
for address, transfers := range allTransfers {
if len(transfers) == 0 {
continue
}
_, exist := unique[address]
if exist {
continue
}
unique[address] = struct{}{}
accounts = append(accounts, address)
}
return accounts
}
func transfersPerAccount(allTransfers map[common.Address][]Transfer) map[common.Address]int {
res := map[common.Address]int{}
for address, transfers := range allTransfers {
res[address] = len(transfers)
}
return res
}
func uniqueAccountsFromHeaders(headers []*DBHeader) []common.Address {
accounts := []common.Address{}
unique := map[common.Address]struct{}{}
for i := range headers {
_, exist := unique[headers[i].Address]
if exist {
continue
}
unique[headers[i].Address] = struct{}{}
accounts = append(accounts, headers[i].Address)
}
return accounts
}
type transfersCommand struct {
db *Database
eth *ETHTransferDownloader
@ -1110,12 +672,6 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error)
}
}
c.feed.Send(Event{
Type: EventMaxKnownBlock,
BlockNumber: maxBlockNumber,
Accounts: c.accounts,
})
c.foundHeaders = foundHeaders
return

View File

@ -1,252 +0,0 @@
package wallet
import (
"context"
"math/big"
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/t/devtests/testchain"
)
func TestNewBlocksSuite(t *testing.T) {
suite.Run(t, new(NewBlocksSuite))
}
type NewBlocksSuite struct {
suite.Suite
backend *testchain.Backend
cmd *newBlocksTransfersCommand
address common.Address
db *Database
dbStop func()
feed *event.Feed
}
func (s *NewBlocksSuite) SetupTest() {
var err error
db, stop := setupTestDB(s.Suite.T())
s.db = db
s.dbStop = stop
s.backend, err = testchain.NewBackend()
s.Require().NoError(err)
account, err := crypto.GenerateKey()
s.Require().NoError(err)
s.address = crypto.PubkeyToAddress(account.PublicKey)
s.feed = &event.Feed{}
client := &walletClient{client: s.backend.Client}
s.cmd = &newBlocksTransfersCommand{
db: s.db,
accounts: []common.Address{s.address},
erc20: NewERC20TransfersDownloader(client, []common.Address{s.address}, s.backend.Signer),
eth: &ETHTransferDownloader{
chain: s.backend.Chain,
client: client,
signer: s.backend.Signer,
db: s.db,
accounts: []common.Address{s.address},
},
feed: s.feed,
client: client,
chain: big.NewInt(1777),
}
}
func (s *NewBlocksSuite) TearDownTest() {
s.dbStop()
s.Require().NoError(s.backend.Stop())
}
func (s *NewBlocksSuite) TestOneBlock() {
ctx := context.Background()
s.Require().EqualError(s.cmd.Run(ctx), "not found")
tx := types.NewTransaction(0, s.address, big.NewInt(1e17), 21000, big.NewInt(1), nil)
tx, err := types.SignTx(tx, s.backend.Signer, s.backend.Faucet)
s.Require().NoError(err)
blocks := s.backend.GenerateBlocks(1, 0, func(n int, gen *core.BlockGen) {
gen.AddTx(tx)
})
n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks)
s.Require().Equal(1, n)
s.Require().NoError(err)
events := make(chan Event, 1)
sub := s.feed.Subscribe(events)
defer sub.Unsubscribe()
s.Require().NoError(s.cmd.Run(ctx))
select {
case ev := <-events:
s.Require().Equal(ev.Type, EventNewBlock)
s.Require().Equal(ev.BlockNumber, big.NewInt(1))
default:
s.Require().FailNow("event wasn't emitted")
}
transfers, err := s.db.GetTransfers(big.NewInt(0), nil)
s.Require().NoError(err)
s.Require().Len(transfers, 1)
s.Require().Equal(tx.Hash(), transfers[0].ID)
}
func (s *NewBlocksSuite) genTx(nonce int) *types.Transaction {
tx := types.NewTransaction(uint64(nonce), s.address, big.NewInt(1e10), 21000, big.NewInt(1), nil)
tx, err := types.SignTx(tx, s.backend.Signer, s.backend.Faucet)
s.Require().NoError(err)
return tx
}
func (s *NewBlocksSuite) runCmdUntilError(ctx context.Context) (err error) {
for err == nil {
err = s.cmd.Run(ctx)
}
return err
}
/*
func (s *NewBlocksSuite) TestReorg() {
blocks := s.backend.GenerateBlocks(20, 0, nil)
n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks)
s.Require().Equal(20, n)
s.Require().NoError(err)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
s.Require().EqualError(s.runCmdUntilError(ctx), "not found")
blocks = s.backend.GenerateBlocks(3, 20, func(n int, gen *core.BlockGen) {
gen.AddTx(s.genTx(n))
})
n, err = s.backend.Ethereum.BlockChain().InsertChain(blocks)
s.Require().Equal(3, n)
s.Require().NoError(err)
// `not found` returned when we query head+1 block
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
s.cmd.from = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(15))
s.cmd.initialFrom = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(15))
s.Require().EqualError(s.runCmdUntilError(ctx), "not found")
transfers, err := s.db.GetTransfers(big.NewInt(0), nil)
s.Require().NoError(err)
s.Require().Len(transfers, 3)
blocks = s.backend.GenerateBlocks(10, 15, func(n int, gen *core.BlockGen) {
gen.AddTx(s.genTx(n))
})
n, err = s.backend.Ethereum.BlockChain().InsertChain(blocks)
s.Require().Equal(10, n)
s.Require().NoError(err)
// it will be less but even if something went wrong we can't get more
events := make(chan Event, 10)
sub := s.feed.Subscribe(events)
defer sub.Unsubscribe()
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
s.Require().EqualError(s.runCmdUntilError(ctx), "not found")
close(events)
expected := []Event{
{Type: EventReorg, BlockNumber: big.NewInt(21)},
{Type: EventNewBlock, BlockNumber: big.NewInt(24)},
{Type: EventNewBlock, BlockNumber: big.NewInt(25)},
}
i := 0
for ev := range events {
s.Require().Equal(expected[i].Type, ev.Type)
s.Require().Equal(expected[i].BlockNumber, ev.BlockNumber)
i++
}
transfers, err = s.db.GetTransfers(nil, nil)
s.Require().NoError(err)
s.Require().Len(transfers, 10)
}
*/
// downloadHistorical mines 40 blocks containing two transfers from the test
// account (in blocks 36 and 39) and runs ethHistoricalCommand over the whole
// range, asserting that exactly the two transfer-carrying headers are found.
func (s *NewBlocksSuite) downloadHistorical() {
	blocks := s.backend.GenerateBlocks(40, 0, func(n int, gen *core.BlockGen) {
		// Only blocks 36 and 39 carry a transfer from the test account.
		switch n {
		case 36:
			gen.AddTx(s.genTx(0))
		case 39:
			gen.AddTx(s.genTx(1))
		}
	})
	n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks)
	s.Require().Equal(40, n)
	s.Require().NoError(err)
	client := &walletClient{client: s.backend.Client}
	nonce := int64(0)
	// Start syncing from block 1 with an empty balance and zero nonce.
	lastBlock := &LastKnownBlock{
		Number:  big.NewInt(1),
		Balance: big.NewInt(0),
		Nonce:   &nonce,
	}
	eth := &ethHistoricalCommand{
		db:           s.db,
		balanceCache: newBalanceCache(),
		eth: &ETHTransferDownloader{
			chain:    s.backend.Chain,
			client:   client,
			signer:   s.backend.Signer,
			accounts: []common.Address{s.address},
		},
		feed:    s.feed,
		address: s.address,
		client:  client,
		from:    lastBlock,
		to:      s.backend.Ethereum.BlockChain().CurrentBlock().Number(),
	}
	s.Require().NoError(eth.Run(context.Background()), "eth historical command failed to sync transfers")
	// Blocks 36 and 39 are the only ones with transfers for s.address.
	s.Require().Len(eth.foundHeaders, 2)
}
// reorgHistorical drives the control command to the chain head, then forks
// the chain at block 35 with 10 fresh blocks and drives the command to the
// new head again, exercising the reorg path.
func (s *NewBlocksSuite) reorgHistorical() {
	// runToHead executes the command until it queries head+1, which the
	// backend reports as "not found".
	runToHead := func() {
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer cancel()
		s.Require().EqualError(s.runCmdUntilError(ctx), "not found")
	}
	runToHead()
	// Replace everything after block 35 with a longer fork to force a reorg.
	forked := s.backend.GenerateBlocks(10, 35, nil)
	inserted, err := s.backend.Ethereum.BlockChain().InsertChain(forked)
	s.Require().Equal(10, inserted)
	s.Require().NoError(err)
	runToHead()
}
// TestSafetyBufferFailure syncs historical transfers and then reorgs the
// chain; with the command's default starting point the reorg is expected to
// fall outside the safety buffer (contrast with TestSafetyBufferSuccess,
// which rewinds the starting header first).
func (s *NewBlocksSuite) TestSafetyBufferFailure() {
	s.downloadHistorical()
	s.reorgHistorical()
}
// TestSafetyBufferSuccess syncs historical transfers, rewinds the command's
// starting header to 10 blocks behind the head so the subsequent reorg stays
// within the safety buffer, and verifies that no transfers from the
// abandoned branch survive in the database.
func (s *NewBlocksSuite) TestSafetyBufferSuccess() {
	s.downloadHistorical()
	chain := s.backend.Ethereum.BlockChain()
	// Restart the command 10 blocks behind the current head.
	start := new(big.Int).Sub(chain.CurrentHeader().Number, big.NewInt(10))
	s.cmd.from = toDBHeader(chain.GetHeaderByNumber(start.Uint64()))
	s.reorgHistorical()
	// The reorg must have purged every transfer from the replaced branch.
	all, err := s.db.GetTransfers(big.NewInt(0), nil)
	s.Require().NoError(err)
	s.Require().Len(all, 0)
}

View File

@ -36,12 +36,6 @@ func toDBHeader(header *types.Header) *DBHeader {
}
}
func toHead(header *types.Header) *DBHeader {
dbheader := toDBHeader(header)
dbheader.Head = true
return dbheader
}
// SyncOption is used to specify that application processed transfers for that block.
type SyncOption uint

View File

@ -10,18 +10,13 @@ import (
type EventType string
const (
// EventNewBlock emitted when new block was added to the same canonical chan.
EventNewBlock EventType = "newblock"
EventMaxKnownBlock EventType = "maxKnownBlock"
// EventReorg emitted when canonical chain was changed. In this case, BlockNumber will be an earliest added block.
EventReorg EventType = "reorg"
// EventNewHistory emitted if transfer from older block was added.
EventNewHistory EventType = "history"
// EventNewTransfers emitted when new block was added to the same canonical chain.
EventNewTransfers EventType = "new-transfers"
// EventFetchingRecentHistory emitted when fetching of latest tx history is started
EventFetchingRecentHistory EventType = "recent-history-fetching"
// EventRecentHistoryFetched emitted when fetching of latest tx history is started
// EventRecentHistoryReady emitted when fetching of latest tx history is started
EventRecentHistoryReady EventType = "recent-history-ready"
// EventRecentHistoryError emitted when fetching of tx history failed
// EventFetchingHistoryError emitted when fetching of tx history failed
EventFetchingHistoryError EventType = "fetching-history-error"
// EventNonArchivalNodeDetected emitted when a connection to a non archival node is detected
EventNonArchivalNodeDetected EventType = "non-archival-node-detected"
@ -29,10 +24,8 @@ const (
// Event is a type for wallet events.
type Event struct {
Type EventType `json:"type"`
BlockNumber *big.Int `json:"blockNumber"`
Accounts []common.Address `json:"accounts"`
NewTransactionsPerAccount map[common.Address]int `json:"newTransactions"`
ERC20 bool `json:"erc20"`
Message string `json:"message"`
Type EventType `json:"type"`
BlockNumber *big.Int `json:"blockNumber"`
Accounts []common.Address `json:"accounts"`
Message string `json:"message"`
}

View File

@ -5,7 +5,6 @@ import (
"errors"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
@ -13,28 +12,9 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/rpcstats"
)
// pow block on main chain is mined once per ~14 seconds
// but for tests we are using clique chain with immediate block signer
// hence we can use different polling periods for methods that depend on mining time.
func pollingPeriodByChain(chain *big.Int) time.Duration {
switch chain.Int64() {
case int64(params.MainNetworkID):
return 10 * time.Second
case int64(params.RopstenNetworkID):
return 4 * time.Second
default:
return 500 * time.Millisecond
}
}
func reorgSafetyDepth(chain *big.Int) *big.Int {
return big.NewInt(0)
}
var (
erc20BatchSize = big.NewInt(100000)
errAlreadyRunning = errors.New("already running")
@ -113,23 +93,21 @@ func (rc *walletClient) CallContract(ctx context.Context, call ethereum.CallMsg,
}
// NewReactor creates instance of the Reactor.
func NewReactor(db *Database, feed *event.Feed, client *ethclient.Client, chain *big.Int, watchNewBlocks bool) *Reactor {
func NewReactor(db *Database, feed *event.Feed, client *ethclient.Client, chain *big.Int) *Reactor {
return &Reactor{
db: db,
client: client,
feed: feed,
chain: chain,
watchNewBlocks: watchNewBlocks,
db: db,
client: client,
feed: feed,
chain: chain,
}
}
// Reactor listens to new blocks and stores transfers into the database.
type Reactor struct {
client *ethclient.Client
db *Database
feed *event.Feed
chain *big.Int
watchNewBlocks bool
client *ethclient.Client
db *Database
feed *event.Feed
chain *big.Int
mu sync.Mutex
group *Group
@ -150,11 +128,9 @@ func (r *Reactor) newControlCommand(accounts []common.Address) *controlCommand {
signer: signer,
db: r.db,
},
erc20: NewERC20TransfersDownloader(client, accounts, signer),
feed: r.feed,
safetyDepth: reorgSafetyDepth(r.chain),
watchNewBlocks: r.watchNewBlocks,
errorsCount: 0,
erc20: NewERC20TransfersDownloader(client, accounts, signer),
feed: r.feed,
errorsCount: 0,
}
return ctl

View File

@ -71,8 +71,8 @@ func (s *Service) MergeBlocksRanges(accounts []common.Address, chain uint64) err
}
// StartReactor separately because it requires known ethereum address, which will become available only after login.
func (s *Service) StartReactor(client *ethclient.Client, accounts []common.Address, chain *big.Int, watchNewBlocks bool) error {
reactor := NewReactor(s.db, s.feed, client, chain, watchNewBlocks)
func (s *Service) StartReactor(client *ethclient.Client, accounts []common.Address, chain *big.Int) error {
reactor := NewReactor(s.db, s.feed, client, chain)
err := reactor.Start(accounts)
if err != nil {
return err

View File

@ -54,7 +54,7 @@ func (s *ReactorChangesSuite) SetupTest() {
s.backend, err = testchain.NewBackend()
s.Require().NoError(err)
s.feed = &event.Feed{}
s.reactor = NewReactor(s.db, &event.Feed{}, s.backend.Client, big.NewInt(1337), true)
s.reactor = NewReactor(s.db, &event.Feed{}, s.backend.Client, big.NewInt(1337))
account, err := crypto.GenerateKey()
s.Require().NoError(err)
s.first = crypto.PubkeyToAddress(account.PublicKey)
@ -123,13 +123,13 @@ func TestServiceStartStop(t *testing.T) {
account, err := crypto.GenerateKey()
require.NoError(t, err)
err = s.StartReactor(backend.Client, []common.Address{crypto.PubkeyToAddress(account.PublicKey)}, big.NewInt(1337), true)
err = s.StartReactor(backend.Client, []common.Address{crypto.PubkeyToAddress(account.PublicKey)}, big.NewInt(1337))
require.NoError(t, err)
require.NoError(t, s.Stop())
require.NoError(t, s.Start(nil))
err = s.StartReactor(backend.Client, []common.Address{crypto.PubkeyToAddress(account.PublicKey)}, big.NewInt(1337), true)
err = s.StartReactor(backend.Client, []common.Address{crypto.PubkeyToAddress(account.PublicKey)}, big.NewInt(1337))
require.NoError(t, err)
require.NoError(t, s.Stop())
}