From f3ee49c1100ad78462af0e74fb53c232f260941a Mon Sep 17 00:00:00 2001 From: Roman Volosovskyi Date: Thu, 1 Apr 2021 12:04:47 +0300 Subject: [PATCH] [wallet] cleanup --- api/geth_backend.go | 96 ---- mobile/status.go | 12 - services/local-notifications/core_test.go | 4 +- services/local-notifications/transaction.go | 24 +- services/wallet/README.md | 193 +++++--- services/wallet/api.go | 16 + services/wallet/commands.go | 478 +------------------- services/wallet/commands_test.go | 252 ----------- services/wallet/database.go | 6 - services/wallet/events.go | 23 +- services/wallet/reactor.go | 48 +- services/wallet/service.go | 4 +- services/wallet/service_test.go | 6 +- 13 files changed, 210 insertions(+), 952 deletions(-) delete mode 100644 services/wallet/commands_test.go diff --git a/api/geth_backend.go b/api/geth_backend.go index 4dcca110b..6e261d014 100644 --- a/api/geth_backend.go +++ b/api/geth_backend.go @@ -89,7 +89,6 @@ type GethStatusBackend struct { selectedAccountKeyID string log log.Logger allowAllRPC bool // used only for tests, disables api method restrictions - forceStopWallet bool } // NewGethStatusBackend create a new GethStatusBackend instance @@ -968,50 +967,6 @@ func (b *GethStatusBackend) AppStateChange(state string) { // and normal mode if the app is in foreground. } -func (b *GethStatusBackend) StopWallet() error { - wallet, err := b.statusNode.WalletService() - if err != nil { - b.log.Error("Retrieving of wallet service failed on StopWallet", "error", err) - return nil - } - if wallet.IsStarted() { - err = wallet.Stop() - if err != nil { - b.log.Error("Wallet service stop failed on StopWallet", "error", err) - return nil - } - } - - b.forceStopWallet = true - - return nil -} - -func (b *GethStatusBackend) StartWallet(watchNewBlocks bool) error { - wallet, err := b.statusNode.WalletService() - if err != nil { - b.log.Error("Retrieving of wallet service failed on StartWallet", "error", err) - return nil - } - if !wallet.IsStarted() { - err = wallet.Start(b.statusNode.Server()) - if err != nil { - b.log.Error("Wallet service start failed on StartWallet", "error", err) - return nil - } - - err = b.startWallet(watchNewBlocks) - if err != nil { - b.log.Error("Wallet reactor start failed on StartWallet", "error", err) - return nil - } - } - - b.forceStopWallet = false - - return nil -} - func (b *GethStatusBackend) StopLocalNotifications() error { localPN, err := b.statusNode.LocalNotificationsService() if err != nil { @@ -1044,11 +999,6 @@ func (b *GethStatusBackend) StartLocalNotifications() error { return nil } - if !wallet.IsStarted() { - b.log.Error("Can't start local notifications service without wallet service") - return nil - } - if !localPN.IsStarted() { err = localPN.Start(b.statusNode.Server()) @@ -1244,52 +1194,6 @@ func (b *GethStatusBackend) injectAccountsIntoServices() error { return nil } -func (b *GethStatusBackend) startWallet(watchNewBlocks bool) error { - if !b.statusNode.Config().WalletConfig.Enabled { - return nil - } - - wallet, err := b.statusNode.WalletService() - if err != nil { - return err - } - - accountsDB := accounts.NewDB(b.appDB) - watchAddresses, err := accountsDB.GetWalletAddresses() - if err != nil { - return err - } - - mainAccountAddress, err := b.accountManager.MainAccountAddress() - if err != nil { - return err - } - - uniqAddressesMap := map[common.Address]struct{}{} - allAddresses := []common.Address{} - mainAddress := common.Address(mainAccountAddress) - uniqAddressesMap[mainAddress] = struct{}{} - allAddresses = append(allAddresses, mainAddress) - for _, addr := range watchAddresses { - address := common.Address(addr) - if _, ok := uniqAddressesMap[address]; !ok { - uniqAddressesMap[address] = struct{}{} - allAddresses = append(allAddresses, address) - } - } - - err = wallet.MergeBlocksRanges(allAddresses, b.statusNode.Config().NetworkID) - if err != nil { - return err - } - - return wallet.StartReactor( - b.statusNode.RPCClient().Ethclient(), - allAddresses, - new(big.Int).SetUint64(b.statusNode.Config().NetworkID), - watchNewBlocks) -} - func appendIf(condition bool, services []gethnode.ServiceConstructor, service gethnode.ServiceConstructor) []gethnode.ServiceConstructor { if !condition { return services diff --git a/mobile/status.go b/mobile/status.go index 6a39ea796..3c62d4e89 100644 --- a/mobile/status.go +++ b/mobile/status.go @@ -511,18 +511,6 @@ func AppStateChange(state string) { statusBackend.AppStateChange(state) } -// StopWallet -func StopWallet() string { - err := statusBackend.StopWallet() - return makeJSONResponse(err) -} - -// StartWallet -func StartWallet(watchNewBlocks bool) string { - err := statusBackend.StartWallet(watchNewBlocks) - return makeJSONResponse(err) -} - // StartLocalNotifications func StartLocalNotifications() string { err := statusBackend.StartLocalNotifications() diff --git a/services/local-notifications/core_test.go b/services/local-notifications/core_test.go index 8c4b896ce..ad3a1840c 100644 --- a/services/local-notifications/core_test.go +++ b/services/local-notifications/core_test.go @@ -113,13 +113,13 @@ func TestTransactionNotification(t *testing.T) { require.NoError(t, walletDb.ProcessTranfers(transfers, []*wallet.DBHeader{})) feed.Send(wallet.Event{ - Type: wallet.EventMaxKnownBlock, + Type: wallet.EventRecentHistoryReady, BlockNumber: big.NewInt(0), Accounts: []common.Address{header.Address}, }) feed.Send(wallet.Event{ - Type: wallet.EventNewBlock, + Type: wallet.EventNewTransfers, BlockNumber: header.Number, Accounts: []common.Address{header.Address}, }) diff --git a/services/local-notifications/transaction.go b/services/local-notifications/transaction.go index 5fa59ac97..7550794d6 100644 --- a/services/local-notifications/transaction.go +++ b/services/local-notifications/transaction.go @@ -26,12 +26,10 @@ const ( // TransactionEvent - structure used to pass messages from wallet to bus type TransactionEvent struct { - Type string `json:"type"` - BlockNumber *big.Int `json:"block-number"` - Accounts []common.Address `json:"accounts"` - NewTransactionsPerAccount map[common.Address]int `json:"new-transactions"` - ERC20 bool `json:"erc20"` - MaxKnownBlocks map[common.Address]*big.Int `json:"max-known-blocks"` + Type string `json:"type"` + BlockNumber *big.Int `json:"block-number"` + Accounts []common.Address `json:"accounts"` + MaxKnownBlocks map[common.Address]*big.Int `json:"max-known-blocks"` } type transactionBody struct { @@ -181,7 +179,7 @@ func (s *Service) StartWalletWatcher() { } return case event := <-events: - if event.Type == wallet.EventNewBlock && len(maxKnownBlocks) > 0 { + if event.Type == wallet.EventNewTransfers && len(maxKnownBlocks) > 0 { newBlocks := false for _, address := range event.Accounts { if _, ok := maxKnownBlocks[address]; !ok { @@ -194,15 +192,13 @@ func (s *Service) StartWalletWatcher() { } if newBlocks && s.WatchingEnabled { s.transmitter.publisher.Send(TransactionEvent{ - Type: string(event.Type), - BlockNumber: event.BlockNumber, - Accounts: event.Accounts, - NewTransactionsPerAccount: event.NewTransactionsPerAccount, - ERC20: event.ERC20, - MaxKnownBlocks: maxKnownBlocks, + Type: string(event.Type), + BlockNumber: event.BlockNumber, + Accounts: event.Accounts, + MaxKnownBlocks: maxKnownBlocks, }) } - } else if event.Type == wallet.EventMaxKnownBlock { + } else if event.Type == wallet.EventRecentHistoryReady { for _, address := range event.Accounts { if _, ok := maxKnownBlocks[address]; !ok { maxKnownBlocks[address] = event.BlockNumber diff --git a/services/wallet/README.md b/services/wallet/README.md index 914ad5364..69814471c 100644 --- a/services/wallet/README.md +++ b/services/wallet/README.md @@ -1,8 +1,6 @@ -Wallet -========== +# Wallet service API -Wallet service starts a loop that watches for new transfers (eth and erc20). -To correctly start the service two values need to be changed in the config: +Wallet service provides RPC API for checking transfers history and other methods related to wallet functionality. To enable service two values need to be changed in the config: 1. Set Enable to true in WalletConfig @@ -22,34 +20,120 @@ To correctly start the service two values need to be changed in the config: } ``` -API ----------- +## API -#### wallet_getTransfersByAddress +### wallet_getTransfersByAddress Returns avaiable transfers in a given range. -##### Parameters +#### Parameters - `address`: `HEX` - ethereum address encoded in hex - `toBlock`: `BIGINT` - end of the range. if nil query will return last transfers. - `limit`: `BIGINT` - limit of returned transfers. +- `fetchMore`: `BOOLEAN` - if `true`, there are less than `limit` fetched transfers in the database, and zero block is not reached yet, history will be scanned for more transfers. If `false` only transfers which are already fetched to the app's database will be returned. -##### Examples +#### Examples ```json -{"jsonrpc":"2.0","id":7,"method":"wallet_getTransfersByAddress","params":["0xb81a6845649fa8c042dfaceb3f7a684873406993","0x0","0x5"]} +{ + "jsonrpc":"2.0", + "id":7, + "method":"wallet_getTransfersByAddress", + "params":[ + "0xb81a6845649fa8c042dfaceb3f7a684873406993", + "0x0", + "0x5", + true + ] +} ``` -##### Returns +#### Returns -Objects in the same format. +```json +[ + { + "id":"0xb1a8adeaa0e6727bf01d6d8431b6238bdefa915e19ae7e8ceb16886c9f5e", + "type":"eth", + "address":"0xd65f3cb52605a54a833ae118fb13", + "blockNumber":"0xb7190", + "blockhash":"0x8d98aa2297fe322d0093b24372e2ead98414959093b479baf670", + "timestamp":"0x6048ec6", + "gasPrice":"0x346308a00", + "gasLimit":"0x508", + "gasUsed":"0x520", + "nonce":"0x13", + "txStatus":"0x1", + "input":"0x", + "txHash":"0x1adeaa0e672d7e67bf01d8431b6238bdef15e19ae7e8ceb16886c", + "value":"0x1", + "from":"0x2f865fb5dfdf0dfdf54a833ae118fb1363aaasd", + "to":"0xaaaaaaf3cb52605a54a833ae118fb1363a123123", + "contract":"0x0000000000000000000000000000000000000000", + "NetworkID":1 + },... +] +``` -#### wallet_getTokensBalances +### wallet_setInitialBlocksRange + +Sets `zero block - latest block` range as scanned for an account. It is used when a new multiaccount is generated to avoid scanning transfers history. + +#### Example + +```json +{"jsonrpc":"2.0","id":7,"method":"wallet_setInitialBlocksRange","params":[]} +``` + +### wallet_watchTransaction + +Starts watching for transaction confirmation/rejection. If transaction was not confirmed/rejected in 10 minutes the call is timed out with error. + +#### Parameters + +- `tx-id`: `HEX` - transaction hash + +#### Example + +```json +{ + "jsonrpc":"2.0", + "id":7, + "method":"wallet_watchTransaction", + "params":[ + "0xaaaaaaaa11111112222233333333" + ] +} +``` + +### wallet_checkRecentHistory + +#### Parameters + +- `addresses`: `[]HEX` - array of addresses to be checked + +#### Example + +```json +{ + "jsonrpc":"2.0", + "id":1, + "method":"wallet_checkRecentHistory", + "params":[ + [ + "0x23458d65f3cB52605a54AaA833ae118fb1111aaa", + "0x24568B4166D11aaa1194097C60Cdc714F7e11111" + ] + ] +} +``` + +### wallet_getTokensBalances Returns tokens balances mapping for every account. See section below for the response example. -##### Parameters +#### Parameters - `accounts` `HEX` - list of ethereum addresses encoded in hex - `tokens` `HEX` - list of ethereum addresses encoded in hex @@ -75,62 +159,65 @@ First level keys accounts, second level keys are tokens. } ``` -Signals + +## Signals ------- -Two signals can be emitted: - -1. `newblock` signal - -Emitted when transfers from new block were added to the database. In this case block number if the number of this new block. -Client expected to request transfers starting from received block. +All events are of the same format: ```json { "type": "wallet", "event": { - "type": "newblock", + "type": "event-type", "blockNumber": 0, "accounts": [ "0x42c8f505b4006d417dd4e0ba0e880692986adbd8", "0x3129mdasmeo132128391fml1130410k312312mll" - ] + ], + "message": "something might be here" } } ``` -2. `reorg` signal. +1. `new-transfers` -Emitted when part of blocks were removed. Starting from a given block number all transfers were removed. -Client expected to request new transfers from received block and replace transfers that were received previously. +Emitted when transfers are detected. In this case block number is a block number of the latest found transfer. +Client expected to request transfers starting from received block. -```json -{ - "type": "wallet", - "event": { - "type": "reorg", - "blockNumber": 0, - "accounts": [ - "0x42c8f505b4006d417dd4e0ba0e880692986adbd8" - ] - } -} -``` +2. `recent-history-fetching` -3. `history` signal +Emitted when history scanning is started. -Emmited when historical transfers were downloaded. Block number will refer the first block where historical transfers -were found. +3. `recent-history-ready` -```json -{ - "type": "wallet", - "event": { - "type": "history", - "blockNumber": 0, - "accounts": [ - "0x42c8f505b4006d417dd4e0ba0e880692986adbd8" - ] - } -} -``` +Emitted when history scanning is ended. + +4. `fetching-history-error` + +Emitted when when history can't be fetched because some error. Error's decritption can be found in `message` field. + +5. `non-archival-node-detected` + +Emitted when the application is connected to a non-archival node. + +## Flows + +### Account creation + +When a new multiaccount is created corresponding address will not contain any transaction. Thus no point in checking history, it will be empty. + +1. Call `wallet_setInitialRange` +2. Call `wallet_checkRecentHistory` +3. On `recent-history-ready` request transactions via `wallet_getTransfersByAddress` +4. Repeat `wallet_checkRecentHistory` in N minutes (currently 20 minutes in `status-react` for upstream RPC node. If a custom node is used interval can be arbitrary) + +### Logging into application +1. Call `wallet_checkRecentHistory` +2. On `recent-history-ready` request transactions via `wallet_getTransfersByAddress` +3. Repeat `wallet_checkRecentHistory` in N minutes (currently 20 minutes in `status-react` for upstream RPC node. If a custom node is used interval can be arbitrary) + +### Watching transaction +1. Call `wallet_watchTransaction` +2. On success call `wallet_checkRecentHistory` +3. On `recent-history-ready` request transactions via `wallet_getTransfersByAddress` diff --git a/services/wallet/api.go b/services/wallet/api.go index bbe2f8b5c..040323436 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -222,3 +222,19 @@ func (api *API) WatchTransaction(ctx context.Context, transactionHash common.Has return watchTxCommand.Command()(commandContext) } + +func (api *API) CheckRecentHistory(ctx context.Context, addresses []common.Address) error { + if len(addresses) == 0 { + log.Info("no addresses provided") + return nil + } + err := api.s.MergeBlocksRanges(addresses, api.s.db.network) + if err != nil { + return err + } + + return api.s.StartReactor( + api.s.client.client, + addresses, + new(big.Int).SetUint64(api.s.db.network)) +} diff --git a/services/wallet/commands.go b/services/wallet/commands.go index 0a706d24c..4604ec924 100644 --- a/services/wallet/commands.go +++ b/services/wallet/commands.go @@ -2,7 +2,6 @@ package wallet import ( "context" - "errors" "math/big" "strings" "time" @@ -14,7 +13,6 @@ import ( ) var numberOfBlocksCheckedPerIteration = 40 -var blocksDelayThreshhold = 40 * time.Second type ethHistoricalCommand struct { db *Database @@ -117,331 +115,6 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { return nil } -type newBlocksTransfersCommand struct { - db *Database - accounts []common.Address - chain *big.Int - erc20 *ERC20TransfersDownloader - eth *ETHTransferDownloader - client *walletClient - feed *event.Feed - lastFetchedBlockTime time.Time - - initialFrom, from, to *DBHeader -} - -func (c *newBlocksTransfersCommand) Command() Command { - // if both blocks are specified we will use this command to verify that lastly synced blocks are still - // in canonical chain - if c.to != nil && c.from != nil { - return FiniteCommand{ - Interval: 5 * time.Second, - Runable: c.Verify, - }.Run - } - return InfiniteCommand{ - Interval: pollingPeriodByChain(c.chain), - Runable: c.Run, - }.Run -} - -func (c *newBlocksTransfersCommand) Verify(parent context.Context) (err error) { - if c.to == nil || c.from == nil { - return errors.New("`from` and `to` blocks must be specified") - } - for c.from.Number.Cmp(c.to.Number) != 0 { - err = c.Run(parent) - if err != nil { - return err - } - } - return nil -} - -func (c *newBlocksTransfersCommand) getAllTransfers(parent context.Context, from, to uint64) (map[common.Address][]Transfer, error) { - transfersByAddress := map[common.Address][]Transfer{} - if to-from > reorgSafetyDepth(c.chain).Uint64() { - fromByAddress := map[common.Address]*LastKnownBlock{} - toByAddress := map[common.Address]*big.Int{} - for _, account := range c.accounts { - fromByAddress[account] = &LastKnownBlock{Number: new(big.Int).SetUint64(from)} - toByAddress[account] = new(big.Int).SetUint64(to) - } - - balanceCache := newBalanceCache() - blocksCommand := &findAndCheckBlockRangeCommand{ - accounts: c.accounts, - db: c.db, - chain: c.chain, - client: c.client, - balanceCache: balanceCache, - feed: c.feed, - fromByAddress: fromByAddress, - toByAddress: toByAddress, - noLimit: true, - } - - if err := blocksCommand.Command()(parent); err != nil { - return nil, err - } - - for address, headers := range blocksCommand.foundHeaders { - blocks := make([]*big.Int, len(headers)) - for i, header := range headers { - blocks[i] = header.Number - } - txCommand := &loadTransfersCommand{ - accounts: []common.Address{address}, - db: c.db, - chain: c.chain, - client: c.erc20.client, - blocksByAddress: map[common.Address][]*big.Int{address: blocks}, - } - - err := txCommand.Command()(parent) - if err != nil { - return nil, err - } - - transfersByAddress[address] = txCommand.foundTransfersByAddress[address] - } - } else { - all := []Transfer{} - newHeadersByAddress := map[common.Address][]*DBHeader{} - for n := from; n <= to; n++ { - ctx, cancel := context.WithTimeout(parent, 10*time.Second) - header, err := c.client.HeaderByNumber(ctx, big.NewInt(int64(n))) - cancel() - if err != nil { - return nil, err - } - dbHeader := toDBHeader(header) - log.Info("reactor get transfers", "block", dbHeader.Hash, "number", dbHeader.Number) - transfers, err := c.getTransfers(parent, dbHeader) - if err != nil { - log.Error("failed to get transfers", "header", dbHeader.Hash, "error", err) - return nil, err - } - if len(transfers) > 0 { - for _, transfer := range transfers { - headers, ok := newHeadersByAddress[transfer.Address] - if !ok { - headers = []*DBHeader{} - } - - transfers, ok := transfersByAddress[transfer.Address] - if !ok { - transfers = []Transfer{} - } - transfersByAddress[transfer.Address] = append(transfers, transfer) - newHeadersByAddress[transfer.Address] = append(headers, dbHeader) - } - } - all = append(all, transfers...) - } - - err := c.saveHeaders(parent, newHeadersByAddress) - if err != nil { - return nil, err - } - - err = c.db.ProcessTranfers(all, nil) - if err != nil { - log.Error("failed to persist transfers", "error", err) - return nil, err - } - } - - return transfersByAddress, nil -} - -func (c *newBlocksTransfersCommand) saveHeaders(parent context.Context, newHeadersByAddress map[common.Address][]*DBHeader) (err error) { - for _, address := range c.accounts { - headers, ok := newHeadersByAddress[address] - if ok { - err = c.db.SaveBlocks(address, headers) - if err != nil { - log.Error("failed to persist blocks", "error", err) - return err - } - } - } - - return nil -} - -func (c *newBlocksTransfersCommand) checkDelay(parent context.Context, nextHeader *types.Header) (*types.Header, error) { - if time.Since(c.lastFetchedBlockTime) > blocksDelayThreshhold { - log.Info("There was a delay before loading next block", "time since previous successful fetching", time.Since(c.lastFetchedBlockTime)) - ctx, cancel := context.WithTimeout(parent, 5*time.Second) - latestHeader, err := c.client.HeaderByNumber(ctx, nil) - cancel() - if err != nil { - log.Warn("failed to get latest block", "number", nextHeader.Number, "error", err) - return nil, err - } - diff := new(big.Int).Sub(latestHeader.Number, nextHeader.Number) - if diff.Cmp(reorgSafetyDepth(c.chain)) >= 0 { - num := new(big.Int).Sub(latestHeader.Number, reorgSafetyDepth(c.chain)) - ctx, cancel := context.WithTimeout(parent, 5*time.Second) - nextHeader, err = c.client.HeaderByNumber(ctx, num) - cancel() - if err != nil { - log.Warn("failed to get next block", "number", num, "error", err) - return nil, err - } - } - } - - return nextHeader, nil -} - -func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { - if c.from == nil { - ctx, cancel := context.WithTimeout(parent, 3*time.Second) - from, err := c.client.HeaderByNumber(ctx, nil) - cancel() - if err != nil { - log.Error("failed to get last known header", "error", err) - return err - } - c.from = toDBHeader(from) - } - num := new(big.Int).Add(c.from.Number, one) - ctx, cancel := context.WithTimeout(parent, 5*time.Second) - nextHeader, err := c.client.HeaderByNumber(ctx, num) - cancel() - if err != nil { - log.Warn("failed to get next block", "number", num, "error", err) - return err - } - log.Info("reactor received new block", "header", num) - - nextHeader, err = c.checkDelay(parent, nextHeader) - if err != nil { - return err - } - - ctx, cancel = context.WithTimeout(parent, 10*time.Second) - latestHeader, removed, latestValidSavedBlock, reorgSpotted, err := c.onNewBlock(ctx, c.from, nextHeader) - cancel() - if err != nil || latestHeader == nil { - log.Error("failed to process new header", "header", nextHeader, "error", err) - return err - } - - err = c.db.ProcessTranfers(nil, removed) - if err != nil { - return err - } - - latestHeader.Loaded = true - - fromN := nextHeader.Number.Uint64() - - if reorgSpotted { - if latestValidSavedBlock != nil { - fromN = latestValidSavedBlock.Number.Uint64() - } - if c.initialFrom != nil { - fromN = c.initialFrom.Number.Uint64() - } - } - toN := latestHeader.Number.Uint64() - all, err := c.getAllTransfers(parent, fromN, toN) - if err != nil { - return err - } - - c.from = toDBHeader(nextHeader) - c.lastFetchedBlockTime = time.Now() - if len(removed) != 0 { - lth := len(removed) - c.feed.Send(Event{ - Type: EventReorg, - BlockNumber: removed[lth-1].Number, - Accounts: uniqueAccountsFromHeaders(removed), - }) - } - log.Info("before sending new block event", "removed", len(removed), "len", len(uniqueAccountsFromTransfers(all))) - - c.feed.Send(Event{ - Type: EventNewBlock, - BlockNumber: latestHeader.Number, - Accounts: uniqueAccountsFromTransfers(all), - NewTransactionsPerAccount: transfersPerAccount(all), - }) - - return nil -} - -func (c *newBlocksTransfersCommand) onNewBlock(ctx context.Context, from *DBHeader, latest *types.Header) (lastestHeader *DBHeader, removed []*DBHeader, lastSavedValidHeader *DBHeader, reorgSpotted bool, err error) { - if from.Hash == latest.ParentHash { - // parent matching from node in the cache. on the same chain. - return toHead(latest), nil, nil, false, nil - } - - lastSavedBlock, err := c.db.GetLastSavedBlock() - if err != nil { - return nil, nil, nil, false, err - } - - if lastSavedBlock == nil { - return toHead(latest), nil, nil, true, nil - } - - header, err := c.client.HeaderByNumber(ctx, lastSavedBlock.Number) - if err != nil { - return nil, nil, nil, false, err - } - - if header.Hash() == lastSavedBlock.Hash { - return toHead(latest), nil, lastSavedBlock, true, nil - } - - log.Debug("wallet reactor spotted reorg", "last header in db", from.Hash, "new parent", latest.ParentHash) - for lastSavedBlock != nil { - removed = append(removed, lastSavedBlock) - lastSavedBlock, err = c.db.GetLastSavedBlockBefore(lastSavedBlock.Number) - - if err != nil { - return nil, nil, nil, false, err - } - - if lastSavedBlock == nil { - continue - } - - header, err := c.client.HeaderByNumber(ctx, lastSavedBlock.Number) - if err != nil { - return nil, nil, nil, false, err - } - - // the last saved block is still valid - if header.Hash() == lastSavedBlock.Hash { - return toHead(latest), nil, lastSavedBlock, true, nil - } - } - - return toHead(latest), removed, lastSavedBlock, true, nil -} - -func (c *newBlocksTransfersCommand) getTransfers(parent context.Context, header *DBHeader) ([]Transfer, error) { - ctx, cancel := context.WithTimeout(parent, 5*time.Second) - ethT, err := c.eth.GetTransfers(ctx, header) - cancel() - if err != nil { - return nil, err - } - ctx, cancel = context.WithTimeout(parent, 5*time.Second) - erc20T, err := c.erc20.GetTransfers(ctx, header) - cancel() - if err != nil { - return nil, err - } - return append(ethT, erc20T...), nil -} - // controlCommand implements following procedure (following parts are executed sequeantially): // - verifies that the last header that was synced is still in the canonical chain // - runs fast indexing for each account separately @@ -454,8 +127,6 @@ type controlCommand struct { chain *big.Int client *walletClient feed *event.Feed - safetyDepth *big.Int - watchNewBlocks bool errorsCount int nonArchivalRPCNode bool } @@ -614,38 +285,6 @@ func (c *controlCommand) LoadTransfers(ctx context.Context, downloader *ETHTrans return loadTransfers(ctx, c.accounts, c.db, c.client, c.chain, limit, make(map[common.Address][]*big.Int)) } -/* -// verifyLastSynced verifies that last header that was added to the database is still in the canonical chain. -// it is done by downloading configured number of parents for the last header in the db. -func (c *controlCommand) verifyLastSynced(parent context.Context, last *DBHeader, head *types.Header) error { - log.Debug("verifying that previous header is still in canonical chan", "from", last.Number, "chain head", head.Number) - if new(big.Int).Sub(head.Number, last.Number).Cmp(c.safetyDepth) <= 0 { - log.Debug("no need to verify. last block is close enough to chain head") - return nil - } - ctx, cancel := context.WithTimeout(parent, 3*time.Second) - header, err := c.client.HeaderByNumber(ctx, new(big.Int).Add(last.Number, c.safetyDepth)) - cancel() - if err != nil { - return err - } - log.Info("spawn reorg verifier", "from", last.Number, "to", header.Number) - // TODO(dshulyak) make a standalone command that - // doesn't manage transfers and has an upper limit - cmd := &newBlocksTransfersCommand{ - db: c.db, - chain: c.chain, - client: c.client, - eth: c.eth, - erc20: c.erc20, - feed: c.feed, - - from: last, - to: toDBHeader(header), - } - return cmd.Command()(parent) -} -*/ func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client *walletClient) (*big.Int, error) { from := big.NewInt(0) to := initialTo @@ -756,11 +395,7 @@ func (c *controlCommand) Run(parent context.Context) error { } } - target := new(big.Int).Sub(head.Number, c.safetyDepth) - if target.Cmp(zero) <= 0 { - target = zero - } - + target := head.Number fromByAddress := map[common.Address]*LastKnownBlock{} toByAddress := map[common.Address]*big.Int{} @@ -819,15 +454,24 @@ func (c *controlCommand) Run(parent context.Context) error { return err } + events := map[common.Address]Event{} for _, address := range c.accounts { - for _, header := range cmnd.foundHeaders[address] { - c.feed.Send(Event{ - Type: EventNewBlock, - BlockNumber: header.Number, - Accounts: []common.Address{address}, - NewTransactionsPerAccount: map[common.Address]int{address: 20}, - }) + event := Event{ + Type: EventNewTransfers, + Accounts: []common.Address{address}, } + for _, header := range cmnd.foundHeaders[address] { + if event.BlockNumber == nil || header.Number.Cmp(event.BlockNumber) == 1 { + event.BlockNumber = header.Number + } + } + if event.BlockNumber != nil { + events[address] = event + } + } + + for _, event := range events { + c.feed.Send(event) } c.feed.Send(Event{ @@ -836,47 +480,6 @@ func (c *controlCommand) Run(parent context.Context) error { BlockNumber: target, }) - if c.watchNewBlocks { - ctx, cancel = context.WithTimeout(parent, 3*time.Second) - head, err = c.client.HeaderByNumber(ctx, target) - cancel() - if err != nil { - if c.NewError(err) { - return nil - } - return err - } - - log.Info("watching new blocks", "start from", target) - cmd := &newBlocksTransfersCommand{ - db: c.db, - chain: c.chain, - client: c.client, - accounts: c.accounts, - eth: c.eth, - erc20: c.erc20, - feed: c.feed, - initialFrom: toDBHeader(head), - from: toDBHeader(head), - lastFetchedBlockTime: time.Now(), - } - - err = cmd.Command()(parent) - if err != nil { - if c.NewError(err) { - return nil - } - log.Warn("error on running newBlocksTransfersCommand", "err", err) - return err - } - } else { - c.feed.Send(Event{ - Type: EventNewBlock, - BlockNumber: target, - Accounts: []common.Address{}, - }) - } - log.Info("end control command") return err } @@ -913,47 +516,6 @@ func (c *controlCommand) Command() Command { }.Run } -func uniqueAccountsFromTransfers(allTransfers map[common.Address][]Transfer) []common.Address { - accounts := []common.Address{} - unique := map[common.Address]struct{}{} - for address, transfers := range allTransfers { - if len(transfers) == 0 { - continue - } - - _, exist := unique[address] - if exist { - continue - } - unique[address] = struct{}{} - accounts = append(accounts, address) - } - return accounts -} - -func transfersPerAccount(allTransfers map[common.Address][]Transfer) map[common.Address]int { - res := map[common.Address]int{} - for address, transfers := range allTransfers { - res[address] = len(transfers) - } - - return res -} - -func uniqueAccountsFromHeaders(headers []*DBHeader) []common.Address { - accounts := []common.Address{} - unique := map[common.Address]struct{}{} - for i := range headers { - _, exist := unique[headers[i].Address] - if exist { - continue - } - unique[headers[i].Address] = struct{}{} - accounts = append(accounts, headers[i].Address) - } - return accounts -} - type transfersCommand struct { db *Database eth *ETHTransferDownloader @@ -1110,12 +672,6 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error) } } - c.feed.Send(Event{ - Type: EventMaxKnownBlock, - BlockNumber: maxBlockNumber, - Accounts: c.accounts, - }) - c.foundHeaders = foundHeaders return diff --git a/services/wallet/commands_test.go b/services/wallet/commands_test.go deleted file mode 100644 index 0f2cb0496..000000000 --- a/services/wallet/commands_test.go +++ /dev/null @@ -1,252 +0,0 @@ -package wallet - -import ( - "context" - "math/big" - "testing" - "time" - - "github.com/stretchr/testify/suite" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/event" - - "github.com/status-im/status-go/t/devtests/testchain" -) - -func TestNewBlocksSuite(t *testing.T) { - suite.Run(t, new(NewBlocksSuite)) -} - -type NewBlocksSuite struct { - suite.Suite - backend *testchain.Backend - cmd *newBlocksTransfersCommand - address common.Address - db *Database - dbStop func() - feed *event.Feed -} - -func (s *NewBlocksSuite) SetupTest() { - var err error - db, stop := setupTestDB(s.Suite.T()) - s.db = db - s.dbStop = stop - s.backend, err = testchain.NewBackend() - s.Require().NoError(err) - account, err := crypto.GenerateKey() - s.Require().NoError(err) - s.address = crypto.PubkeyToAddress(account.PublicKey) - s.feed = &event.Feed{} - client := &walletClient{client: s.backend.Client} - s.cmd = &newBlocksTransfersCommand{ - db: s.db, - accounts: []common.Address{s.address}, - erc20: NewERC20TransfersDownloader(client, []common.Address{s.address}, s.backend.Signer), - eth: ÐTransferDownloader{ - chain: s.backend.Chain, - client: client, - signer: s.backend.Signer, - db: s.db, - accounts: []common.Address{s.address}, - }, - feed: s.feed, - client: client, - chain: big.NewInt(1777), - } -} - -func (s *NewBlocksSuite) TearDownTest() { - s.dbStop() - s.Require().NoError(s.backend.Stop()) -} - -func (s *NewBlocksSuite) TestOneBlock() { - ctx := context.Background() - s.Require().EqualError(s.cmd.Run(ctx), "not found") - tx := types.NewTransaction(0, s.address, big.NewInt(1e17), 21000, big.NewInt(1), nil) - tx, err := types.SignTx(tx, s.backend.Signer, s.backend.Faucet) - s.Require().NoError(err) - blocks := s.backend.GenerateBlocks(1, 0, func(n int, gen *core.BlockGen) { - gen.AddTx(tx) - }) - n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) - s.Require().Equal(1, n) - s.Require().NoError(err) - - events := make(chan Event, 1) - sub := s.feed.Subscribe(events) - defer sub.Unsubscribe() - - s.Require().NoError(s.cmd.Run(ctx)) - - select { - case ev := <-events: - s.Require().Equal(ev.Type, EventNewBlock) - s.Require().Equal(ev.BlockNumber, big.NewInt(1)) - default: - s.Require().FailNow("event wasn't emitted") - } - transfers, err := s.db.GetTransfers(big.NewInt(0), nil) - s.Require().NoError(err) - s.Require().Len(transfers, 1) - s.Require().Equal(tx.Hash(), transfers[0].ID) -} - -func (s *NewBlocksSuite) genTx(nonce int) *types.Transaction { - tx := types.NewTransaction(uint64(nonce), s.address, big.NewInt(1e10), 21000, big.NewInt(1), nil) - tx, err := types.SignTx(tx, s.backend.Signer, s.backend.Faucet) - s.Require().NoError(err) - return tx -} - -func (s *NewBlocksSuite) runCmdUntilError(ctx context.Context) (err error) { - for err == nil { - err = s.cmd.Run(ctx) - } - return err -} - -/* -func (s *NewBlocksSuite) TestReorg() { - blocks := s.backend.GenerateBlocks(20, 0, nil) - n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) - s.Require().Equal(20, n) - s.Require().NoError(err) - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - s.Require().EqualError(s.runCmdUntilError(ctx), "not found") - - blocks = s.backend.GenerateBlocks(3, 20, func(n int, gen *core.BlockGen) { - gen.AddTx(s.genTx(n)) - }) - n, err = s.backend.Ethereum.BlockChain().InsertChain(blocks) - s.Require().Equal(3, n) - s.Require().NoError(err) - - // `not found` returned when we query head+1 block - ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - s.cmd.from = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(15)) - s.cmd.initialFrom = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(15)) - s.Require().EqualError(s.runCmdUntilError(ctx), "not found") - - transfers, err := s.db.GetTransfers(big.NewInt(0), nil) - s.Require().NoError(err) - s.Require().Len(transfers, 3) - - blocks = s.backend.GenerateBlocks(10, 15, func(n int, gen *core.BlockGen) { - gen.AddTx(s.genTx(n)) - }) - n, err = s.backend.Ethereum.BlockChain().InsertChain(blocks) - s.Require().Equal(10, n) - s.Require().NoError(err) - - // it will be less but even if something went wrong we can't get more - events := make(chan Event, 10) - sub := s.feed.Subscribe(events) - defer sub.Unsubscribe() - - ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - s.Require().EqualError(s.runCmdUntilError(ctx), "not found") - - close(events) - expected := []Event{ - {Type: EventReorg, BlockNumber: big.NewInt(21)}, - {Type: EventNewBlock, BlockNumber: big.NewInt(24)}, - {Type: EventNewBlock, BlockNumber: big.NewInt(25)}, - } - i := 0 - for ev := range events { - s.Require().Equal(expected[i].Type, ev.Type) - s.Require().Equal(expected[i].BlockNumber, ev.BlockNumber) - i++ - } - - transfers, err = s.db.GetTransfers(nil, nil) - s.Require().NoError(err) - s.Require().Len(transfers, 10) -} -*/ - -func (s *NewBlocksSuite) downloadHistorical() { - blocks := s.backend.GenerateBlocks(40, 0, func(n int, gen *core.BlockGen) { - if n == 36 { - gen.AddTx(s.genTx(0)) - } else if n == 39 { - gen.AddTx(s.genTx(1)) - } - }) - n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) - s.Require().Equal(40, n) - s.Require().NoError(err) - - client := &walletClient{client: s.backend.Client} - - nonce := int64(0) - lastBlock := &LastKnownBlock{ - Number: big.NewInt(1), - Balance: big.NewInt(0), - Nonce: &nonce, - } - - eth := ðHistoricalCommand{ - db: s.db, - balanceCache: newBalanceCache(), - eth: ÐTransferDownloader{ - chain: s.backend.Chain, - client: client, - signer: s.backend.Signer, - accounts: []common.Address{s.address}, - }, - feed: s.feed, - address: s.address, - client: client, - from: lastBlock, - to: s.backend.Ethereum.BlockChain().CurrentBlock().Number(), - } - s.Require().NoError(eth.Run(context.Background()), "eth historical command failed to sync transfers") - //dbHeaders, err := s.db.GetBlocks() - s.Require().NoError(err) - s.Require().Len(eth.foundHeaders, 2) -} - -func (s *NewBlocksSuite) reorgHistorical() { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - s.Require().EqualError(s.runCmdUntilError(ctx), "not found") - - blocks := s.backend.GenerateBlocks(10, 35, nil) - n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) - s.Require().Equal(10, n) - s.Require().NoError(err) - - ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - s.Require().EqualError(s.runCmdUntilError(ctx), "not found") - -} - -func (s *NewBlocksSuite) TestSafetyBufferFailure() { - s.downloadHistorical() - - s.reorgHistorical() -} - -func (s *NewBlocksSuite) TestSafetyBufferSuccess() { - s.downloadHistorical() - - safety := new(big.Int).Sub(s.backend.Ethereum.BlockChain().CurrentHeader().Number, big.NewInt(10)) - s.cmd.from = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(safety.Uint64())) - s.reorgHistorical() - - transfers, err := s.db.GetTransfers(big.NewInt(0), nil) - s.Require().NoError(err) - s.Require().Len(transfers, 0) -} diff --git a/services/wallet/database.go b/services/wallet/database.go index 3b77b16f8..8ea7afe4f 100644 --- a/services/wallet/database.go +++ b/services/wallet/database.go @@ -36,12 +36,6 @@ func toDBHeader(header *types.Header) *DBHeader { } } -func toHead(header *types.Header) *DBHeader { - dbheader := toDBHeader(header) - dbheader.Head = true - return dbheader -} - // SyncOption is used to specify that application processed transfers for that block. type SyncOption uint diff --git a/services/wallet/events.go b/services/wallet/events.go index edf27df16..48dfbeee7 100644 --- a/services/wallet/events.go +++ b/services/wallet/events.go @@ -10,18 +10,13 @@ import ( type EventType string const ( - // EventNewBlock emitted when new block was added to the same canonical chan. - EventNewBlock EventType = "newblock" - EventMaxKnownBlock EventType = "maxKnownBlock" - // EventReorg emitted when canonical chain was changed. In this case, BlockNumber will be an earliest added block. - EventReorg EventType = "reorg" - // EventNewHistory emitted if transfer from older block was added. - EventNewHistory EventType = "history" + // EventNewTransfers emitted when new block was added to the same canonical chan. + EventNewTransfers EventType = "new-transfers" // EventFetchingRecentHistory emitted when fetching of lastest tx history is started EventFetchingRecentHistory EventType = "recent-history-fetching" - // EventRecentHistoryFetched emitted when fetching of lastest tx history is started + // EventRecentHistoryReady emitted when fetching of lastest tx history is started EventRecentHistoryReady EventType = "recent-history-ready" - // EventRecentHistoryError emitted when fetching of tx history failed + // EventFetchingHistoryError emitted when fetching of tx history failed EventFetchingHistoryError EventType = "fetching-history-error" // EventNonArchivalNodeDetected emitted when a connection to a non archival node is detected EventNonArchivalNodeDetected EventType = "non-archival-node-detected" @@ -29,10 +24,8 @@ const ( // Event is a type for wallet events. type Event struct { - Type EventType `json:"type"` - BlockNumber *big.Int `json:"blockNumber"` - Accounts []common.Address `json:"accounts"` - NewTransactionsPerAccount map[common.Address]int `json:"newTransactions"` - ERC20 bool `json:"erc20"` - Message string `json:"message"` + Type EventType `json:"type"` + BlockNumber *big.Int `json:"blockNumber"` + Accounts []common.Address `json:"accounts"` + Message string `json:"message"` } diff --git a/services/wallet/reactor.go b/services/wallet/reactor.go index 3560567f8..7e19ca014 100644 --- a/services/wallet/reactor.go +++ b/services/wallet/reactor.go @@ -5,7 +5,6 @@ import ( "errors" "math/big" "sync" - "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" @@ -13,28 +12,9 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/event" - "github.com/status-im/status-go/params" "github.com/status-im/status-go/services/rpcstats" ) -// pow block on main chain is mined once per ~14 seconds -// but for tests we are using clique chain with immediate block signer -// hence we can use different polling periods for methods that depend on mining time. -func pollingPeriodByChain(chain *big.Int) time.Duration { - switch chain.Int64() { - case int64(params.MainNetworkID): - return 10 * time.Second - case int64(params.RopstenNetworkID): - return 4 * time.Second - default: - return 500 * time.Millisecond - } -} - -func reorgSafetyDepth(chain *big.Int) *big.Int { - return big.NewInt(0) -} - var ( erc20BatchSize = big.NewInt(100000) errAlreadyRunning = errors.New("already running") @@ -113,23 +93,21 @@ func (rc *walletClient) CallContract(ctx context.Context, call ethereum.CallMsg, } // NewReactor creates instance of the Reactor. -func NewReactor(db *Database, feed *event.Feed, client *ethclient.Client, chain *big.Int, watchNewBlocks bool) *Reactor { +func NewReactor(db *Database, feed *event.Feed, client *ethclient.Client, chain *big.Int) *Reactor { return &Reactor{ - db: db, - client: client, - feed: feed, - chain: chain, - watchNewBlocks: watchNewBlocks, + db: db, + client: client, + feed: feed, + chain: chain, } } // Reactor listens to new blocks and stores transfers into the database. type Reactor struct { - client *ethclient.Client - db *Database - feed *event.Feed - chain *big.Int - watchNewBlocks bool + client *ethclient.Client + db *Database + feed *event.Feed + chain *big.Int mu sync.Mutex group *Group @@ -150,11 +128,9 @@ func (r *Reactor) newControlCommand(accounts []common.Address) *controlCommand { signer: signer, db: r.db, }, - erc20: NewERC20TransfersDownloader(client, accounts, signer), - feed: r.feed, - safetyDepth: reorgSafetyDepth(r.chain), - watchNewBlocks: r.watchNewBlocks, - errorsCount: 0, + erc20: NewERC20TransfersDownloader(client, accounts, signer), + feed: r.feed, + errorsCount: 0, } return ctl diff --git a/services/wallet/service.go b/services/wallet/service.go index 8e751642d..3c7b02e7a 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -71,8 +71,8 @@ func (s *Service) MergeBlocksRanges(accounts []common.Address, chain uint64) err } // StartReactor separately because it requires known ethereum address, which will become available only after login. -func (s *Service) StartReactor(client *ethclient.Client, accounts []common.Address, chain *big.Int, watchNewBlocks bool) error { - reactor := NewReactor(s.db, s.feed, client, chain, watchNewBlocks) +func (s *Service) StartReactor(client *ethclient.Client, accounts []common.Address, chain *big.Int) error { + reactor := NewReactor(s.db, s.feed, client, chain) err := reactor.Start(accounts) if err != nil { return err diff --git a/services/wallet/service_test.go b/services/wallet/service_test.go index 12fde5bf1..fd2ac64a8 100644 --- a/services/wallet/service_test.go +++ b/services/wallet/service_test.go @@ -54,7 +54,7 @@ func (s *ReactorChangesSuite) SetupTest() { s.backend, err = testchain.NewBackend() s.Require().NoError(err) s.feed = &event.Feed{} - s.reactor = NewReactor(s.db, &event.Feed{}, s.backend.Client, big.NewInt(1337), true) + s.reactor = NewReactor(s.db, &event.Feed{}, s.backend.Client, big.NewInt(1337)) account, err := crypto.GenerateKey() s.Require().NoError(err) s.first = crypto.PubkeyToAddress(account.PublicKey) @@ -123,13 +123,13 @@ func TestServiceStartStop(t *testing.T) { account, err := crypto.GenerateKey() require.NoError(t, err) - err = s.StartReactor(backend.Client, []common.Address{crypto.PubkeyToAddress(account.PublicKey)}, big.NewInt(1337), true) + err = s.StartReactor(backend.Client, []common.Address{crypto.PubkeyToAddress(account.PublicKey)}, big.NewInt(1337)) require.NoError(t, err) require.NoError(t, s.Stop()) require.NoError(t, s.Start(nil)) - err = s.StartReactor(backend.Client, []common.Address{crypto.PubkeyToAddress(account.PublicKey)}, big.NewInt(1337), true) + err = s.StartReactor(backend.Client, []common.Address{crypto.PubkeyToAddress(account.PublicKey)}, big.NewInt(1337)) require.NoError(t, err) require.NoError(t, s.Stop()) }