From 047c9b526355a8865a5e4495ba54537cc8e086f7 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 14 Jun 2019 13:16:30 +0300 Subject: [PATCH] Download transfers starting from latest block header (#1467) --- .golangci.yml | 1 + api/backend.go | 50 +- node/node.go | 16 + node/status_node.go | 12 + params/config.go | 12 +- rpc/client.go | 9 + services/wallet/README.md | 199 ++++++++ services/wallet/api.go | 56 +++ services/wallet/async.go | 152 ++++++ services/wallet/commands.go | 462 ++++++++++++++++++ services/wallet/commands_test.go | 231 +++++++++ services/wallet/concurrent.go | 78 +++ services/wallet/concurrent_test.go | 126 +++++ services/wallet/database.go | 361 ++++++++++++++ services/wallet/database_test.go | 235 +++++++++ services/wallet/downloader.go | 304 ++++++++++++ services/wallet/downloader_test.go | 233 +++++++++ services/wallet/erc20/README.md | 4 + services/wallet/erc20/doc.go | 3 + services/wallet/erc20/erc20.go | 352 +++++++++++++ services/wallet/erc20/erc20.sol | 12 + services/wallet/events.go | 26 + services/wallet/iterative.go | 92 ++++ services/wallet/iterative_test.go | 117 +++++ services/wallet/migrations/bindata.go | 127 +++++ services/wallet/migrations/migrate.go | 43 ++ .../migrations/sql/0001_transfers.down.db.sql | 3 + .../migrations/sql/0001_transfers.up.db.sql | 24 + services/wallet/migrations/sql/doc.go | 3 + services/wallet/query.go | 86 ++++ services/wallet/reactor.go | 114 +++++ services/wallet/service.go | 88 ++++ services/wallet/transmitter.go | 64 +++ signal/events_wallet.go | 10 + signal/signals.go | 1 - sqlite/sqlite.go | 46 ++ t/devtests/devnode.go | 79 +-- t/devtests/miner/node.go | 81 +++ t/devtests/testchain/node.go | 80 +++ t/devtests/tranfers_test.go | 114 +++++ 40 files changed, 4018 insertions(+), 88 deletions(-) create mode 100644 services/wallet/README.md create mode 100644 services/wallet/api.go create mode 100644 services/wallet/async.go create mode 100644 services/wallet/commands.go create mode 100644 services/wallet/commands_test.go create mode 100644 services/wallet/concurrent.go create mode 100644 services/wallet/concurrent_test.go create mode 100644 services/wallet/database.go create mode 100644 services/wallet/database_test.go create mode 100644 services/wallet/downloader.go create mode 100644 services/wallet/downloader_test.go create mode 100644 services/wallet/erc20/README.md create mode 100644 services/wallet/erc20/doc.go create mode 100644 services/wallet/erc20/erc20.go create mode 100644 services/wallet/erc20/erc20.sol create mode 100644 services/wallet/events.go create mode 100644 services/wallet/iterative.go create mode 100644 services/wallet/iterative_test.go create mode 100644 services/wallet/migrations/bindata.go create mode 100644 services/wallet/migrations/migrate.go create mode 100644 services/wallet/migrations/sql/0001_transfers.down.db.sql create mode 100644 services/wallet/migrations/sql/0001_transfers.up.db.sql create mode 100644 services/wallet/migrations/sql/doc.go create mode 100644 services/wallet/query.go create mode 100644 services/wallet/reactor.go create mode 100644 services/wallet/service.go create mode 100644 services/wallet/transmitter.go create mode 100644 signal/events_wallet.go create mode 100644 sqlite/sqlite.go create mode 100644 t/devtests/miner/node.go create mode 100644 t/devtests/testchain/node.go create mode 100644 t/devtests/tranfers_test.go diff --git a/.golangci.yml b/.golangci.yml index 60222bc18..9153f013b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -6,6 +6,7 @@ run: skip-dirs: - static skip-files: + - bindata.go - .*_mock.go - jail/doc.go diff --git a/api/backend.go b/api/backend.go index 32cda0cc6..4a65a9011 100644 --- a/api/backend.go +++ b/api/backend.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "math/big" + "path" "sync" "time" @@ -474,7 +475,19 @@ func (b *StatusBackend) Logout() error { default: return err } - + if b.statusNode.Config().WalletConfig.Enabled { + wallet, err := b.statusNode.WalletService() + switch err { + case node.ErrServiceUnknown: + case nil: + err = wallet.StopReactor() + if err != nil { + return err + } + default: + return err + } + } b.AccountManager().Logout() return nil @@ -497,7 +510,6 @@ func (b *StatusBackend) reSelectAccount() error { default: return err } - return nil } @@ -539,20 +551,38 @@ func (b *StatusBackend) SelectAccount(walletAddress, chatAddress, password strin return err } } + return b.startWallet(password) +} - return nil +func (b *StatusBackend) startWallet(password string) error { + if !b.statusNode.Config().WalletConfig.Enabled { + return nil + } + wallet, err := b.statusNode.WalletService() + if err != nil { + return err + } + account, err := b.accountManager.SelectedWalletAccount() + if err != nil { + return err + } + path := path.Join(b.statusNode.Config().DataDir, fmt.Sprintf("wallet-%x.sql", account.Address)) + return wallet.StartReactor(path, password, + b.statusNode.RPCClient().Ethclient(), + []common.Address{account.Address}, + new(big.Int).SetUint64(b.statusNode.Config().NetworkID)) } // SendDataNotification sends data push notifications to users. // dataPayloadJSON is a JSON string that looks like this: // { -// "data": { -// "msg-v2": { -// "from": "0x2cea3bd5", // hash of sender (first 10 characters/4 bytes of sha3 hash) -// "to": "0xb1f89744", // hash of recipient (first 10 characters/4 bytes of sha3 hash) -// "id": "0x872653ad", // message ID hash (first 10 characters/4 bytes of sha3 hash) -// } -// } +// "data": { +// "msg-v2": { +// "from": "0x2cea3bd5", // hash of sender (first 10 characters/4 bytes of sha3 hash) +// "to": "0xb1f89744", // hash of recipient (first 10 characters/4 bytes of sha3 hash) +// "id": "0x872653ad", // message ID hash (first 10 characters/4 bytes of sha3 hash) +// } +// } // } func (b *StatusBackend) SendDataNotification(dataPayloadJSON string, tokens ...string) error { log.Debug("sending data push notification") diff --git a/node/node.go b/node/node.go index 17a544720..cb19ac6dd 100644 --- a/node/node.go +++ b/node/node.go @@ -29,6 +29,7 @@ import ( "github.com/status-im/status-go/services/personal" "github.com/status-im/status-go/services/shhext" "github.com/status-im/status-go/services/status" + "github.com/status-im/status-go/services/wallet" "github.com/status-im/status-go/static" "github.com/status-im/status-go/timesource" whisper "github.com/status-im/whisper/whisperv6" @@ -45,6 +46,7 @@ var ( ErrStatusServiceRegistrationFailure = errors.New("failed to register the Status service") ErrPeerServiceRegistrationFailure = errors.New("failed to register the Peer service") ErrIncentivisationServiceRegistrationFailure = errors.New("failed to register the Incentivisation service") + ErrWalletServiceRegistrationFailure = errors.New("failed to register the Wallet service") ) // All general log messages in this package should be routed through this logger. @@ -118,6 +120,10 @@ func MakeNode(config *params.NodeConfig, db *leveldb.DB) (*node.Node, error) { return nil, fmt.Errorf("%v: %v", ErrPeerServiceRegistrationFailure, err) } + if err := activateWalletService(stack, config.WalletConfig); err != nil { + return nil, fmt.Errorf("%v: %v", ErrWalletServiceRegistrationFailure, err) + } + return stack, nil } @@ -269,6 +275,16 @@ func activatePeerService(stack *node.Node) error { }) } +func activateWalletService(stack *node.Node, config params.WalletConfig) error { + if !config.Enabled { + logger.Info("service.Wallet is disabled") + return nil + } + return stack.Register(func(*node.ServiceContext) (node.Service, error) { + return wallet.NewService(), nil + }) +} + func registerMailServer(whisperService *whisper.Whisper, config *params.WhisperConfig) (err error) { var mailServer mailserver.WMailServer whisperService.RegisterServer(&mailServer) diff --git a/node/status_node.go b/node/status_node.go index 0f1bb9962..5fe4bc013 100644 --- a/node/status_node.go +++ b/node/status_node.go @@ -32,6 +32,7 @@ import ( "github.com/status-im/status-go/services/peer" "github.com/status-im/status-go/services/shhext" "github.com/status-im/status-go/services/status" + "github.com/status-im/status-go/services/wallet" ) // tickerResolution is the delta to check blockchain sync progress. @@ -590,6 +591,17 @@ func (n *StatusNode) ShhExtService() (s *shhext.Service, err error) { return } +// WalletService returns wallet.Service instance if it is started. +func (n *StatusNode) WalletService() (s *wallet.Service, err error) { + n.mu.RLock() + defer n.mu.RUnlock() + err = n.gethService(&s) + if err == node.ErrServiceUnknown { + err = ErrServiceUnknown + } + return +} + // AccountManager exposes reference to node's accounts manager func (n *StatusNode) AccountManager() (*accounts.Manager, error) { n.mu.RLock() diff --git a/params/config.go b/params/config.go index 9f75406ca..7703f9407 100644 --- a/params/config.go +++ b/params/config.go @@ -340,9 +340,12 @@ type NodeConfig struct { // IncentivisationConfig extra configuration for incentivisation service IncentivisationConfig IncentivisationConfig `json:"IncentivisationConfig," validate:"structonly"` - // ShhextConfig keeps configuration for service running under shhext namespace. + // ShhextConfig extra configuration for service running under shhext namespace. ShhextConfig ShhextConfig `json:"ShhextConfig," validate:"structonly"` + // WalletConfig extra configuration for wallet.Service. + WalletConfig WalletConfig + // SwarmConfig extra configuration for Swarm and ENS SwarmConfig SwarmConfig `json:"SwarmConfig," validate:"structonly"` @@ -358,6 +361,11 @@ type NodeConfig struct { MailServerRegistryAddress string } +// WalletConfig extra configuration for wallet.Service. +type WalletConfig struct { + Enabled bool +} + // ShhextConfig defines options used by shhext service. type ShhextConfig struct { PFSEnabled bool @@ -523,7 +531,7 @@ func NewNodeConfig(dataDir string, networkID uint64) (*NodeConfig, error) { HTTPPort: 8545, HTTPVirtualHosts: []string{"localhost"}, ListenAddr: ":0", - APIModules: "eth,net,web3,peer", + APIModules: "eth,net,web3,peer,wallet", MaxPeers: 25, MaxPendingPeers: 0, IPCFile: "geth.ipc", diff --git a/rpc/client.go b/rpc/client.go index 4173017d2..9e53786d4 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" gethrpc "github.com/ethereum/go-ethereum/rpc" "github.com/status-im/status-go/params" @@ -74,6 +75,14 @@ func NewClient(client *gethrpc.Client, upstream params.UpstreamRPCConfig) (*Clie return &c, nil } +// Ethclient returns ethclient.Client with upstream or local client. +func (c *Client) Ethclient() *ethclient.Client { + if c.upstreamEnabled { + return ethclient.NewClient(c.upstream) + } + return ethclient.NewClient(c.local) +} + // UpdateUpstreamURL changes the upstream RPC client URL, if the upstream is enabled. func (c *Client) UpdateUpstreamURL(url string) error { if c.upstream == nil { diff --git a/services/wallet/README.md b/services/wallet/README.md new file mode 100644 index 000000000..811ef64c9 --- /dev/null +++ b/services/wallet/README.md @@ -0,0 +1,199 @@ +Wallet +========== + +Wallet service starts a loop that watches for new transfers (eth and erc20). +To correctly start the service two values need to be changed in the config: + +1. Set Enable to true in WalletConfig + +```json +{ + "WalletConfig": { + "Enabled": true, + } +} +``` + +2. And expose wallet API with APIModules + +``` +{ + APIModules: "eth,net,web3,peer,wallet", +} +``` + +API +---------- + +#### wallet_getTransfers + +Returns avaiable transfers in a given range. + +##### Parameters + +- `start`: `BIGINT` - start of the range +- `end`: `BIGINT` - end of the range. if nil query will return all transfers from start. + +##### Examples + +```json +{"jsonrpc":"2.0","id":14,"method":"wallet_getTransfers","params":[0,20]} +{"jsonrpc":"2.0","id":14,"method":"wallet_getTransfers","params":[0,null]} +{"jsonrpc":"2.0","id":13,"method":"wallet_getTransfers","params":[0]} +``` + +##### Returns + +List of objects like: + + +```json +[ + { + "type": "erc20", + "address": "0x5dc6108dc6296b052bbd33000553afe0ea576b5e", + "blockNumber": 5687981, + "blockhash": "0xcc4553f125be0bc6cc974518368145fcf1344f41e5de238205db0a1c185ea2fc", + "transaction": { + "nonce": "0x57", + "gasPrice": "0x3b9aca00", + "gas": "0x44ba8", + "to": "0xc55cf4b03948d7ebc8b9e8bad92643703811d162", + "value": "0x0", + "input": "0xcae9ca5100000000000000000000000039d16cdb56b5a6a89e1a397a13fe48034694316e0000000000000000000000000000000000000000000000015af1d78b58c40000000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000000449134709e00000000000000000000000000000000000000000000000000000000000000010000000000000000000000005dc6108dc6296b052bbd33000553afe0ea576b5e00000000000000000000000000000000000000000000000000000000", + "v": "0x29", + "r": "0x124587e9c1d16d8bd02fda1221aefbfca8e2f4cd6300ed2077ebf736789179ab", + "s": "0x4309fddc1226dacb877488221a439c4f97d77dc2c3f5c8ea51f34f42417d3bda", + "hash": "0x259dd45c9c4d52137f32b7787e6e1fb6c9faf70ba40b8137bf66ba03abc0da00" + }, + "receipt": { + "root": "0x", + "status": "0x1", + "cumulativeGasUsed": "0x389e1e", + "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000200000000020000000000000000000000000000000000004000000000000000200000000000000020000000000008000000000000000000000000000000000000000000000000020000000000002000000800000000100000000000000010000000000000000000400000000000000001000000000040000000400000000400000000020000000000000008000000000020000000010000000002000000000000020000000002000000000000000000000000000000000200000000000000000020000010000000000000000000000400000000000000000000000000000000000000", + "logs": [ + { + "address": "0xc55cf4b03948d7ebc8b9e8bad92643703811d162", + "topics": [ + "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925", + "0x0000000000000000000000005dc6108dc6296b052bbd33000553afe0ea576b5e", + "0x00000000000000000000000039d16cdb56b5a6a89e1a397a13fe48034694316e" + ], + "data": "0x0000000000000000000000000000000000000000000000015af1d78b58c40000", + "blockNumber": "0x56caad", + "transactionHash": "0x259dd45c9c4d52137f32b7787e6e1fb6c9faf70ba40b8137bf66ba03abc0da00", + "transactionIndex": "0x10", + "blockHash": "0xcc4553f125be0bc6cc974518368145fcf1344f41e5de238205db0a1c185ea2fc", + "logIndex": "0xd", + "removed": false + }, + { + "address": "0xc55cf4b03948d7ebc8b9e8bad92643703811d162", + "topics": [ + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + "0x0000000000000000000000005dc6108dc6296b052bbd33000553afe0ea576b5e", + "0x000000000000000000000000ee55b1661fd24c4760d92026cedb252a5a0f2a4e" + ], + "data": "0x0000000000000000000000000000000000000000000000015af1d78b58c40000", + "blockNumber": "0x56caad", + "transactionHash": "0x259dd45c9c4d52137f32b7787e6e1fb6c9faf70ba40b8137bf66ba03abc0da00", + "transactionIndex": "0x10", + "blockHash": "0xcc4553f125be0bc6cc974518368145fcf1344f41e5de238205db0a1c185ea2fc", + "logIndex": "0xe", + "removed": false + }, + { + "address": "0x39d16cdb56b5a6a89e1a397a13fe48034694316e", + "topics": [ + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + "0x0000000000000000000000000000000000000000000000000000000000000000", + "0x0000000000000000000000005dc6108dc6296b052bbd33000553afe0ea576b5e", + "0x0000000000000000000000000000000000000000000000000000000000000044" + ], + "data": "0x", + "blockNumber": "0x56caad", + "transactionHash": "0x259dd45c9c4d52137f32b7787e6e1fb6c9faf70ba40b8137bf66ba03abc0da00", + "transactionIndex": "0x10", + "blockHash": "0xcc4553f125be0bc6cc974518368145fcf1344f41e5de238205db0a1c185ea2fc", + "logIndex": "0xf", + "removed": false + } + ], + "transactionHash": "0x259dd45c9c4d52137f32b7787e6e1fb6c9faf70ba40b8137bf66ba03abc0da00", + "contractAddress": "0x0000000000000000000000000000000000000000", + "gasUsed": "0x34f42" + } + } +] +``` + +##### Examples + +```json +{"jsonrpc":"2.0","id":14,"method":"wallet_getTransfers","params":[0,20]} +{"jsonrpc":"2.0","id":14,"method":"wallet_getTransfers","params":[0,null]} +{"jsonrpc":"2.0","id":13,"method":"wallet_getTransfers","params":[0]} +``` + +#### wallet_getTransfersByAddress + +Returns avaiable transfers in a given range. + +##### Parameters + +- `address`: `HEX` - ethereum address encoded in hex +- `start`: `BIGINT` - start of the range +- `end`: `BIGINT` - end of the range. if nil query will return all transfers from start. + +##### Examples + +```json +{"jsonrpc":"2.0","id":7,"method":"wallet_getTransfersByAddress","params":["0xb81a6845649fa8c042dfaceb3f7a684873406993","0x0"]} +``` + +##### Returns + +Objects in the same format. + + +Signals +------- + +Two signals can be emitted: + +1. `newblock` signal + +Emitted when transfers from new block were added to the database. In this case block number if the number of this new block. +Client expected to request transfers starting from received block. + +```json +{ + "type": "wallet", + "event": { + "type": "newblock", + "blockNumber": 0, + "accounts": [ + "0x42c8f505b4006d417dd4e0ba0e880692986adbd8", + "0x3129mdasmeo132128391fml1130410k312312mll" + ] + } +} +``` + +2. `reorg` signal. + +Emitted when part of blocks were removed. Starting from a given block number all transfers were removed. +Client expected to request new transfers from received block and replace transfers that were received previously. + +```json +{ + "type": "wallet", + "event": { + "type": "reorg", + "blockNumber": 0, + "accounts": [ + "0x42c8f505b4006d417dd4e0ba0e880692986adbd8" + ] + } +} +``` diff --git a/services/wallet/api.go b/services/wallet/api.go new file mode 100644 index 000000000..535c52cdd --- /dev/null +++ b/services/wallet/api.go @@ -0,0 +1,56 @@ +package wallet + +import ( + "context" + "errors" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/log" +) + +func NewAPI(s *Service) *API { + return &API{s} +} + +// API is class with methods available over RPC. +type API struct { + s *Service +} + +// GetTransfers returns transfers in range of blocks. If `end` is nil all transfers from `start` will be returned. +// TODO(dshulyak) benchmark loading many transfers from database. We can avoid json unmarshal/marshal if we will +// read header, tx and receipt as a raw json. +func (api *API) GetTransfers(ctx context.Context, start, end *hexutil.Big) ([]Transfer, error) { + log.Debug("call to get transfers", "start", start, "end", end) + if start == nil { + return nil, errors.New("start of the query must be provided. use 0 if you want to load all transfers") + } + if api.s.db == nil { + return nil, errors.New("wallet service is not initialized") + } + rst, err := api.s.db.GetTransfers((*big.Int)(start), (*big.Int)(end)) + if err != nil { + return nil, err + } + log.Debug("result from database for transfers", "start", start, "end", end, "len", len(rst)) + return rst, nil +} + +// GetTransfersByAddress returns transfers for a single address between two blocks. +func (api *API) GetTransfersByAddress(ctx context.Context, address common.Address, start, end *hexutil.Big) ([]Transfer, error) { + log.Debug("call to get transfers for an address", "address", address, "start", start, "end", end) + if start == nil { + return nil, errors.New("start of the query must be provided. use 0 if you want to load all transfers") + } + if api.s.db == nil { + return nil, errors.New("wallet service is not initialized") + } + rst, err := api.s.db.GetTransfersByAddress(address, (*big.Int)(start), (*big.Int)(end)) + if err != nil { + return nil, err + } + log.Debug("result from database for address", "address", address, "start", start, "end", end, "len", len(rst)) + return rst, nil +} diff --git a/services/wallet/async.go b/services/wallet/async.go new file mode 100644 index 000000000..253e1def0 --- /dev/null +++ b/services/wallet/async.go @@ -0,0 +1,152 @@ +package wallet + +import ( + "context" + "sync" + "time" +) + +type Command func(context.Context) error + +// FiniteCommand terminates when error is nil. +type FiniteCommand struct { + Interval time.Duration + Runable func(context.Context) error +} + +func (c FiniteCommand) Run(ctx context.Context) error { + ticker := time.NewTicker(c.Interval) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + err := c.Runable(ctx) + if err == nil { + return nil + } + } + } +} + +// InfiniteCommand runs until context is closed. +type InfiniteCommand struct { + Interval time.Duration + Runable func(context.Context) error +} + +func (c InfiniteCommand) Run(ctx context.Context) error { + ticker := time.NewTicker(c.Interval) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + _ = c.Runable(ctx) + } + } +} + +func NewGroup(parent context.Context) *Group { + ctx, cancel := context.WithCancel(parent) + return &Group{ + ctx: ctx, + cancel: cancel, + } +} + +type Group struct { + ctx context.Context + cancel func() + wg sync.WaitGroup +} + +func (g *Group) Add(cmd Command) { + g.wg.Add(1) + go func() { + _ = cmd(g.ctx) + g.wg.Done() + }() +} + +func (g *Group) Stop() { + g.cancel() +} + +func (g *Group) Wait() { + g.wg.Wait() +} + +func (g *Group) WaitAsync() <-chan struct{} { + ch := make(chan struct{}) + go func() { + g.Wait() + close(ch) + }() + return ch +} + +func NewAtomicGroup(parent context.Context) *AtomicGroup { + ctx, cancel := context.WithCancel(parent) + return &AtomicGroup{ctx: ctx, cancel: cancel} +} + +// AtomicGroup terminates as soon as first goroutine terminates.. +type AtomicGroup struct { + ctx context.Context + cancel func() + wg sync.WaitGroup + + mu sync.Mutex + error error +} + +// Go spawns function in a goroutine and stores results or errors. +func (d *AtomicGroup) Add(cmd Command) { + d.wg.Add(1) + go func() { + defer d.wg.Done() + err := cmd(d.ctx) + d.mu.Lock() + defer d.mu.Unlock() + if err != nil { + // do not overwrite original error by context errors + if d.error != nil { + return + } + d.error = err + d.cancel() + return + } + }() +} + +// Wait for all downloaders to finish. +func (d *AtomicGroup) Wait() { + d.wg.Wait() + if d.Error() == nil { + d.mu.Lock() + defer d.mu.Unlock() + d.cancel() + } +} + +func (d *AtomicGroup) WaitAsync() <-chan struct{} { + ch := make(chan struct{}) + go func() { + d.Wait() + close(ch) + }() + return ch +} + +// Error stores an error that was reported by any of the downloader. Should be called after Wait. +func (d *AtomicGroup) Error() error { + d.mu.Lock() + defer d.mu.Unlock() + return d.error +} + +func (d *AtomicGroup) Stop() { + d.cancel() +} diff --git a/services/wallet/commands.go b/services/wallet/commands.go new file mode 100644 index 000000000..12a7c9952 --- /dev/null +++ b/services/wallet/commands.go @@ -0,0 +1,462 @@ +package wallet + +import ( + "context" + "errors" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" +) + +type ethHistoricalCommand struct { + db *Database + eth TransferDownloader + address common.Address + client reactorClient + feed *event.Feed + + from, to *big.Int +} + +func (c *ethHistoricalCommand) Command() Command { + return FiniteCommand{ + Interval: 5 * time.Second, + Runable: c.Run, + }.Run +} + +func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { + if c.from == nil { + from, err := c.db.GetLatestSynced(c.address, ethSync) + if err != nil { + return err + } + if from == nil { + c.from = zero + } else { + c.from = from.Number + } + log.Debug("initialized downloader for eth historical transfers", "address", c.address, "starting at", c.from, "up to", c.to) + } + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() + concurrent := NewConcurrentDownloader(ctx) + start := time.Now() + downloadEthConcurrently(concurrent, c.client, c.eth, c.address, c.from, c.to) + select { + case <-concurrent.WaitAsync(): + case <-ctx.Done(): + log.Error("eth downloader is stuck") + return errors.New("eth downloader is stuck") + } + if concurrent.Error() != nil { + log.Error("failed to dowload transfers using concurrent downloader", "error", err) + return concurrent.Error() + } + transfers := concurrent.Get() + log.Info("eth historical downloader finished successfully", "total transfers", len(transfers), "time", time.Since(start)) + err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headersFromTransfers(transfers), nil, ethSync) + if err != nil { + log.Error("failed to save downloaded erc20 transfers", "error", err) + return err + } + if len(transfers) > 0 { + // we download all or nothing + c.feed.Send(Event{ + Type: EventNewBlock, + BlockNumber: c.from, + Accounts: []common.Address{c.address}, + }) + } + log.Debug("eth transfers were persisted. command is closed") + return nil +} + +type erc20HistoricalCommand struct { + db *Database + erc20 BatchDownloader + address common.Address + client reactorClient + feed *event.Feed + + iterator *IterativeDownloader + to *DBHeader +} + +func (c *erc20HistoricalCommand) Command() Command { + return FiniteCommand{ + Interval: 5 * time.Second, + Runable: c.Run, + }.Run +} + +func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { + if c.iterator == nil { + c.iterator, err = SetupIterativeDownloader( + c.db, c.client, c.address, erc20Sync, + c.erc20, erc20BatchSize, c.to) + if err != nil { + log.Error("failed to setup historical downloader for erc20") + return err + } + } + for !c.iterator.Finished() { + start := time.Now() + transfers, err := c.iterator.Next(ctx) + if err != nil { + log.Error("failed to get next batch", "error", err) + break + } + headers := headersFromTransfers(transfers) + headers = append(headers, c.iterator.Header()) + err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headers, nil, erc20Sync) + if err != nil { + c.iterator.Revert() + log.Error("failed to save downloaded erc20 transfers", "error", err) + return err + } + if len(transfers) > 0 { + log.Debug("erc20 downloader imported transfers", "len", len(transfers), "time", time.Since(start)) + c.feed.Send(Event{ + Type: EventNewBlock, + BlockNumber: c.iterator.Header().Number, + Accounts: []common.Address{c.address}, + }) + } + } + log.Info("wallet historical downloader for erc20 transfers finished") + return nil +} + +type newBlocksTransfersCommand struct { + db *Database + accounts []common.Address + chain *big.Int + erc20 *ERC20TransfersDownloader + eth *ETHTransferDownloader + client reactorClient + feed *event.Feed + + from, to *DBHeader +} + +func (c *newBlocksTransfersCommand) Command() Command { + // if both blocks are specified we will use this command to verify that lastly synced blocks are still + // in canonical chain + if c.to != nil && c.from != nil { + return FiniteCommand{ + Interval: 5 * time.Second, + Runable: c.Verify, + }.Run + } + return InfiniteCommand{ + Interval: pollingPeriodByChain(c.chain), + Runable: c.Run, + }.Run +} + +func (c *newBlocksTransfersCommand) Verify(parent context.Context) (err error) { + if c.to == nil || c.from == nil { + return errors.New("`from` and `to` blocks must be specified") + } + for c.from.Number.Cmp(c.to.Number) != 0 { + err = c.Run(parent) + if err != nil { + return err + } + } + return nil +} + +func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { + if c.from == nil { + ctx, cancel := context.WithTimeout(parent, 3*time.Second) + from, err := c.client.HeaderByNumber(ctx, nil) + cancel() + if err != nil { + log.Error("failed to get last known header", "error", err) + return err + } + c.from = toDBHeader(from) + log.Debug("initialized downloader for new blocks transfers", "starting at", c.from.Number) + } + num := new(big.Int).Add(c.from.Number, one) + ctx, cancel := context.WithTimeout(parent, 5*time.Second) + latest, err := c.client.HeaderByNumber(ctx, num) + cancel() + if err != nil { + log.Warn("failed to get latest block", "number", num, "error", err) + return err + } + log.Debug("reactor received new block", "header", latest.Hash()) + ctx, cancel = context.WithTimeout(parent, 10*time.Second) + added, removed, err := c.onNewBlock(ctx, c.from, latest) + cancel() + if err != nil { + log.Error("failed to process new header", "header", latest, "error", err) + return err + } + if len(added) == 0 && len(removed) == 0 { + log.Debug("new block already in the database", "block", latest.Number) + return nil + } + // for each added block get tranfers from downloaders + all := []Transfer{} + for i := range added { + log.Debug("reactor get transfers", "block", added[i].Hash, "number", added[i].Number) + transfers, err := c.getTransfers(parent, added[i]) + if err != nil { + log.Error("failed to get transfers", "header", added[i].Hash, "error", err) + continue + } + log.Debug("reactor adding transfers", "block", added[i].Hash, "number", added[i].Number, "len", len(transfers)) + all = append(all, transfers...) + } + err = c.db.ProcessTranfers(all, c.accounts, added, removed, erc20Sync|ethSync) + if err != nil { + log.Error("failed to persist transfers", "error", err) + return err + } + c.from = toDBHeader(latest) + if len(added) == 1 && len(removed) == 0 { + c.feed.Send(Event{ + Type: EventNewBlock, + BlockNumber: added[0].Number, + Accounts: uniqueAccountsFromTransfers(all), + }) + } + if len(removed) != 0 { + lth := len(removed) + c.feed.Send(Event{ + Type: EventReorg, + BlockNumber: removed[lth-1].Number, + Accounts: uniqueAccountsFromTransfers(all), + }) + } + return nil +} + +func (c *newBlocksTransfersCommand) onNewBlock(ctx context.Context, from *DBHeader, latest *types.Header) (added, removed []*DBHeader, err error) { + if from == nil { + // first node in the cache + return []*DBHeader{toHead(latest)}, nil, nil + } + if from.Hash == latest.ParentHash { + // parent matching from node in the cache. on the same chain. + return []*DBHeader{toHead(latest)}, nil, nil + } + exists, err := c.db.HeaderExists(latest.Hash()) + if err != nil { + return nil, nil, err + } + if exists { + return nil, nil, nil + } + log.Debug("wallet reactor spotted reorg", "last header in db", from.Hash, "new parent", latest.ParentHash) + for from != nil && from.Hash != latest.ParentHash { + removed = append(removed, from) + added = append(added, toHead(latest)) + latest, err = c.client.HeaderByHash(ctx, latest.ParentHash) + if err != nil { + return nil, nil, err + } + from, err = c.db.GetHeaderByNumber(new(big.Int).Sub(latest.Number, one)) + if err != nil { + return nil, nil, err + } + } + added = append(added, toHead(latest)) + return added, removed, nil +} + +func (c *newBlocksTransfersCommand) getTransfers(parent context.Context, header *DBHeader) ([]Transfer, error) { + ctx, cancel := context.WithTimeout(parent, 5*time.Second) + ethT, err := c.eth.GetTransfers(ctx, header) + cancel() + if err != nil { + return nil, err + } + ctx, cancel = context.WithTimeout(parent, 5*time.Second) + erc20T, err := c.erc20.GetTransfers(ctx, header) + cancel() + if err != nil { + return nil, err + } + return append(ethT, erc20T...), nil +} + +// controlCommand implements following procedure (following parts are executed sequeantially): +// - verifies that the last header that was synced is still in the canonical chain +// - runs fast indexing for each account separately +// - starts listening to new blocks and watches for reorgs +type controlCommand struct { + accounts []common.Address + db *Database + eth *ETHTransferDownloader + erc20 *ERC20TransfersDownloader + chain *big.Int + client *ethclient.Client + feed *event.Feed + safetyDepth *big.Int +} + +// run fast indexing for every accont up to canonical chain head minus safety depth. +// every account will run it from last synced header. +func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error { + start := time.Now() + group := NewGroup(ctx) + for _, address := range c.accounts { + erc20 := &erc20HistoricalCommand{ + db: c.db, + erc20: NewERC20TransfersDownloader(c.client, []common.Address{address}), + client: c.client, + feed: c.feed, + address: address, + to: to, + } + group.Add(erc20.Command()) + eth := ðHistoricalCommand{ + db: c.db, + client: c.client, + address: address, + eth: ÐTransferDownloader{ + client: c.client, + accounts: []common.Address{address}, + signer: types.NewEIP155Signer(c.chain), + }, + feed: c.feed, + to: to.Number, + } + group.Add(eth.Command()) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-group.WaitAsync(): + log.Debug("fast indexer finished", "in", time.Since(start)) + return nil + } +} + +// verifyLastSynced verifies that last header that was added to the database is still in the canonical chain. +// it is done by downloading configured number of parents for the last header in the db. +func (c *controlCommand) verifyLastSynced(parent context.Context, last *DBHeader, head *types.Header) error { + log.Debug("verifying that previous header is still in canonical chan", "from", last.Number, "chain head", head.Number) + if new(big.Int).Sub(head.Number, last.Number).Cmp(c.safetyDepth) <= 0 { + log.Debug("no need to verify. last block is close enough to chain head") + return nil + } + ctx, cancel := context.WithTimeout(parent, 3*time.Second) + header, err := c.client.HeaderByNumber(ctx, new(big.Int).Add(last.Number, c.safetyDepth)) + cancel() + if err != nil { + return err + } + log.Debug("spawn reorg verifier", "from", last.Number, "to", header.Number) + // TODO(dshulyak) make a standalone command that + // doesn't manage transfers and has an upper limit + cmd := &newBlocksTransfersCommand{ + db: c.db, + chain: c.chain, + client: c.client, + eth: c.eth, + erc20: c.erc20, + feed: c.feed, + + from: last, + to: toDBHeader(header), + } + return cmd.Command()(parent) +} + +func (c *controlCommand) Run(parent context.Context) error { + log.Debug("start control command") + ctx, cancel := context.WithTimeout(parent, 3*time.Second) + head, err := c.client.HeaderByNumber(ctx, nil) + cancel() + if err != nil { + return err + } + log.Debug("current head is", "block number", head.Number) + last, err := c.db.GetLastHead() + if err != nil { + log.Error("failed to load last head from database", "error", err) + return err + } + if last != nil { + err = c.verifyLastSynced(parent, last, head) + if err != nil { + log.Error("failed verification for last header in canonical chain", "error", err) + return err + } + } + target := new(big.Int).Sub(head.Number, c.safetyDepth) + if target.Cmp(zero) <= 0 { + target = zero + } + ctx, cancel = context.WithTimeout(parent, 3*time.Second) + head, err = c.client.HeaderByNumber(ctx, target) + cancel() + if err != nil { + return err + } + log.Debug("run fast indexing for the transfers", "up to", head.Number) + err = c.fastIndex(parent, toDBHeader(head)) + if err != nil { + return err + } + log.Debug("watching new blocks", "start from", head.Number) + cmd := &newBlocksTransfersCommand{ + db: c.db, + chain: c.chain, + client: c.client, + accounts: c.accounts, + eth: c.eth, + erc20: c.erc20, + feed: c.feed, + from: toDBHeader(head), + } + return cmd.Command()(parent) +} + +func (c *controlCommand) Command() Command { + return FiniteCommand{ + Interval: 5 * time.Second, + Runable: c.Run, + }.Run +} + +func headersFromTransfers(transfers []Transfer) []*DBHeader { + byHash := map[common.Hash]struct{}{} + rst := []*DBHeader{} + for i := range transfers { + _, exists := byHash[transfers[i].BlockHash] + if exists { + continue + } + rst = append(rst, &DBHeader{ + Hash: transfers[i].BlockHash, + Number: transfers[i].BlockNumber, + }) + } + return rst +} + +func uniqueAccountsFromTransfers(transfers []Transfer) []common.Address { + accounts := []common.Address{} + unique := map[common.Address]struct{}{} + for i := range transfers { + _, exist := unique[transfers[i].Address] + if exist { + continue + } + unique[transfers[i].Address] = struct{}{} + accounts = append(accounts, transfers[i].Address) + } + return accounts +} diff --git a/services/wallet/commands_test.go b/services/wallet/commands_test.go new file mode 100644 index 000000000..46f0af01c --- /dev/null +++ b/services/wallet/commands_test.go @@ -0,0 +1,231 @@ +package wallet + +import ( + "context" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/event" + "github.com/status-im/status-go/t/devtests/testchain" + "github.com/stretchr/testify/suite" +) + +func TestNewBlocksSuite(t *testing.T) { + suite.Run(t, new(NewBlocksSuite)) +} + +type NewBlocksSuite struct { + suite.Suite + backend *testchain.Backend + cmd *newBlocksTransfersCommand + address common.Address + db *Database + dbStop func() + feed *event.Feed +} + +func (s *NewBlocksSuite) SetupTest() { + var err error + db, stop := setupTestDB(s.Suite.T()) + s.db = db + s.dbStop = stop + s.backend, err = testchain.NewBackend() + s.Require().NoError(err) + account, err := crypto.GenerateKey() + s.Require().NoError(err) + s.address = crypto.PubkeyToAddress(account.PublicKey) + s.feed = &event.Feed{} + s.cmd = &newBlocksTransfersCommand{ + db: s.db, + accounts: []common.Address{s.address}, + erc20: NewERC20TransfersDownloader(s.backend.Client, []common.Address{s.address}), + eth: ÐTransferDownloader{ + client: s.backend.Client, + signer: s.backend.Signer, + accounts: []common.Address{s.address}, + }, + feed: s.feed, + client: s.backend.Client, + } +} + +func (s *NewBlocksSuite) TearDownTest() { + s.dbStop() + s.Require().NoError(s.backend.Stop()) +} + +func (s *NewBlocksSuite) TestOneBlock() { + ctx := context.Background() + s.Require().EqualError(s.cmd.Run(ctx), "not found") + tx := types.NewTransaction(0, s.address, big.NewInt(1e17), 21000, big.NewInt(1), nil) + tx, err := types.SignTx(tx, s.backend.Signer, s.backend.Faucet) + s.Require().NoError(err) + blocks := s.backend.GenerateBlocks(1, 0, func(n int, gen *core.BlockGen) { + gen.AddTx(tx) + }) + n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(1, n) + s.Require().NoError(err) + + events := make(chan Event, 1) + sub := s.feed.Subscribe(events) + defer sub.Unsubscribe() + + s.Require().NoError(s.cmd.Run(ctx)) + + select { + case ev := <-events: + s.Require().Equal(ev.Type, EventNewBlock) + s.Require().Equal(ev.BlockNumber, big.NewInt(1)) + default: + s.Require().FailNow("event wasn't emitted") + } + transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 1) + s.Require().Equal(tx.Hash(), transfers[0].ID) +} + +func (s *NewBlocksSuite) genTx(nonce int) *types.Transaction { + tx := types.NewTransaction(uint64(nonce), s.address, big.NewInt(1e10), 21000, big.NewInt(1), nil) + tx, err := types.SignTx(tx, s.backend.Signer, s.backend.Faucet) + s.Require().NoError(err) + return tx +} + +func (s *NewBlocksSuite) runCmdUntilError(ctx context.Context) (err error) { + for err == nil { + err = s.cmd.Run(ctx) + } + return err +} + +func (s *NewBlocksSuite) TestReorg() { + blocks := s.backend.GenerateBlocks(20, 0, nil) + n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(20, n) + s.Require().NoError(err) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s.Require().EqualError(s.runCmdUntilError(ctx), "not found") + + blocks = s.backend.GenerateBlocks(3, 20, func(n int, gen *core.BlockGen) { + gen.AddTx(s.genTx(n)) + }) + n, err = s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(3, n) + s.Require().NoError(err) + + // `not found` returned when we query head+1 block + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s.cmd.from = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(15)) + s.Require().EqualError(s.runCmdUntilError(ctx), "not found") + + transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 3) + + blocks = s.backend.GenerateBlocks(10, 15, func(n int, gen *core.BlockGen) { + gen.AddTx(s.genTx(n)) + }) + n, err = s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(10, n) + s.Require().NoError(err) + + // it will be less but even if something went wrong we can't get more + events := make(chan Event, 10) + sub := s.feed.Subscribe(events) + defer sub.Unsubscribe() + + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s.Require().EqualError(s.runCmdUntilError(ctx), "not found") + + close(events) + expected := []Event{{Type: EventReorg, BlockNumber: big.NewInt(16)}, {Type: EventNewBlock, BlockNumber: big.NewInt(25)}} + i := 0 + for ev := range events { + s.Require().Equal(expected[i].Type, ev.Type) + s.Require().Equal(expected[i].BlockNumber, ev.BlockNumber) + i++ + } + + transfers, err = s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 10) +} + +func (s *NewBlocksSuite) downloadHistorical() { + blocks := s.backend.GenerateBlocks(40, 0, func(n int, gen *core.BlockGen) { + if n == 36 { + gen.AddTx(s.genTx(0)) + } else if n == 39 { + gen.AddTx(s.genTx(1)) + } + }) + n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(40, n) + s.Require().NoError(err) + + eth := ðHistoricalCommand{ + db: s.db, + eth: ÐTransferDownloader{ + client: s.backend.Client, + signer: s.backend.Signer, + accounts: []common.Address{s.address}, + }, + feed: s.feed, + address: s.address, + client: s.backend.Client, + to: s.backend.Ethereum.BlockChain().CurrentBlock().Number(), + } + s.Require().NoError(eth.Run(context.Background()), "eth historical command failed to sync transfers") + transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 2) +} + +func (s *NewBlocksSuite) reorgHistorical() { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s.Require().EqualError(s.runCmdUntilError(ctx), "not found") + + blocks := s.backend.GenerateBlocks(10, 35, nil) + n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(10, n) + s.Require().NoError(err) + + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s.Require().EqualError(s.runCmdUntilError(ctx), "not found") + +} + +func (s *NewBlocksSuite) TestSafetyBufferFailure() { + s.downloadHistorical() + + s.reorgHistorical() + + transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 1) +} + +func (s *NewBlocksSuite) TestSafetyBufferSuccess() { + s.downloadHistorical() + + safety := new(big.Int).Sub(s.backend.Ethereum.BlockChain().CurrentHeader().Number, big.NewInt(10)) + s.cmd.from = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(safety.Uint64())) + s.reorgHistorical() + + transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 0) +} diff --git a/services/wallet/concurrent.go b/services/wallet/concurrent.go new file mode 100644 index 000000000..5472832e8 --- /dev/null +++ b/services/wallet/concurrent.go @@ -0,0 +1,78 @@ +package wallet + +import ( + "context" + "math/big" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" +) + +// NewConcurrentDownloader creates ConcurrentDownloader instance. +func NewConcurrentDownloader(ctx context.Context) *ConcurrentDownloader { + runner := NewAtomicGroup(ctx) + result := &Result{} + return &ConcurrentDownloader{runner, result} +} + +type ConcurrentDownloader struct { + *AtomicGroup + *Result +} + +type Result struct { + mu sync.Mutex + transfers []Transfer +} + +func (r *Result) Push(transfers ...Transfer) { + r.mu.Lock() + defer r.mu.Unlock() + r.transfers = append(r.transfers, transfers...) +} + +func (r *Result) Get() []Transfer { + r.mu.Lock() + defer r.mu.Unlock() + rst := make([]Transfer, len(r.transfers)) + copy(rst, r.transfers) + return rst +} + +// TransferDownloader downloads transfers from single block using number. +type TransferDownloader interface { + GetTransfersByNumber(context.Context, *big.Int) ([]Transfer, error) +} + +func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, downloader TransferDownloader, account common.Address, low, high *big.Int) { + c.Add(func(ctx context.Context) error { + log.Debug("eth transfers comparing blocks", "low", low, "high", high) + lb, err := client.BalanceAt(ctx, account, low) + if err != nil { + return err + } + hb, err := client.BalanceAt(ctx, account, high) + if err != nil { + return err + } + if lb.Cmp(hb) == 0 { + log.Debug("balances are equal", "low", low, "high", high) + return nil + } + if new(big.Int).Sub(high, low).Cmp(one) == 0 { + transfers, err := downloader.GetTransfersByNumber(ctx, high) + if err != nil { + return err + } + c.Push(transfers...) + return nil + } + mid := new(big.Int).Add(low, high) + mid = mid.Div(mid, two) + log.Debug("balances are not equal. spawn two concurrent downloaders", "low", low, "mid", mid, "high", high) + downloadEthConcurrently(c, client, downloader, account, low, mid) + downloadEthConcurrently(c, client, downloader, account, mid, high) + return nil + }) +} diff --git a/services/wallet/concurrent_test.go b/services/wallet/concurrent_test.go new file mode 100644 index 000000000..d60be339c --- /dev/null +++ b/services/wallet/concurrent_test.go @@ -0,0 +1,126 @@ +package wallet + +import ( + "context" + "errors" + "math/big" + "sort" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" +) + +func TestConcurrentErrorInterrupts(t *testing.T) { + concurrent := NewConcurrentDownloader(context.Background()) + var interrupted bool + concurrent.Add(func(ctx context.Context) error { + select { + case <-ctx.Done(): + interrupted = true + case <-time.After(10 * time.Second): + } + return nil + }) + err := errors.New("interrupt") + concurrent.Add(func(ctx context.Context) error { + return err + }) + concurrent.Wait() + require.True(t, interrupted) + require.Equal(t, err, concurrent.Error()) +} + +func TestConcurrentCollectsTransfers(t *testing.T) { + concurrent := NewConcurrentDownloader(context.Background()) + concurrent.Add(func(context.Context) error { + concurrent.Push(Transfer{}) + return nil + }) + concurrent.Add(func(context.Context) error { + concurrent.Push(Transfer{}) + return nil + }) + concurrent.Wait() + require.Len(t, concurrent.Get(), 2) +} + +type balancesFixture []*big.Int + +func (f balancesFixture) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { + index := int(blockNumber.Int64()) + if index > len(f)-1 { + return nil, errors.New("balance unknown") + } + return f[index], nil +} + +type batchesFixture [][]Transfer + +func (f batchesFixture) GetTransfersByNumber(ctx context.Context, number *big.Int) (rst []Transfer, err error) { + index := int(number.Int64()) + if index > len(f)-1 { + return nil, errors.New("unknown block") + } + return f[index], nil +} + +func TestConcurrentEthDownloader(t *testing.T) { + type options struct { + balances balancesFixture + batches batchesFixture + result []Transfer + last *big.Int + } + type testCase struct { + desc string + options options + } + for _, tc := range []testCase{ + { + desc: "NoBalances", + options: options{ + last: big.NewInt(3), + balances: balancesFixture{big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0)}, + }, + }, + { + desc: "LastBlock", + options: options{ + last: big.NewInt(3), + balances: balancesFixture{big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(10)}, + batches: batchesFixture{{}, {}, {}, {{BlockNumber: big.NewInt(3)}, {BlockNumber: big.NewInt(3)}}}, + result: []Transfer{{BlockNumber: big.NewInt(3)}, {BlockNumber: big.NewInt(3)}}, + }, + }, + { + desc: "ChangesInEveryBlock", + options: options{ + last: big.NewInt(3), + balances: balancesFixture{big.NewInt(0), big.NewInt(3), big.NewInt(7), big.NewInt(10)}, + batches: batchesFixture{{}, {{BlockNumber: big.NewInt(1)}}, {{BlockNumber: big.NewInt(2)}}, {{BlockNumber: big.NewInt(3)}}}, + result: []Transfer{{BlockNumber: big.NewInt(1)}, {BlockNumber: big.NewInt(2)}, {BlockNumber: big.NewInt(3)}}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + concurrent := NewConcurrentDownloader(ctx) + downloadEthConcurrently( + concurrent, tc.options.balances, tc.options.batches, + common.Address{}, zero, tc.options.last) + concurrent.Wait() + require.NoError(t, concurrent.Error()) + rst := concurrent.Get() + require.Len(t, rst, len(tc.options.result)) + sort.Slice(rst, func(i, j int) bool { + return rst[i].BlockNumber.Cmp(rst[j].BlockNumber) < 0 + }) + for i := range rst { + require.Equal(t, tc.options.result[i].BlockNumber, rst[i].BlockNumber) + } + }) + } +} diff --git a/services/wallet/database.go b/services/wallet/database.go new file mode 100644 index 000000000..936d73159 --- /dev/null +++ b/services/wallet/database.go @@ -0,0 +1,361 @@ +package wallet + +import ( + "database/sql" + "database/sql/driver" + "encoding/json" + "errors" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/status-im/status-go/services/wallet/migrations" + "github.com/status-im/status-go/sqlite" +) + +// DBHeader fields from header that are stored in database. +type DBHeader struct { + Number *big.Int + Hash common.Hash + // Head is true if the block was a head at the time it was pulled from chain. + Head bool +} + +func toDBHeader(header *types.Header) *DBHeader { + return &DBHeader{ + Hash: header.Hash(), + Number: header.Number, + } +} + +func toHead(header *types.Header) *DBHeader { + return &DBHeader{ + Hash: header.Hash(), + Number: header.Number, + Head: true, + } +} + +// SyncOption is used to specify that application processed transfers for that block. +type SyncOption uint + +const ( + // sync options + ethSync SyncOption = 1 + erc20Sync SyncOption = 2 +) + +// InitializeDB creates db file at a given path and applies migrations. +func InitializeDB(path, password string) (*Database, error) { + db, err := sqlite.OpenDB(path, password) + if err != nil { + return nil, err + } + err = migrations.Migrate(db) + if err != nil { + return nil, err + } + return &Database{db: db}, nil +} + +// SQLBigInt type for storing uint256 in the databse. +// FIXME(dshulyak) SQL big int is max 64 bits. Maybe store as bytes in big endian and hope +// that lexographical sorting will work. +type SQLBigInt big.Int + +// Scan implements interface. +func (i *SQLBigInt) Scan(value interface{}) error { + val, ok := value.(int64) + if !ok { + return errors.New("not an integer") + } + (*big.Int)(i).SetInt64(val) + return nil +} + +// Value implements interface. +func (i *SQLBigInt) Value() (driver.Value, error) { + if !(*big.Int)(i).IsInt64() { + return nil, errors.New("not at int64") + } + return (*big.Int)(i).Int64(), nil +} + +// JSONBlob type for marshaling/unmarshaling inner type to json. +type JSONBlob struct { + data interface{} +} + +// Scan implements interface. +func (blob *JSONBlob) Scan(value interface{}) error { + bytes, ok := value.([]byte) + if !ok { + return errors.New("not a byte slice") + } + if len(bytes) == 0 { + return nil + } + err := json.Unmarshal(bytes, blob.data) + return err +} + +// Value implements interface. +func (blob *JSONBlob) Value() (driver.Value, error) { + return json.Marshal(blob.data) +} + +// Database sql wrapper for operations with wallet objects. +type Database struct { + db *sql.DB +} + +// Close closes database. +func (db Database) Close() error { + return db.db.Close() +} + +// ProcessTranfers atomically adds/removes blocks and adds new tranfers. +func (db Database) ProcessTranfers(transfers []Transfer, accounts []common.Address, added, removed []*DBHeader, option SyncOption) (err error) { + var ( + tx *sql.Tx + ) + tx, err = db.db.Begin() + if err != nil { + return err + } + defer func() { + if err == nil { + err = tx.Commit() + return + } + _ = tx.Rollback() + }() + err = deleteHeaders(tx, removed) + if err != nil { + return + } + err = insertHeaders(tx, added) + if err != nil { + return + } + err = insertTransfers(tx, transfers) + if err != nil { + return + } + err = updateAccounts(tx, accounts, added, option) + return +} + +// GetTransfersByAddress loads transfers for a given address between two blocks. +func (db *Database) GetTransfersByAddress(address common.Address, start, end *big.Int) (rst []Transfer, err error) { + query := newTransfersQuery().FilterAddress(address).FilterStart(start).FilterEnd(end) + rows, err := db.db.Query(query.String(), query.Args()...) + if err != nil { + return + } + defer rows.Close() + return query.Scan(rows) +} + +// GetTransfers load transfers transfer betweeen two blocks. +func (db *Database) GetTransfers(start, end *big.Int) (rst []Transfer, err error) { + query := newTransfersQuery().FilterStart(start).FilterEnd(end) + rows, err := db.db.Query(query.String(), query.Args()...) + if err != nil { + return + } + defer rows.Close() + return query.Scan(rows) +} + +// SaveHeader stores a single header. +func (db *Database) SaveHeader(header *types.Header) error { + _, err := db.db.Exec("INSERT INTO blocks(number, hash) VALUES (?, ?)", (*SQLBigInt)(header.Number), header.Hash()) + return err +} + +// SaveHeaders stores a list of headers atomically. +func (db *Database) SaveHeaders(headers []*types.Header) (err error) { + var ( + tx *sql.Tx + insert *sql.Stmt + ) + tx, err = db.db.Begin() + if err != nil { + return + } + insert, err = tx.Prepare("INSERT INTO blocks(number, hash) VALUES (?,?)") + if err != nil { + return + } + defer func() { + if err == nil { + err = tx.Commit() + } else { + _ = tx.Rollback() + } + }() + + for _, h := range headers { + _, err = insert.Exec((*SQLBigInt)(h.Number), h.Hash()) + if err != nil { + return + } + } + return +} + +func (db *Database) SaveSyncedHeader(address common.Address, header *types.Header, option SyncOption) (err error) { + var ( + tx *sql.Tx + insert *sql.Stmt + ) + tx, err = db.db.Begin() + if err != nil { + return + } + insert, err = tx.Prepare("INSERT INTO accounts_to_blocks(address, blk_number, sync) VALUES (?,?,?)") + if err != nil { + return + } + defer func() { + if err == nil { + err = tx.Commit() + } else { + _ = tx.Rollback() + } + }() + _, err = insert.Exec(address, (*SQLBigInt)(header.Number), option) + if err != nil { + return + } + return err +} + +// HeaderExists checks if header with hash exists in db. +func (db *Database) HeaderExists(hash common.Hash) (bool, error) { + var val sql.NullBool + err := db.db.QueryRow("SELECT EXISTS (SELECT hash FROM blocks WHERE hash = ?)", hash).Scan(&val) + if err != nil { + return false, err + } + return val.Bool, nil +} + +// GetHeaderByNumber selects header using block number. +func (db *Database) GetHeaderByNumber(number *big.Int) (header *DBHeader, err error) { + header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} + err = db.db.QueryRow("SELECT hash,number FROM blocks WHERE number = ?", (*SQLBigInt)(number)).Scan(&header.Hash, (*SQLBigInt)(header.Number)) + if err == nil { + return header, nil + } + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err +} + +func (db *Database) GetLastHead() (header *DBHeader, err error) { + header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} + err = db.db.QueryRow("SELECT hash,number FROM blocks WHERE head = 1 AND number = (SELECT MAX(number) FROM blocks)").Scan(&header.Hash, (*SQLBigInt)(header.Number)) + if err == nil { + return header, nil + } + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err +} + +// GetLatestSynced downloads last synced block with a given option. +func (db *Database) GetLatestSynced(address common.Address, option SyncOption) (header *DBHeader, err error) { + header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} + err = db.db.QueryRow(` +SELECT blocks.hash, blk_number FROM accounts_to_blocks JOIN blocks ON blk_number = blocks.number WHERE address = $1 AND blk_number += (SELECT MAX(blk_number) FROM accounts_to_blocks WHERE address = $1 AND sync & $2 = $2)`, address, option).Scan(&header.Hash, (*SQLBigInt)(header.Number)) + if err == nil { + return header, nil + } + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err +} + +// statementCreator allows to pass transaction or database to use in consumer. +type statementCreator interface { + Prepare(query string) (*sql.Stmt, error) +} + +func deleteHeaders(creator statementCreator, headers []*DBHeader) error { + delete, err := creator.Prepare("DELETE FROM blocks WHERE hash = ?") + if err != nil { + return err + } + for _, h := range headers { + _, err = delete.Exec(h.Hash) + if err != nil { + return err + } + } + return nil +} + +func insertHeaders(creator statementCreator, headers []*DBHeader) error { + insert, err := creator.Prepare("INSERT OR IGNORE INTO blocks(hash, number, head) VALUES (?, ?, ?)") + if err != nil { + return err + } + for _, h := range headers { + _, err = insert.Exec(h.Hash, (*SQLBigInt)(h.Number), h.Head) + if err != nil { + return err + } + } + return nil +} + +func insertTransfers(creator statementCreator, transfers []Transfer) error { + insert, err := creator.Prepare("INSERT OR IGNORE INTO transfers(hash, blk_hash, address, tx, receipt, type) VALUES (?, ?, ?, ?, ?, ?)") + if err != nil { + return err + } + for _, t := range transfers { + _, err = insert.Exec(t.ID, t.BlockHash, t.Address, &JSONBlob{t.Transaction}, &JSONBlob{t.Receipt}, t.Type) + if err != nil { + return err + } + } + return nil +} + +func updateAccounts(creator statementCreator, accounts []common.Address, headers []*DBHeader, option SyncOption) error { + update, err := creator.Prepare("UPDATE accounts_to_blocks SET sync=sync|? WHERE address=? AND blk_number=?") + if err != nil { + return err + } + insert, err := creator.Prepare("INSERT OR IGNORE INTO accounts_to_blocks(address,blk_number,sync) VALUES(?,?,?)") + if err != nil { + return err + } + for _, acc := range accounts { + for _, h := range headers { + rst, err := update.Exec(option, acc, (*SQLBigInt)(h.Number)) + if err != nil { + return err + } + affected, err := rst.RowsAffected() + if err != nil { + return err + } + if affected > 0 { + continue + } + _, err = insert.Exec(acc, (*SQLBigInt)(h.Number), option) + if err != nil { + return err + } + } + } + return nil +} diff --git a/services/wallet/database_test.go b/services/wallet/database_test.go new file mode 100644 index 000000000..92b988d76 --- /dev/null +++ b/services/wallet/database_test.go @@ -0,0 +1,235 @@ +package wallet + +import ( + "io/ioutil" + "math/big" + "os" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" +) + +func setupTestDB(t *testing.T) (*Database, func()) { + tmpfile, err := ioutil.TempFile("", "wallet-tests-") + require.NoError(t, err) + db, err := InitializeDB(tmpfile.Name(), "wallet-tests") + require.NoError(t, err) + return db, func() { + require.NoError(t, db.Close()) + require.NoError(t, os.Remove(tmpfile.Name())) + } +} + +func TestDBGetHeaderByNumber(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + header := &types.Header{ + Number: big.NewInt(10), + Difficulty: big.NewInt(1), + Time: 1, + } + require.NoError(t, db.SaveHeader(header)) + rst, err := db.GetHeaderByNumber(header.Number) + require.NoError(t, err) + require.Equal(t, header.Hash(), rst.Hash) +} + +func TestDBGetHeaderByNumberNoRows(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + rst, err := db.GetHeaderByNumber(big.NewInt(1)) + require.NoError(t, err) + require.Nil(t, rst) +} + +func TestDBHeaderExists(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + header := &types.Header{ + Number: big.NewInt(10), + Difficulty: big.NewInt(1), + Time: 1, + } + require.NoError(t, db.SaveHeader(header)) + rst, err := db.HeaderExists(header.Hash()) + require.NoError(t, err) + require.True(t, rst) +} + +func TestDBHeaderDoesntExist(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + + rst, err := db.HeaderExists(common.Hash{1}) + require.NoError(t, err) + require.False(t, rst) +} + +func TestDBProcessTransfer(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + header := &DBHeader{ + Number: big.NewInt(1), + Hash: common.Hash{1}, + } + tx := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil) + transfers := []Transfer{ + { + ID: common.Hash{1}, + Type: ethTransfer, + BlockHash: header.Hash, + BlockNumber: header.Number, + Transaction: tx, + Receipt: types.NewReceipt(nil, false, 100), + }, + } + require.NoError(t, db.ProcessTranfers(transfers, nil, []*DBHeader{header}, nil, 0)) +} + +func TestDBReorgTransfers(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + rcpt := types.NewReceipt(nil, false, 100) + rcpt.Logs = []*types.Log{} + original := &DBHeader{ + Number: big.NewInt(1), + Hash: common.Hash{1}, + } + replaced := &DBHeader{ + Number: big.NewInt(1), + Hash: common.Hash{2}, + } + originalTX := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil) + replacedTX := types.NewTransaction(2, common.Address{1}, nil, 10, big.NewInt(10), nil) + require.NoError(t, db.ProcessTranfers([]Transfer{ + {ethTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, originalTX, rcpt}, + }, nil, []*DBHeader{original}, nil, 0)) + require.NoError(t, db.ProcessTranfers([]Transfer{ + {ethTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, replacedTX, rcpt}, + }, nil, []*DBHeader{replaced}, []*DBHeader{original}, 0)) + + all, err := db.GetTransfers(big.NewInt(0), nil) + require.NoError(t, err) + require.Len(t, all, 1) + require.Equal(t, replacedTX.Hash(), all[0].Transaction.Hash()) +} + +func TestDBGetTransfersFromBlock(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + headers := []*DBHeader{} + transfers := []Transfer{} + for i := 1; i < 10; i++ { + header := &DBHeader{ + Number: big.NewInt(int64(i)), + Hash: common.Hash{byte(i)}, + } + headers = append(headers, header) + tx := types.NewTransaction(uint64(i), common.Address{1}, nil, 10, big.NewInt(10), nil) + receipt := types.NewReceipt(nil, false, 100) + receipt.Logs = []*types.Log{} + transfer := Transfer{ + ID: tx.Hash(), + Type: ethTransfer, + BlockNumber: header.Number, + BlockHash: header.Hash, + Transaction: tx, + Receipt: receipt, + } + transfers = append(transfers, transfer) + } + require.NoError(t, db.ProcessTranfers(transfers, nil, headers, nil, 0)) + rst, err := db.GetTransfers(big.NewInt(7), nil) + require.NoError(t, err) + require.Len(t, rst, 3) + + rst, err = db.GetTransfers(big.NewInt(2), big.NewInt(5)) + require.NoError(t, err) + require.Len(t, rst, 4) + +} + +func TestDBLatestSynced(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + + address := common.Address{1} + h1 := &types.Header{ + Number: big.NewInt(10), + Difficulty: big.NewInt(1), + Time: 1, + } + h2 := &types.Header{ + Number: big.NewInt(9), + Difficulty: big.NewInt(1), + Time: 1, + } + require.NoError(t, db.SaveHeader(h1)) + require.NoError(t, db.SaveHeader(h2)) + require.NoError(t, db.SaveSyncedHeader(address, h1, ethSync)) + require.NoError(t, db.SaveSyncedHeader(address, h2, ethSync)) + + latest, err := db.GetLatestSynced(address, ethSync) + require.NoError(t, err) + require.NotNil(t, latest) + require.Equal(t, h1.Number, latest.Number) + require.Equal(t, h1.Hash(), latest.Hash) +} + +func TestDBLatestSyncedDoesntExist(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + + latest, err := db.GetLatestSynced(common.Address{1}, ethSync) + require.NoError(t, err) + require.Nil(t, latest) +} + +func TestDBProcessTransfersUpdate(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + + address := common.Address{1} + header := &DBHeader{ + Number: big.NewInt(10), + Hash: common.Hash{1}, + } + transfer := Transfer{ + ID: common.Hash{1}, + BlockNumber: header.Number, + BlockHash: header.Hash, + Transaction: types.NewTransaction(0, common.Address{}, nil, 0, nil, nil), + Address: address, + } + require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, ethSync)) + require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, erc20Sync)) + + latest, err := db.GetLatestSynced(address, ethSync|erc20Sync) + require.NoError(t, err) + require.Equal(t, header.Hash, latest.Hash) +} + +func TestDBLastHeadExist(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + + headers := []*DBHeader{ + {Number: big.NewInt(1), Hash: common.Hash{1}, Head: true}, + {Number: big.NewInt(2), Hash: common.Hash{2}, Head: true}, + {Number: big.NewInt(3), Hash: common.Hash{3}, Head: true}, + } + require.NoError(t, db.ProcessTranfers(nil, nil, headers, nil, 0)) + last, err := db.GetLastHead() + require.NoError(t, err) + require.Equal(t, headers[2].Hash, last.Hash) +} + +func TestDBLastHeadDoesntExist(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + last, err := db.GetLastHead() + require.NoError(t, err) + require.Nil(t, last) +} diff --git a/services/wallet/downloader.go b/services/wallet/downloader.go new file mode 100644 index 000000000..fc6a20a2f --- /dev/null +++ b/services/wallet/downloader.go @@ -0,0 +1,304 @@ +package wallet + +import ( + "context" + "encoding/binary" + "errors" + "math/big" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/log" +) + +// TransferType type of the asset that was transferred. +type TransferType string + +const ( + ethTransfer TransferType = "eth" + erc20Transfer TransferType = "erc20" + + erc20TransferEventSignature = "Transfer(address,address,uint256)" +) + +var ( + zero = big.NewInt(0) + one = big.NewInt(1) + two = big.NewInt(2) +) + +// Transfer stores information about transfer. +type Transfer struct { + Type TransferType `json:"type"` + ID common.Hash `json:"-"` + Address common.Address `json:"address"` + BlockNumber *big.Int `json:"blockNumber"` + BlockHash common.Hash `json:"blockhash"` + Transaction *types.Transaction `json:"transaction"` + Receipt *types.Receipt `json:"receipt"` +} + +// ETHTransferDownloader downloads regular eth transfers. +type ETHTransferDownloader struct { + client *ethclient.Client + accounts []common.Address + signer types.Signer +} + +// GetTransfers checks if the balance was changed between two blocks. +// If so it downloads transaction that transfer ethereum from that block. +func (d *ETHTransferDownloader) GetTransfers(ctx context.Context, header *DBHeader) (rst []Transfer, err error) { + // TODO(dshulyak) consider caching balance and reset it on reorg + num := new(big.Int).Sub(header.Number, one) + changed := []common.Address{} + for _, address := range d.accounts { + balance, err := d.client.BalanceAt(ctx, address, num) + if err != nil { + return nil, err + } + current, err := d.client.BalanceAt(ctx, address, header.Number) + if err != nil { + return nil, err + } + if current.Cmp(balance) != 0 { + changed = append(changed, address) + } + } + if len(changed) == 0 { + return nil, nil + } + blk, err := d.client.BlockByHash(ctx, header.Hash) + if err != nil { + return nil, err + } + rst, err = d.getTransfersInBlock(ctx, blk, changed) + if err != nil { + return nil, err + } + return rst, nil +} + +func (d *ETHTransferDownloader) GetTransfersByNumber(ctx context.Context, number *big.Int) ([]Transfer, error) { + blk, err := d.client.BlockByNumber(ctx, number) + if err != nil { + return nil, err + } + rst, err := d.getTransfersInBlock(ctx, blk, d.accounts) + if err != nil { + return nil, err + } + return rst, err +} + +func (d *ETHTransferDownloader) getTransfersInBlock(ctx context.Context, blk *types.Block, accounts []common.Address) (rst []Transfer, err error) { + for _, tx := range blk.Transactions() { + var address *common.Address + from, err := types.Sender(d.signer, tx) + if err != nil { + return nil, err + } + if any(from, accounts) { + address = &from + } else if tx.To() != nil && any(*tx.To(), accounts) { + address = tx.To() + } + if address != nil { + receipt, err := d.client.TransactionReceipt(ctx, tx.Hash()) + if err != nil { + return nil, err + } + if isTokenTransfer(receipt.Logs) { + log.Debug("eth downloader found token transfer", "hash", tx.Hash()) + continue + } + rst = append(rst, Transfer{ + Type: ethTransfer, + ID: tx.Hash(), + Address: *address, + BlockNumber: blk.Number(), + BlockHash: blk.Hash(), + Transaction: tx, Receipt: receipt}) + + } + } + // TODO(dshulyak) test that balance difference was covered by transactions + return rst, nil +} + +// NewERC20TransfersDownloader returns new instance. +func NewERC20TransfersDownloader(client *ethclient.Client, accounts []common.Address) *ERC20TransfersDownloader { + signature := crypto.Keccak256Hash([]byte(erc20TransferEventSignature)) + return &ERC20TransfersDownloader{ + client: client, + accounts: accounts, + signature: signature, + } +} + +// ERC20TransfersDownloader is a downloader for erc20 tokens transfers. +type ERC20TransfersDownloader struct { + client *ethclient.Client + accounts []common.Address + + // hash of the Transfer event signature + signature common.Hash +} + +func (d *ERC20TransfersDownloader) paddedAddress(address common.Address) common.Hash { + rst := common.Hash{} + copy(rst[12:], address[:]) + return rst +} + +func (d *ERC20TransfersDownloader) inboundTopics(address common.Address) [][]common.Hash { + return [][]common.Hash{{d.signature}, {}, {d.paddedAddress(address)}} +} + +func (d *ERC20TransfersDownloader) outboundTopics(address common.Address) [][]common.Hash { + return [][]common.Hash{{d.signature}, {d.paddedAddress(address)}, {}} +} + +func (d *ERC20TransfersDownloader) transferFromLog(parent context.Context, log types.Log, address common.Address) (Transfer, error) { + ctx, cancel := context.WithTimeout(parent, 3*time.Second) + tx, _, err := d.client.TransactionByHash(ctx, log.TxHash) + cancel() + if err != nil { + return Transfer{}, err + } + ctx, cancel = context.WithTimeout(parent, 3*time.Second) + receipt, err := d.client.TransactionReceipt(ctx, log.TxHash) + cancel() + if err != nil { + return Transfer{}, err + } + // TODO(dshulyak) what is the max number of logs? + index := [4]byte{} + binary.BigEndian.PutUint32(index[:], uint32(log.Index)) + id := crypto.Keccak256Hash(log.TxHash.Bytes(), index[:]) + return Transfer{ + Address: address, + ID: id, + Type: erc20Transfer, + BlockNumber: new(big.Int).SetUint64(log.BlockNumber), + BlockHash: log.BlockHash, + Transaction: tx, + Receipt: receipt, + }, nil +} + +func (d *ERC20TransfersDownloader) transfersFromLogs(parent context.Context, logs []types.Log, address common.Address) ([]Transfer, error) { + concurrent := NewConcurrentDownloader(parent) + for i := range logs { + l := logs[i] + concurrent.Add(func(ctx context.Context) error { + transfer, err := d.transferFromLog(ctx, l, address) + if err != nil { + return err + } + concurrent.Push(transfer) + return nil + }) + } + select { + case <-concurrent.WaitAsync(): + case <-parent.Done(): + return nil, errors.New("logs downloader stuck") + } + return concurrent.Get(), nil +} + +// GetTransfers for erc20 uses eth_getLogs rpc with Transfer event signature and our address acount. +func (d *ERC20TransfersDownloader) GetTransfers(ctx context.Context, header *DBHeader) ([]Transfer, error) { + hash := header.Hash + transfers := []Transfer{} + for _, address := range d.accounts { + outbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ + BlockHash: &hash, + Topics: d.outboundTopics(address), + }) + if err != nil { + return nil, err + } + inbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ + BlockHash: &hash, + Topics: d.inboundTopics(address), + }) + if err != nil { + return nil, err + } + logs := append(outbound, inbound...) + if len(logs) == 0 { + continue + } + rst, err := d.transfersFromLogs(ctx, logs, address) + if err != nil { + return nil, err + } + transfers = append(transfers, rst...) + } + return transfers, nil +} + +// GetTransfersInRange returns transfers between two blocks. +// time to get logs for 100000 blocks = 1.144686979s. with 249 events in the result set. +func (d *ERC20TransfersDownloader) GetTransfersInRange(parent context.Context, from, to *big.Int) ([]Transfer, error) { + start := time.Now() + log.Debug("get erc20 transfers in range", "from", from, "to", to) + transfers := []Transfer{} + for _, address := range d.accounts { + ctx, cancel := context.WithTimeout(parent, 5*time.Second) + outbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ + FromBlock: from, + ToBlock: to, + Topics: d.outboundTopics(address), + }) + cancel() + if err != nil { + return nil, err + } + ctx, cancel = context.WithTimeout(parent, 5*time.Second) + inbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ + FromBlock: from, + ToBlock: to, + Topics: d.inboundTopics(address), + }) + cancel() + if err != nil { + return nil, err + } + logs := append(outbound, inbound...) + if len(logs) == 0 { + continue + } + rst, err := d.transfersFromLogs(parent, logs, address) + if err != nil { + return nil, err + } + transfers = append(transfers, rst...) + } + log.Debug("found erc20 transfers between two blocks", "from", from, "to", to, "lth", len(transfers), "took", time.Since(start)) + return transfers, nil +} + +func any(address common.Address, compare []common.Address) bool { + for _, c := range compare { + if c == address { + return true + } + } + return false +} + +func isTokenTransfer(logs []*types.Log) bool { + signature := crypto.Keccak256Hash([]byte(erc20TransferEventSignature)) + for _, l := range logs { + if len(l.Topics) > 0 && l.Topics[0] == signature { + return true + } + } + return false +} diff --git a/services/wallet/downloader_test.go b/services/wallet/downloader_test.go new file mode 100644 index 000000000..084297a18 --- /dev/null +++ b/services/wallet/downloader_test.go @@ -0,0 +1,233 @@ +package wallet + +import ( + "context" + "crypto/ecdsa" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/status-im/status-go/services/wallet/erc20" + "github.com/status-im/status-go/t/devtests/miner" + "github.com/stretchr/testify/suite" +) + +func TestETHTransfers(t *testing.T) { + suite.Run(t, new(ETHTransferSuite)) +} + +type ETHTransferSuite struct { + suite.Suite + + ethclient *ethclient.Client + identity *ecdsa.PrivateKey + faucet *ecdsa.PrivateKey + signer types.Signer + + downloader *ETHTransferDownloader +} + +func (s *ETHTransferSuite) SetupTest() { + var err error + s.identity, err = crypto.GenerateKey() + s.Require().NoError(err) + s.faucet, err = crypto.GenerateKey() + s.Require().NoError(err) + + node, err := miner.NewDevNode(crypto.PubkeyToAddress(s.faucet.PublicKey)) + s.Require().NoError(err) + s.Require().NoError(miner.StartWithMiner(node)) + + client, err := node.Attach() + s.Require().NoError(err) + s.ethclient = ethclient.NewClient(client) + s.signer = types.NewEIP155Signer(big.NewInt(1337)) + s.downloader = ÐTransferDownloader{ + signer: s.signer, + client: s.ethclient, + accounts: []common.Address{crypto.PubkeyToAddress(s.identity.PublicKey)}, + } +} + +func (s *ETHTransferSuite) TestNoBalance() { + ctx := context.TODO() + tx := types.NewTransaction(0, common.Address{1}, big.NewInt(1e18), 1e6, big.NewInt(10), nil) + tx, err := types.SignTx(tx, s.signer, s.faucet) + s.Require().NoError(err) + s.Require().NoError(s.ethclient.SendTransaction(ctx, tx)) + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = bind.WaitMined(timeout, s.ethclient, tx) + s.Require().NoError(err) + + header, err := s.ethclient.HeaderByNumber(ctx, nil) + s.Require().NoError(err) + transfers, err := s.downloader.GetTransfers(ctx, toDBHeader(header)) + s.Require().NoError(err) + s.Require().Empty(transfers) +} + +func (s *ETHTransferSuite) TestBalanceUpdatedOnInbound() { + ctx := context.TODO() + tx := types.NewTransaction(0, crypto.PubkeyToAddress(s.identity.PublicKey), big.NewInt(1e18), 1e6, big.NewInt(10), nil) + tx, err := types.SignTx(tx, s.signer, s.faucet) + s.Require().NoError(err) + s.Require().NoError(s.ethclient.SendTransaction(ctx, tx)) + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = bind.WaitMined(timeout, s.ethclient, tx) + s.Require().NoError(err) + + header, err := s.ethclient.HeaderByNumber(ctx, nil) + s.Require().NoError(err) + s.Require().Equal(big.NewInt(1), header.Number) + transfers, err := s.downloader.GetTransfers(ctx, toDBHeader(header)) + s.Require().NoError(err) + s.Require().Len(transfers, 1) +} + +func (s *ETHTransferSuite) TestBalanceUpdatedOnOutbound() { + ctx := context.TODO() + tx := types.NewTransaction(0, crypto.PubkeyToAddress(s.identity.PublicKey), big.NewInt(1e18), 1e6, big.NewInt(10), nil) + tx, err := types.SignTx(tx, s.signer, s.faucet) + s.Require().NoError(err) + s.Require().NoError(s.ethclient.SendTransaction(ctx, tx)) + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + _, err = bind.WaitMined(timeout, s.ethclient, tx) + cancel() + s.Require().NoError(err) + + tx = types.NewTransaction(0, common.Address{1}, big.NewInt(5e17), 1e6, big.NewInt(10), nil) + tx, err = types.SignTx(tx, s.signer, s.identity) + s.Require().NoError(err) + s.Require().NoError(s.ethclient.SendTransaction(ctx, tx)) + timeout, cancel = context.WithTimeout(context.Background(), 5*time.Second) + _, err = bind.WaitMined(timeout, s.ethclient, tx) + cancel() + s.Require().NoError(err) + + header, err := s.ethclient.HeaderByNumber(ctx, nil) + s.Require().NoError(err) + s.Require().Equal(big.NewInt(2), header.Number) + transfers, err := s.downloader.GetTransfers(ctx, toDBHeader(header)) + s.Require().NoError(err) + s.Require().Len(transfers, 1) +} + +func TestERC20Transfers(t *testing.T) { + suite.Run(t, new(ERC20TransferSuite)) +} + +type ERC20TransferSuite struct { + suite.Suite + + ethclient *ethclient.Client + identity *ecdsa.PrivateKey + faucet *ecdsa.PrivateKey + signer types.Signer + + downloader *ERC20TransfersDownloader + + contract *erc20.ERC20Transfer +} + +func (s *ERC20TransferSuite) SetupTest() { + var err error + s.identity, err = crypto.GenerateKey() + s.Require().NoError(err) + s.faucet, err = crypto.GenerateKey() + s.Require().NoError(err) + + node, err := miner.NewDevNode(crypto.PubkeyToAddress(s.faucet.PublicKey)) + s.Require().NoError(err) + s.Require().NoError(miner.StartWithMiner(node)) + + client, err := node.Attach() + s.Require().NoError(err) + s.ethclient = ethclient.NewClient(client) + s.downloader = NewERC20TransfersDownloader(s.ethclient, []common.Address{crypto.PubkeyToAddress(s.identity.PublicKey)}) + + _, tx, contract, err := erc20.DeployERC20Transfer(bind.NewKeyedTransactor(s.faucet), s.ethclient) + s.Require().NoError(err) + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = bind.WaitMined(timeout, s.ethclient, tx) + s.Require().NoError(err) + s.contract = contract + s.signer = types.NewEIP155Signer(big.NewInt(1337)) +} + +func (s *ERC20TransferSuite) TestNoEvents() { + header, err := s.ethclient.HeaderByNumber(context.TODO(), nil) + s.Require().NoError(err) + + transfers, err := s.downloader.GetTransfers(context.TODO(), toDBHeader(header)) + s.Require().NoError(err) + s.Require().Empty(transfers) +} + +func (s *ERC20TransferSuite) TestInboundEvent() { + tx, err := s.contract.Transfer(bind.NewKeyedTransactor(s.faucet), crypto.PubkeyToAddress(s.identity.PublicKey), + big.NewInt(100)) + s.Require().NoError(err) + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = bind.WaitMined(timeout, s.ethclient, tx) + s.Require().NoError(err) + + header, err := s.ethclient.HeaderByNumber(context.TODO(), nil) + s.Require().NoError(err) + + transfers, err := s.downloader.GetTransfers(context.TODO(), toDBHeader(header)) + s.Require().NoError(err) + s.Require().Len(transfers, 1) +} + +func (s *ERC20TransferSuite) TestOutboundEvent() { + // give some eth to pay for gas + ctx := context.TODO() + // nonce is 1 - contact with nonce 0 was deployed in setup + // FIXME request nonce + tx := types.NewTransaction(1, crypto.PubkeyToAddress(s.identity.PublicKey), big.NewInt(1e18), 1e6, big.NewInt(10), nil) + tx, err := types.SignTx(tx, s.signer, s.faucet) + s.Require().NoError(err) + s.Require().NoError(s.ethclient.SendTransaction(ctx, tx)) + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + _, err = bind.WaitMined(timeout, s.ethclient, tx) + cancel() + s.Require().NoError(err) + + tx, err = s.contract.Transfer(bind.NewKeyedTransactor(s.identity), common.Address{1}, big.NewInt(100)) + s.Require().NoError(err) + timeout, cancel = context.WithTimeout(context.Background(), 5*time.Second) + _, err = bind.WaitMined(timeout, s.ethclient, tx) + cancel() + s.Require().NoError(err) + + header, err := s.ethclient.HeaderByNumber(context.TODO(), nil) + s.Require().NoError(err) + + transfers, err := s.downloader.GetTransfers(context.TODO(), toDBHeader(header)) + s.Require().NoError(err) + s.Require().Len(transfers, 1) +} + +func (s *ERC20TransferSuite) TestInRange() { + for i := 0; i < 5; i++ { + tx, err := s.contract.Transfer(bind.NewKeyedTransactor(s.faucet), crypto.PubkeyToAddress(s.identity.PublicKey), + big.NewInt(100)) + s.Require().NoError(err) + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = bind.WaitMined(timeout, s.ethclient, tx) + s.Require().NoError(err) + } + transfers, err := s.downloader.GetTransfersInRange(context.TODO(), big.NewInt(1), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 5) +} diff --git a/services/wallet/erc20/README.md b/services/wallet/erc20/README.md new file mode 100644 index 000000000..ac7a47c5c --- /dev/null +++ b/services/wallet/erc20/README.md @@ -0,0 +1,4 @@ +Test contract for Transfer events +================================= + +Emits transfer event defined by ERC20 standart. \ No newline at end of file diff --git a/services/wallet/erc20/doc.go b/services/wallet/erc20/doc.go new file mode 100644 index 000000000..4d6a26e5b --- /dev/null +++ b/services/wallet/erc20/doc.go @@ -0,0 +1,3 @@ +package erc20 + +//go:generate abigen -sol erc20.sol -pkg erc20 -out erc20.go diff --git a/services/wallet/erc20/erc20.go b/services/wallet/erc20/erc20.go new file mode 100644 index 000000000..d6cfd2f22 --- /dev/null +++ b/services/wallet/erc20/erc20.go @@ -0,0 +1,352 @@ +// Code generated - DO NOT EDIT. +// This file is a generated binding and any manual changes will be lost. + +package erc20 + +import ( + "math/big" + "strings" + + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" +) + +// Reference imports to suppress errors if they are not otherwise used. +var ( + _ = big.NewInt + _ = strings.NewReader + _ = ethereum.NotFound + _ = abi.U256 + _ = bind.Bind + _ = common.Big1 + _ = types.BloomLookup + _ = event.NewSubscription +) + +// ERC20TransferABI is the input ABI used to generate the binding from. +const ERC20TransferABI = "[{\"constant\":false,\"inputs\":[{\"name\":\"to\",\"type\":\"address\"},{\"name\":\"value\",\"type\":\"uint256\"}],\"name\":\"transfer\",\"outputs\":[],\"payable\":false,\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[],\"payable\":false,\"stateMutability\":\"nonpayable\",\"type\":\"constructor\"},{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"name\":\"from\",\"type\":\"address\"},{\"indexed\":true,\"name\":\"to\",\"type\":\"address\"},{\"indexed\":false,\"name\":\"value\",\"type\":\"uint256\"}],\"name\":\"Transfer\",\"type\":\"event\"}]" + +// ERC20TransferBin is the compiled bytecode used for deploying new contracts. +const ERC20TransferBin = `0x6080604052348015600f57600080fd5b5060c88061001e6000396000f3fe6080604052348015600f57600080fd5b506004361060285760003560e01c8063a9059cbb14602d575b600080fd5b605660048036036040811015604157600080fd5b506001600160a01b0381351690602001356058565b005b6040805182815290516001600160a01b0384169133917fddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef9181900360200190a3505056fea165627a7a72305820821ecaa5ed56957610817ecaabbb288ac338db81a0f09fc3ce5c7ddfa17351b60029` + +// DeployERC20Transfer deploys a new Ethereum contract, binding an instance of ERC20Transfer to it. +func DeployERC20Transfer(auth *bind.TransactOpts, backend bind.ContractBackend) (common.Address, *types.Transaction, *ERC20Transfer, error) { + parsed, err := abi.JSON(strings.NewReader(ERC20TransferABI)) + if err != nil { + return common.Address{}, nil, nil, err + } + address, tx, contract, err := bind.DeployContract(auth, parsed, common.FromHex(ERC20TransferBin), backend) + if err != nil { + return common.Address{}, nil, nil, err + } + return address, tx, &ERC20Transfer{ERC20TransferCaller: ERC20TransferCaller{contract: contract}, ERC20TransferTransactor: ERC20TransferTransactor{contract: contract}, ERC20TransferFilterer: ERC20TransferFilterer{contract: contract}}, nil +} + +// ERC20Transfer is an auto generated Go binding around an Ethereum contract. +type ERC20Transfer struct { + ERC20TransferCaller // Read-only binding to the contract + ERC20TransferTransactor // Write-only binding to the contract + ERC20TransferFilterer // Log filterer for contract events +} + +// ERC20TransferCaller is an auto generated read-only Go binding around an Ethereum contract. +type ERC20TransferCaller struct { + contract *bind.BoundContract // Generic contract wrapper for the low level calls +} + +// ERC20TransferTransactor is an auto generated write-only Go binding around an Ethereum contract. +type ERC20TransferTransactor struct { + contract *bind.BoundContract // Generic contract wrapper for the low level calls +} + +// ERC20TransferFilterer is an auto generated log filtering Go binding around an Ethereum contract events. +type ERC20TransferFilterer struct { + contract *bind.BoundContract // Generic contract wrapper for the low level calls +} + +// ERC20TransferSession is an auto generated Go binding around an Ethereum contract, +// with pre-set call and transact options. +type ERC20TransferSession struct { + Contract *ERC20Transfer // Generic contract binding to set the session for + CallOpts bind.CallOpts // Call options to use throughout this session + TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session +} + +// ERC20TransferCallerSession is an auto generated read-only Go binding around an Ethereum contract, +// with pre-set call options. +type ERC20TransferCallerSession struct { + Contract *ERC20TransferCaller // Generic contract caller binding to set the session for + CallOpts bind.CallOpts // Call options to use throughout this session +} + +// ERC20TransferTransactorSession is an auto generated write-only Go binding around an Ethereum contract, +// with pre-set transact options. +type ERC20TransferTransactorSession struct { + Contract *ERC20TransferTransactor // Generic contract transactor binding to set the session for + TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session +} + +// ERC20TransferRaw is an auto generated low-level Go binding around an Ethereum contract. +type ERC20TransferRaw struct { + Contract *ERC20Transfer // Generic contract binding to access the raw methods on +} + +// ERC20TransferCallerRaw is an auto generated low-level read-only Go binding around an Ethereum contract. +type ERC20TransferCallerRaw struct { + Contract *ERC20TransferCaller // Generic read-only contract binding to access the raw methods on +} + +// ERC20TransferTransactorRaw is an auto generated low-level write-only Go binding around an Ethereum contract. +type ERC20TransferTransactorRaw struct { + Contract *ERC20TransferTransactor // Generic write-only contract binding to access the raw methods on +} + +// NewERC20Transfer creates a new instance of ERC20Transfer, bound to a specific deployed contract. +func NewERC20Transfer(address common.Address, backend bind.ContractBackend) (*ERC20Transfer, error) { + contract, err := bindERC20Transfer(address, backend, backend, backend) + if err != nil { + return nil, err + } + return &ERC20Transfer{ERC20TransferCaller: ERC20TransferCaller{contract: contract}, ERC20TransferTransactor: ERC20TransferTransactor{contract: contract}, ERC20TransferFilterer: ERC20TransferFilterer{contract: contract}}, nil +} + +// NewERC20TransferCaller creates a new read-only instance of ERC20Transfer, bound to a specific deployed contract. +func NewERC20TransferCaller(address common.Address, caller bind.ContractCaller) (*ERC20TransferCaller, error) { + contract, err := bindERC20Transfer(address, caller, nil, nil) + if err != nil { + return nil, err + } + return &ERC20TransferCaller{contract: contract}, nil +} + +// NewERC20TransferTransactor creates a new write-only instance of ERC20Transfer, bound to a specific deployed contract. +func NewERC20TransferTransactor(address common.Address, transactor bind.ContractTransactor) (*ERC20TransferTransactor, error) { + contract, err := bindERC20Transfer(address, nil, transactor, nil) + if err != nil { + return nil, err + } + return &ERC20TransferTransactor{contract: contract}, nil +} + +// NewERC20TransferFilterer creates a new log filterer instance of ERC20Transfer, bound to a specific deployed contract. +func NewERC20TransferFilterer(address common.Address, filterer bind.ContractFilterer) (*ERC20TransferFilterer, error) { + contract, err := bindERC20Transfer(address, nil, nil, filterer) + if err != nil { + return nil, err + } + return &ERC20TransferFilterer{contract: contract}, nil +} + +// bindERC20Transfer binds a generic wrapper to an already deployed contract. +func bindERC20Transfer(address common.Address, caller bind.ContractCaller, transactor bind.ContractTransactor, filterer bind.ContractFilterer) (*bind.BoundContract, error) { + parsed, err := abi.JSON(strings.NewReader(ERC20TransferABI)) + if err != nil { + return nil, err + } + return bind.NewBoundContract(address, parsed, caller, transactor, filterer), nil +} + +// Call invokes the (constant) contract method with params as input values and +// sets the output to result. The result type might be a single field for simple +// returns, a slice of interfaces for anonymous returns and a struct for named +// returns. +func (_ERC20Transfer *ERC20TransferRaw) Call(opts *bind.CallOpts, result interface{}, method string, params ...interface{}) error { + return _ERC20Transfer.Contract.ERC20TransferCaller.contract.Call(opts, result, method, params...) +} + +// Transfer initiates a plain transaction to move funds to the contract, calling +// its default method if one is available. +func (_ERC20Transfer *ERC20TransferRaw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) { + return _ERC20Transfer.Contract.ERC20TransferTransactor.contract.Transfer(opts) +} + +// Transact invokes the (paid) contract method with params as input values. +func (_ERC20Transfer *ERC20TransferRaw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) { + return _ERC20Transfer.Contract.ERC20TransferTransactor.contract.Transact(opts, method, params...) +} + +// Call invokes the (constant) contract method with params as input values and +// sets the output to result. The result type might be a single field for simple +// returns, a slice of interfaces for anonymous returns and a struct for named +// returns. +func (_ERC20Transfer *ERC20TransferCallerRaw) Call(opts *bind.CallOpts, result interface{}, method string, params ...interface{}) error { + return _ERC20Transfer.Contract.contract.Call(opts, result, method, params...) +} + +// Transfer initiates a plain transaction to move funds to the contract, calling +// its default method if one is available. +func (_ERC20Transfer *ERC20TransferTransactorRaw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) { + return _ERC20Transfer.Contract.contract.Transfer(opts) +} + +// Transact invokes the (paid) contract method with params as input values. +func (_ERC20Transfer *ERC20TransferTransactorRaw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) { + return _ERC20Transfer.Contract.contract.Transact(opts, method, params...) +} + +// Transfer is a paid mutator transaction binding the contract method 0xa9059cbb. +// +// Solidity: function transfer(address to, uint256 value) returns() +func (_ERC20Transfer *ERC20TransferTransactor) Transfer(opts *bind.TransactOpts, to common.Address, value *big.Int) (*types.Transaction, error) { + return _ERC20Transfer.contract.Transact(opts, "transfer", to, value) +} + +// Transfer is a paid mutator transaction binding the contract method 0xa9059cbb. +// +// Solidity: function transfer(address to, uint256 value) returns() +func (_ERC20Transfer *ERC20TransferSession) Transfer(to common.Address, value *big.Int) (*types.Transaction, error) { + return _ERC20Transfer.Contract.Transfer(&_ERC20Transfer.TransactOpts, to, value) +} + +// Transfer is a paid mutator transaction binding the contract method 0xa9059cbb. +// +// Solidity: function transfer(address to, uint256 value) returns() +func (_ERC20Transfer *ERC20TransferTransactorSession) Transfer(to common.Address, value *big.Int) (*types.Transaction, error) { + return _ERC20Transfer.Contract.Transfer(&_ERC20Transfer.TransactOpts, to, value) +} + +// ERC20TransferTransferIterator is returned from FilterTransfer and is used to iterate over the raw logs and unpacked data for Transfer events raised by the ERC20Transfer contract. +type ERC20TransferTransferIterator struct { + Event *ERC20TransferTransfer // Event containing the contract specifics and raw log + + contract *bind.BoundContract // Generic contract to use for unpacking event data + event string // Event name to use for unpacking event data + + logs chan types.Log // Log channel receiving the found contract events + sub ethereum.Subscription // Subscription for errors, completion and termination + done bool // Whether the subscription completed delivering logs + fail error // Occurred error to stop iteration +} + +// Next advances the iterator to the subsequent event, returning whether there +// are any more events found. In case of a retrieval or parsing error, false is +// returned and Error() can be queried for the exact failure. +func (it *ERC20TransferTransferIterator) Next() bool { + // If the iterator failed, stop iterating + if it.fail != nil { + return false + } + // If the iterator completed, deliver directly whatever's available + if it.done { + select { + case log := <-it.logs: + it.Event = new(ERC20TransferTransfer) + if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil { + it.fail = err + return false + } + it.Event.Raw = log + return true + + default: + return false + } + } + // Iterator still in progress, wait for either a data or an error event + select { + case log := <-it.logs: + it.Event = new(ERC20TransferTransfer) + if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil { + it.fail = err + return false + } + it.Event.Raw = log + return true + + case err := <-it.sub.Err(): + it.done = true + it.fail = err + return it.Next() + } +} + +// Error returns any retrieval or parsing error occurred during filtering. +func (it *ERC20TransferTransferIterator) Error() error { + return it.fail +} + +// Close terminates the iteration process, releasing any pending underlying +// resources. +func (it *ERC20TransferTransferIterator) Close() error { + it.sub.Unsubscribe() + return nil +} + +// ERC20TransferTransfer represents a Transfer event raised by the ERC20Transfer contract. +type ERC20TransferTransfer struct { + From common.Address + To common.Address + Value *big.Int + Raw types.Log // Blockchain specific contextual infos +} + +// FilterTransfer is a free log retrieval operation binding the contract event 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef. +// +// Solidity: event Transfer(address indexed from, address indexed to, uint256 value) +func (_ERC20Transfer *ERC20TransferFilterer) FilterTransfer(opts *bind.FilterOpts, from []common.Address, to []common.Address) (*ERC20TransferTransferIterator, error) { + + var fromRule []interface{} + for _, fromItem := range from { + fromRule = append(fromRule, fromItem) + } + var toRule []interface{} + for _, toItem := range to { + toRule = append(toRule, toItem) + } + + logs, sub, err := _ERC20Transfer.contract.FilterLogs(opts, "Transfer", fromRule, toRule) + if err != nil { + return nil, err + } + return &ERC20TransferTransferIterator{contract: _ERC20Transfer.contract, event: "Transfer", logs: logs, sub: sub}, nil +} + +// WatchTransfer is a free log subscription operation binding the contract event 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef. +// +// Solidity: event Transfer(address indexed from, address indexed to, uint256 value) +func (_ERC20Transfer *ERC20TransferFilterer) WatchTransfer(opts *bind.WatchOpts, sink chan<- *ERC20TransferTransfer, from []common.Address, to []common.Address) (event.Subscription, error) { + + var fromRule []interface{} + for _, fromItem := range from { + fromRule = append(fromRule, fromItem) + } + var toRule []interface{} + for _, toItem := range to { + toRule = append(toRule, toItem) + } + + logs, sub, err := _ERC20Transfer.contract.WatchLogs(opts, "Transfer", fromRule, toRule) + if err != nil { + return nil, err + } + return event.NewSubscription(func(quit <-chan struct{}) error { + defer sub.Unsubscribe() + for { + select { + case log := <-logs: + // New log arrived, parse the event and forward to the user + event := new(ERC20TransferTransfer) + if err := _ERC20Transfer.contract.UnpackLog(event, "Transfer", log); err != nil { + return err + } + event.Raw = log + + select { + case sink <- event: + case err := <-sub.Err(): + return err + case <-quit: + return nil + } + case err := <-sub.Err(): + return err + case <-quit: + return nil + } + } + }), nil +} diff --git a/services/wallet/erc20/erc20.sol b/services/wallet/erc20/erc20.sol new file mode 100644 index 000000000..884eb67d4 --- /dev/null +++ b/services/wallet/erc20/erc20.sol @@ -0,0 +1,12 @@ +pragma solidity ^0.5.0; + +contract ERC20Transfer { + + constructor() public {} + + event Transfer(address indexed from, address indexed to, uint256 value); + + function transfer(address to, uint256 value) public { + emit Transfer(msg.sender, to, value); + } +} diff --git a/services/wallet/events.go b/services/wallet/events.go new file mode 100644 index 000000000..f126daab7 --- /dev/null +++ b/services/wallet/events.go @@ -0,0 +1,26 @@ +package wallet + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" +) + +// EventType type for event types. +type EventType string + +const ( + // EventNewBlock emitted when new block was added to the same canonical chan. + EventNewBlock EventType = "newblock" + // EventReorg emitted when canonical chain was changed. In this case, BlockNumber will be an earliest added block. + EventReorg EventType = "reorg" + // EventNewHistory emitted if transfer from older block was added. + EventNewHistory EventType = "history" +) + +// Event is a type for wallet events. +type Event struct { + Type EventType `json:"type"` + BlockNumber *big.Int `json:"blockNumber"` + Accounts []common.Address `json:"accounts"` +} diff --git a/services/wallet/iterative.go b/services/wallet/iterative.go new file mode 100644 index 000000000..ea638060a --- /dev/null +++ b/services/wallet/iterative.go @@ -0,0 +1,92 @@ +package wallet + +import ( + "context" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" +) + +// SetupIterativeDownloader configures IterativeDownloader with last known synced block. +func SetupIterativeDownloader( + db *Database, client HeaderReader, address common.Address, option SyncOption, + downloader BatchDownloader, size *big.Int, to *DBHeader) (*IterativeDownloader, error) { + from, err := db.GetLatestSynced(address, option) + if err != nil { + log.Error("failed to get latest synced block", "error", err) + return nil, err + } + if from == nil { + from = &DBHeader{Number: zero} + } + log.Debug("iterative downloader", "address", address, "from", from.Number, "to", to.Number) + d := &IterativeDownloader{ + client: client, + batchSize: size, + downloader: downloader, + from: from, + to: to, + } + return d, nil +} + +// BatchDownloader interface for loading transfers in batches in speificed range of blocks. +type BatchDownloader interface { + GetTransfersInRange(ctx context.Context, from, to *big.Int) ([]Transfer, error) +} + +// IterativeDownloader downloads batches of transfers in a specified size. +type IterativeDownloader struct { + client HeaderReader + + batchSize *big.Int + + downloader BatchDownloader + + from, to *DBHeader + previous *DBHeader +} + +// Finished true when earliest block with given sync option is zero. +func (d *IterativeDownloader) Finished() bool { + return d.from.Number.Cmp(d.to.Number) == 0 +} + +// Header return last synced header. +func (d *IterativeDownloader) Header() *DBHeader { + return d.previous +} + +// Next moves closer to the end on every new iteration. +func (d *IterativeDownloader) Next(parent context.Context) ([]Transfer, error) { + to := new(big.Int).Add(d.from.Number, d.batchSize) + // if start < 0; start = 0 + if to.Cmp(d.to.Number) == 1 { + to = d.to.Number + } + transfers, err := d.downloader.GetTransfersInRange(parent, d.from.Number, to) + if err != nil { + log.Error("failed to get transfer in between two bloks", "from", d.from.Number, "to", to, "error", err) + return nil, err + } + // use integers instead of DBHeader + ctx, cancel := context.WithTimeout(parent, 5*time.Second) + header, err := d.client.HeaderByNumber(ctx, to) + cancel() + if err != nil { + log.Error("failed to get header by number", "from", d.from.Number, "to", to, "error", err) + return nil, err + } + d.previous, d.from = d.from, toDBHeader(header) + return transfers, nil +} + +// Revert reverts last step progress. Should be used if application failed to process transfers. +// For example failed to persist them. +func (d *IterativeDownloader) Revert() { + if d.previous != nil { + d.from = d.previous + } +} diff --git a/services/wallet/iterative_test.go b/services/wallet/iterative_test.go new file mode 100644 index 000000000..3bb66927e --- /dev/null +++ b/services/wallet/iterative_test.go @@ -0,0 +1,117 @@ +package wallet + +import ( + "context" + "errors" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" +) + +type transfersFixture []Transfer + +func (f transfersFixture) GetTransfersInRange(ctx context.Context, from, to *big.Int) ([]Transfer, error) { + rst := []Transfer{} + for _, t := range f { + if t.BlockNumber.Cmp(from) >= 0 && t.BlockNumber.Cmp(to) <= 0 { + rst = append(rst, t) + } + } + return rst, nil +} + +func TestIterFinished(t *testing.T) { + iterator := IterativeDownloader{ + from: &DBHeader{Number: big.NewInt(10)}, + to: &DBHeader{Number: big.NewInt(10)}, + } + require.True(t, iterator.Finished()) +} + +func TestIterNotFinished(t *testing.T) { + iterator := IterativeDownloader{ + from: &DBHeader{Number: big.NewInt(2)}, + to: &DBHeader{Number: big.NewInt(5)}, + } + require.False(t, iterator.Finished()) +} + +func TestIterRevert(t *testing.T) { + iterator := IterativeDownloader{ + from: &DBHeader{Number: big.NewInt(12)}, + to: &DBHeader{Number: big.NewInt(12)}, + previous: &DBHeader{Number: big.NewInt(9)}, + } + require.True(t, iterator.Finished()) + iterator.Revert() + require.False(t, iterator.Finished()) +} + +func TestIterProgress(t *testing.T) { + var ( + chain headers = genHeadersChain(10, 1) + transfers = make(transfersFixture, 10) + ) + for i := range transfers { + transfers[i] = Transfer{ + BlockNumber: chain[i].Number, + BlockHash: chain[i].Hash(), + } + } + iter := &IterativeDownloader{ + client: chain, + downloader: transfers, + batchSize: big.NewInt(5), + from: &DBHeader{Number: big.NewInt(0)}, + to: &DBHeader{Number: big.NewInt(9)}, + } + batch, err := iter.Next(context.TODO()) + require.NoError(t, err) + require.Len(t, batch, 6) + batch, err = iter.Next(context.TODO()) + require.NoError(t, err) + require.Len(t, batch, 5) + require.True(t, iter.Finished()) +} + +type headers []*types.Header + +func (h headers) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + for _, item := range h { + if item.Hash() == hash { + return item, nil + } + } + return nil, errors.New("not found") +} + +func (h headers) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + for _, item := range h { + if item.Number.Cmp(number) == 0 { + return item, nil + } + } + return nil, errors.New("not found") +} + +func (h headers) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { + return nil, errors.New("not implemented") +} + +func genHeadersChain(size, difficulty int) []*types.Header { + rst := make([]*types.Header, size) + for i := 0; i < size; i++ { + rst[i] = &types.Header{ + Number: big.NewInt(int64(i)), + Difficulty: big.NewInt(int64(difficulty)), + Time: 1, + } + if i != 0 { + rst[i].ParentHash = rst[i-1].Hash() + } + } + return rst +} diff --git a/services/wallet/migrations/bindata.go b/services/wallet/migrations/bindata.go new file mode 100644 index 000000000..88abbccca --- /dev/null +++ b/services/wallet/migrations/bindata.go @@ -0,0 +1,127 @@ +package migrations + +import ( + "bytes" + "compress/gzip" + "fmt" + "io" + "strings" +) + +func bindata_read(data []byte, name string) ([]byte, error) { + gz, err := gzip.NewReader(bytes.NewBuffer(data)) + if err != nil { + return nil, fmt.Errorf("Read %q: %v", name, err) + } + + var buf bytes.Buffer + _, err = io.Copy(&buf, gz) + gz.Close() + + if err != nil { + return nil, fmt.Errorf("Read %q: %v", name, err) + } + + return buf.Bytes(), nil +} + +var __0001_transfers_down_db_sql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\x28\x29\x4a\xcc\x2b\x4e\x4b\x2d\x2a\xb6\xe6\x42\x12\x4d\xca\xc9\x4f\xce\x2e\xb6\xe6\x02\x04\x00\x00\xff\xff\x27\x4d\x7a\xa1\x29\x00\x00\x00") + +func _0001_transfers_down_db_sql() ([]byte, error) { + return bindata_read( + __0001_transfers_down_db_sql, + "0001_transfers.down.db.sql", + ) +} + +var __0001_transfers_up_db_sql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x8c\x91\xd1\x72\xaa\x30\x10\x86\xef\xf3\x14\x7b\x29\x33\x79\x83\x73\x15\x60\xd1\xcc\xc9\x49\x4e\x43\xa8\xf5\x8a\x41\x4c\xab\xa3\x06\x4a\x60\xa6\xbe\x7d\x07\x01\xad\xad\xe3\xf4\x32\xbb\xd9\xdd\xef\xff\xff\x48\x23\x33\x08\x86\x85\x02\x81\x27\x20\x95\x01\x7c\xe1\xa9\x49\xa1\x6d\x0a\xe7\x5f\x6d\xe3\x61\x46\xb6\x85\xdf\xc2\x33\xd3\xd1\x82\x69\xc8\x24\x7f\xca\x90\x92\x62\xb3\x69\xac\xf7\x97\x7a\x3f\x2b\x33\x21\x28\x59\x1f\xf6\xf9\xcd\xc8\xb5\xd5\x7e\x40\x28\x54\x48\x49\x63\x4b\xbb\xab\xdb\xf1\xd5\x9e\x6a\x7b\xe7\x77\xa2\x34\xf2\xb9\x84\xbf\xb8\x9a\x4d\x4b\x03\xd0\x98\xa0\x46\x19\x61\x0a\xeb\x43\x55\xee\xfd\x6c\xa8\x2b\x09\x31\x0a\x34\x08\x11\x4b\x23\x16\x23\x25\x91\x92\xa9\xd1\x8c\x4b\x03\x9d\xdb\xbd\x77\x36\x9f\x64\xe5\x95\x3b\xaf\xcb\x27\x19\x83\x2c\x38\xef\xa2\x63\x31\x20\xc1\x1f\x42\x1e\x98\x34\xdc\xff\xee\xd0\x7f\xcd\xff\x31\xbd\xea\xb1\x29\x71\xdd\x71\x6d\x1b\x08\xf9\xbc\xa7\x18\xaf\x5c\x25\x6e\x6d\xb1\x81\x50\x29\x01\x31\x26\x2c\x13\x06\x12\x26\x52\x24\x01\x2c\xb9\x59\xa8\xcc\x80\x56\x4b\x1e\x3f\xc6\x28\xca\xb2\xea\x5c\xeb\xf3\xb6\xca\x2f\x48\x8f\xf3\xb9\xc5\xba\xf6\xfc\xc9\x95\xc0\xa5\xf9\x69\xfe\x30\x71\xcf\xfe\xa9\xf3\xab\x00\x8e\x45\x5d\xef\xdc\x5b\xef\xff\x48\x38\x20\x4f\x44\x53\x0e\x63\x93\x7e\x39\xdd\xa7\xf1\x19\x00\x00\xff\xff\xfc\x91\xad\x60\xb2\x02\x00\x00") + +func _0001_transfers_up_db_sql() ([]byte, error) { + return bindata_read( + __0001_transfers_up_db_sql, + "0001_transfers.up.db.sql", + ) +} + +var _doc_go = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00") + +func doc_go() ([]byte, error) { + return bindata_read( + _doc_go, + "doc.go", + ) +} + +// Asset loads and returns the asset for the given name. +// It returns an error if the asset could not be found or +// could not be loaded. +func Asset(name string) ([]byte, error) { + cannonicalName := strings.Replace(name, "\\", "/", -1) + if f, ok := _bindata[cannonicalName]; ok { + return f() + } + return nil, fmt.Errorf("Asset %s not found", name) +} + +// AssetNames returns the names of the assets. +func AssetNames() []string { + names := make([]string, 0, len(_bindata)) + for name := range _bindata { + names = append(names, name) + } + return names +} + +// _bindata is a table, holding each asset generator, mapped to its name. +var _bindata = map[string]func() ([]byte, error){ + "0001_transfers.down.db.sql": _0001_transfers_down_db_sql, + "0001_transfers.up.db.sql": _0001_transfers_up_db_sql, + "doc.go": doc_go, +} +// AssetDir returns the file names below a certain +// directory embedded in the file by go-bindata. +// For example if you run go-bindata on data/... and data contains the +// following hierarchy: +// data/ +// foo.txt +// img/ +// a.png +// b.png +// then AssetDir("data") would return []string{"foo.txt", "img"} +// AssetDir("data/img") would return []string{"a.png", "b.png"} +// AssetDir("foo.txt") and AssetDir("notexist") would return an error +// AssetDir("") will return []string{"data"}. +func AssetDir(name string) ([]string, error) { + node := _bintree + if len(name) != 0 { + cannonicalName := strings.Replace(name, "\\", "/", -1) + pathList := strings.Split(cannonicalName, "/") + for _, p := range pathList { + node = node.Children[p] + if node == nil { + return nil, fmt.Errorf("Asset %s not found", name) + } + } + } + if node.Func != nil { + return nil, fmt.Errorf("Asset %s not found", name) + } + rv := make([]string, 0, len(node.Children)) + for name := range node.Children { + rv = append(rv, name) + } + return rv, nil +} + +type _bintree_t struct { + Func func() ([]byte, error) + Children map[string]*_bintree_t +} +var _bintree = &_bintree_t{nil, map[string]*_bintree_t{ + "0001_transfers.down.db.sql": &_bintree_t{_0001_transfers_down_db_sql, map[string]*_bintree_t{ + }}, + "0001_transfers.up.db.sql": &_bintree_t{_0001_transfers_up_db_sql, map[string]*_bintree_t{ + }}, + "doc.go": &_bintree_t{doc_go, map[string]*_bintree_t{ + }}, +}} diff --git a/services/wallet/migrations/migrate.go b/services/wallet/migrations/migrate.go new file mode 100644 index 000000000..7b5802208 --- /dev/null +++ b/services/wallet/migrations/migrate.go @@ -0,0 +1,43 @@ +package migrations + +import ( + "database/sql" + + "github.com/status-im/migrate/v4" + "github.com/status-im/migrate/v4/database/sqlcipher" + bindata "github.com/status-im/migrate/v4/source/go_bindata" +) + +// Migrate applies migrations. +func Migrate(db *sql.DB) error { + resources := bindata.Resource( + AssetNames(), + func(name string) ([]byte, error) { + return Asset(name) + }, + ) + + source, err := bindata.WithInstance(resources) + if err != nil { + return err + } + + driver, err := sqlcipher.WithInstance(db, &sqlcipher.Config{}) + if err != nil { + return err + } + + m, err := migrate.NewWithInstance( + "go-bindata", + source, + "sqlcipher", + driver) + if err != nil { + return err + } + + if err = m.Up(); err != migrate.ErrNoChange { + return err + } + return nil +} diff --git a/services/wallet/migrations/sql/0001_transfers.down.db.sql b/services/wallet/migrations/sql/0001_transfers.down.db.sql new file mode 100644 index 000000000..d82814a90 --- /dev/null +++ b/services/wallet/migrations/sql/0001_transfers.down.db.sql @@ -0,0 +1,3 @@ +DROP TABLE transfers; +DROP TABLE blocks; +DROP TABLE accounts_to_blocks; diff --git a/services/wallet/migrations/sql/0001_transfers.up.db.sql b/services/wallet/migrations/sql/0001_transfers.up.db.sql new file mode 100644 index 000000000..9b5a380ac --- /dev/null +++ b/services/wallet/migrations/sql/0001_transfers.up.db.sql @@ -0,0 +1,24 @@ +CREATE TABLE IF NOT EXISTS transfers ( +hash VARCHAR UNIQUE, +address VARCHAR NOT NULL, +blk_hash VARCHAR NOT NULL, +tx BLOB, +receipt BLOB, +type VARCHAR NOT NULL, +FOREIGN KEY(blk_hash) REFERENCES blocks(hash) ON DELETE CASCADE, +CONSTRAINT unique_transfer_on_hash_address UNIQUE (hash,address) +); + +CREATE TABLE IF NOT EXISTS blocks ( +hash VARCHAR PRIMARY KEY, +number BIGINT UNIQUE NOT NULL, +head BOOL DEFAULT FALSE +) WITHOUT ROWID; + +CREATE TABLE IF NOT EXISTS accounts_to_blocks ( +address VARCHAR NOT NULL, +blk_number BIGINT NOT NULL, +sync INT, +FOREIGN KEY(blk_number) REFERENCES blocks(number) ON DELETE CASCADE, +CONSTRAINT unique_mapping_on_address_block_number UNIQUE (address,blk_number) +); diff --git a/services/wallet/migrations/sql/doc.go b/services/wallet/migrations/sql/doc.go new file mode 100644 index 000000000..e0a060394 --- /dev/null +++ b/services/wallet/migrations/sql/doc.go @@ -0,0 +1,3 @@ +package sql + +//go:generate go-bindata -pkg migrations -o ../bindata.go ./ diff --git a/services/wallet/query.go b/services/wallet/query.go new file mode 100644 index 000000000..73d94659e --- /dev/null +++ b/services/wallet/query.go @@ -0,0 +1,86 @@ +package wallet + +import ( + "bytes" + "database/sql" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +const baseTransfersQuery = "SELECT transfers.hash, type, blocks.hash, blocks.number, address, tx, receipt FROM transfers JOIN blocks ON blk_hash = blocks.hash" + +func newTransfersQuery() *transfersQuery { + buf := bytes.NewBuffer(nil) + buf.WriteString(baseTransfersQuery) + return &transfersQuery{buf: buf} +} + +type transfersQuery struct { + buf *bytes.Buffer + args []interface{} + added bool +} + +func (q *transfersQuery) andOrWhere() { + if q.added { + q.buf.WriteString(" AND") + } else { + q.buf.WriteString(" WHERE") + } +} + +func (q *transfersQuery) FilterStart(start *big.Int) *transfersQuery { + if start != nil { + q.andOrWhere() + q.added = true + q.buf.WriteString(" blocks.number >= ?") + q.args = append(q.args, (*SQLBigInt)(start)) + } + return q +} + +func (q *transfersQuery) FilterEnd(end *big.Int) *transfersQuery { + if end != nil { + q.andOrWhere() + q.added = true + q.buf.WriteString(" blocks.number <= ?") + q.args = append(q.args, (*SQLBigInt)(end)) + } + return q +} + +func (q *transfersQuery) FilterAddress(address common.Address) *transfersQuery { + q.andOrWhere() + q.added = true + q.buf.WriteString(" address = ?") + q.args = append(q.args, address) + return q +} + +func (q *transfersQuery) String() string { + return q.buf.String() +} + +func (q *transfersQuery) Args() []interface{} { + return q.args +} + +func (q *transfersQuery) Scan(rows *sql.Rows) (rst []Transfer, err error) { + for rows.Next() { + transfer := Transfer{ + BlockNumber: &big.Int{}, + Transaction: &types.Transaction{}, + Receipt: &types.Receipt{}, + } + err = rows.Scan( + &transfer.ID, &transfer.Type, &transfer.BlockHash, (*SQLBigInt)(transfer.BlockNumber), &transfer.Address, + &JSONBlob{transfer.Transaction}, &JSONBlob{transfer.Receipt}) + if err != nil { + return nil, err + } + rst = append(rst, transfer) + } + return rst, nil +} diff --git a/services/wallet/reactor.go b/services/wallet/reactor.go new file mode 100644 index 000000000..abb412d21 --- /dev/null +++ b/services/wallet/reactor.go @@ -0,0 +1,114 @@ +package wallet + +import ( + "context" + "errors" + "math/big" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/event" + "github.com/status-im/status-go/params" +) + +// pow block on main chain is mined once per ~14 seconds +// but for tests we are using clique chain with immediate block signer +// hence we can use different polling periods for methods that depend on mining time. +func pollingPeriodByChain(chain *big.Int) time.Duration { + switch chain.Int64() { + case int64(params.MainNetworkID): + return 10 * time.Second + case int64(params.RopstenNetworkID): + return 2 * time.Second + default: + return 500 * time.Millisecond + } +} + +var ( + reorgSafetyDepth = big.NewInt(15) + erc20BatchSize = big.NewInt(100000) +) + +// HeaderReader interface for reading headers using block number or hash. +type HeaderReader interface { + HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) + HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) +} + +// BalanceReader interface for reading balance at a specifeid address. +type BalanceReader interface { + BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) +} + +type reactorClient interface { + HeaderReader + BalanceReader +} + +// NewReactor creates instance of the Reactor. +func NewReactor(db *Database, feed *event.Feed, client *ethclient.Client, accounts []common.Address, chain *big.Int) *Reactor { + return &Reactor{ + db: db, + client: client, + feed: feed, + accounts: accounts, + chain: chain, + } +} + +// Reactor listens to new blocks and stores transfers into the database. +type Reactor struct { + client *ethclient.Client + db *Database + feed *event.Feed + accounts []common.Address + chain *big.Int + + mu sync.Mutex + group *Group +} + +// Start runs reactor loop in background. +func (r *Reactor) Start() error { + r.mu.Lock() + defer r.mu.Unlock() + if r.group != nil { + return errors.New("already running") + } + r.group = NewGroup(context.Background()) + // TODO(dshulyak) to support adding accounts in runtime implement keyed group + // and export private api to start downloaders from accounts + // private api should have access only to reactor + ctl := &controlCommand{ + db: r.db, + chain: r.chain, + client: r.client, + accounts: r.accounts, + eth: ÐTransferDownloader{ + client: r.client, + accounts: r.accounts, + signer: types.NewEIP155Signer(r.chain), + }, + erc20: NewERC20TransfersDownloader(r.client, r.accounts), + feed: r.feed, + safetyDepth: reorgSafetyDepth, + } + r.group.Add(ctl.Command()) + return nil +} + +// Stop stops reactor loop and waits till it exits. +func (r *Reactor) Stop() { + r.mu.Lock() + defer r.mu.Unlock() + if r.group == nil { + return + } + r.group.Stop() + r.group.Wait() + r.group = nil +} diff --git a/services/wallet/service.go b/services/wallet/service.go new file mode 100644 index 000000000..17670884f --- /dev/null +++ b/services/wallet/service.go @@ -0,0 +1,88 @@ +package wallet + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rpc" +) + +// NewService initializes service instance. +func NewService() *Service { + feed := &event.Feed{} + return &Service{ + feed: feed, + signals: &SignalsTransmitter{publisher: feed}, + } +} + +// Service is a wallet service. +type Service struct { + feed *event.Feed + db *Database + reactor *Reactor + signals *SignalsTransmitter +} + +// Start signals transmitter. +func (s *Service) Start(*p2p.Server) error { + return s.signals.Start() +} + +// StartReactor separately because it requires known ethereum address, which will become available only after login. +func (s *Service) StartReactor(dbpath, password string, client *ethclient.Client, accounts []common.Address, chain *big.Int) error { + db, err := InitializeDB(dbpath, password) + if err != nil { + return err + } + reactor := NewReactor(db, s.feed, client, accounts, chain) + err = reactor.Start() + if err != nil { + return err + } + s.db = db + s.reactor = reactor + return nil +} + +// StopReactor stops reactor and closes database. +func (s *Service) StopReactor() error { + if s.reactor == nil { + return nil + } + s.reactor.Stop() + if s.db == nil { + return nil + } + return s.db.Close() +} + +// Stop reactor, signals transmitter and close db. +func (s *Service) Stop() error { + log.Info("wallet will be stopped") + err := s.StopReactor() + s.signals.Stop() + log.Info("wallet stopped") + return err +} + +// APIs returns list of available RPC APIs. +func (s *Service) APIs() []rpc.API { + return []rpc.API{ + { + Namespace: "wallet", + Version: "0.1.0", + Service: NewAPI(s), + Public: true, + }, + } +} + +// Protocols returns list of p2p protocols. +func (s *Service) Protocols() []p2p.Protocol { + return nil +} diff --git a/services/wallet/transmitter.go b/services/wallet/transmitter.go new file mode 100644 index 000000000..e112cc945 --- /dev/null +++ b/services/wallet/transmitter.go @@ -0,0 +1,64 @@ +package wallet + +import ( + "errors" + "sync" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/status-im/status-go/signal" +) + +type publisher interface { + Subscribe(interface{}) event.Subscription +} + +// SignalsTransmitter transmits received events as wallet signals. +type SignalsTransmitter struct { + publisher + + wg sync.WaitGroup + quit chan struct{} +} + +// Start runs loop in background. +func (tmr *SignalsTransmitter) Start() error { + if tmr.quit != nil { + return errors.New("already running") + } + tmr.quit = make(chan struct{}) + events := make(chan Event, 10) + sub := tmr.publisher.Subscribe(events) + + tmr.wg.Add(1) + go func() { + defer tmr.wg.Done() + for { + select { + case <-tmr.quit: + sub.Unsubscribe() + return + case err := <-sub.Err(): + // technically event.Feed cannot send an error to subscription.Err channel. + // the only time we will get an event is when that channel is closed. + if err != nil { + log.Error("wallet signals transmitter failed with", "error", err) + } + return + case event := <-events: + signal.SendWalletEvent(event) + } + } + }() + return nil +} + +// Stop stops the loop and waits till it exits. +func (tmr *SignalsTransmitter) Stop() { + if tmr.quit == nil { + return + } + close(tmr.quit) + tmr.wg.Wait() + tmr.quit = nil +} diff --git a/signal/events_wallet.go b/signal/events_wallet.go new file mode 100644 index 000000000..3cc493153 --- /dev/null +++ b/signal/events_wallet.go @@ -0,0 +1,10 @@ +package signal + +const ( + walletEvent = "wallet" +) + +// SendWalletEvent sends event from services/wallet/events. +func SendWalletEvent(event interface{}) { + send(walletEvent, event) +} diff --git a/signal/signals.go b/signal/signals.go index f17e9c4d5..534e5f137 100644 --- a/signal/signals.go +++ b/signal/signals.go @@ -49,7 +49,6 @@ func send(typ string, event interface{}) { logger.Error("Marshalling signal envelope", "error", err) return } - // If a Go implementation of signal handler is set, let's use it. if mobileSignalHandler != nil { mobileSignalHandler(data) diff --git a/sqlite/sqlite.go b/sqlite/sqlite.go new file mode 100644 index 000000000..afe173c5d --- /dev/null +++ b/sqlite/sqlite.go @@ -0,0 +1,46 @@ +package sqlite + +import ( + "database/sql" + "errors" + "fmt" + + _ "github.com/mutecomm/go-sqlcipher" // We require go sqlcipher that overrides default implementation +) + +func openDB(path, key string) (*sql.DB, error) { + db, err := sql.Open("sqlite3", path) + if err != nil { + return nil, err + } + + // Disable concurrent access as not supported by the driver + db.SetMaxOpenConns(1) + + if _, err = db.Exec("PRAGMA foreign_keys=ON"); err != nil { + return nil, err + } + keyString := fmt.Sprintf("PRAGMA key = '%s'", key) + if _, err = db.Exec(keyString); err != nil { + return nil, errors.New("failed to set key pragma") + } + + // readers do not block writers and faster i/o operations + // https://www.sqlite.org/draft/wal.html + // must be set after db is encrypted + var mode string + err = db.QueryRow("PRAGMA journal_mode=WAL").Scan(&mode) + if err != nil { + return nil, err + } + if mode != "wal" { + return nil, fmt.Errorf("unable to set journal_mode to WAL. actual mode %s", mode) + } + + return db, nil +} + +// OpenDB opens not-encrypted database. +func OpenDB(path, key string) (*sql.DB, error) { + return openDB(path, key) +} diff --git a/t/devtests/devnode.go b/t/devtests/devnode.go index e1c9d0dfa..89b6a97e9 100644 --- a/t/devtests/devnode.go +++ b/t/devtests/devnode.go @@ -3,91 +3,20 @@ package devtests import ( "crypto/ecdsa" "io/ioutil" - "math/big" "os" - "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rpc" "github.com/status-im/status-go/api" "github.com/status-im/status-go/params" statusrpc "github.com/status-im/status-go/rpc" + "github.com/status-im/status-go/t/devtests/miner" "github.com/stretchr/testify/suite" ) -// NewDevNode returns node with clieque engine and prefunded accounts. -func NewDevNode(faucet common.Address) (*node.Node, error) { - cfg := node.DefaultConfig - ipc, err := ioutil.TempFile("", "devnode-ipc-") - if err != nil { - return nil, err - } - cfg.IPCPath = ipc.Name() - cfg.HTTPModules = []string{"eth"} - cfg.DataDir = "" - cfg.NoUSB = true - cfg.P2P.MaxPeers = 0 - cfg.P2P.ListenAddr = ":0" - cfg.P2P.NoDiscovery = true - cfg.P2P.DiscoveryV5 = false - - stack, err := node.New(&cfg) - if err != nil { - return nil, err - } - - ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore) - // ensure that etherbase is added to an account manager - etherbase, err := crypto.GenerateKey() - if err != nil { - return nil, err - } - acc, err := ks.ImportECDSA(etherbase, "") - if err != nil { - return nil, err - } - err = ks.Unlock(acc, "") - if err != nil { - return nil, err - } - - ethcfg := eth.DefaultConfig - ethcfg.NetworkId = 1337 - // 0 - mine only if transaction pending - ethcfg.Genesis = core.DeveloperGenesisBlock(0, faucet) - extra := make([]byte, 32) // extraVanity - extra = append(extra, acc.Address[:]...) - extra = append(extra, make([]byte, 65)...) // extraSeal - ethcfg.Genesis.ExtraData = extra - ethcfg.MinerGasPrice = big.NewInt(1) - ethcfg.Etherbase = acc.Address - - return stack, stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { - return eth.New(ctx, ðcfg) - }) - -} - -// StartWithMiner starts node with eth service and a miner. -func StartWithMiner(stack *node.Node) error { - err := stack.Start() - if err != nil { - return err - } - var ethereum *eth.Ethereum - err = stack.Service(ðereum) - if err != nil { - return err - } - ethereum.TxPool().SetGasPrice(big.NewInt(1)) - return ethereum.StartMining(0) -} - // DevNodeSuite provides convenient wrapper for starting node with clique backend for mining. type DevNodeSuite struct { suite.Suite @@ -109,9 +38,9 @@ func (s *DevNodeSuite) SetupTest() { s.Require().NoError(err) s.DevAccount = account s.DevAccountAddress = crypto.PubkeyToAddress(account.PublicKey) - s.miner, err = NewDevNode(s.DevAccountAddress) + s.miner, err = miner.NewDevNode(s.DevAccountAddress) s.Require().NoError(err) - s.Require().NoError(StartWithMiner(s.miner)) + s.Require().NoError(miner.StartWithMiner(s.miner)) s.dir, err = ioutil.TempDir("", "devtests-") s.Require().NoError(err) @@ -123,10 +52,10 @@ func (s *DevNodeSuite) SetupTest() { config.WhisperConfig.Enabled = false config.LightEthConfig.Enabled = false config.UpstreamConfig.Enabled = true + config.WalletConfig.Enabled = true config.UpstreamConfig.URL = s.miner.IPCEndpoint() s.backend = api.NewStatusBackend() s.Require().NoError(s.backend.StartNode(config)) - s.Remote, err = s.miner.Attach() s.Require().NoError(err) s.Eth = ethclient.NewClient(s.Remote) diff --git a/t/devtests/miner/node.go b/t/devtests/miner/node.go new file mode 100644 index 000000000..e47068958 --- /dev/null +++ b/t/devtests/miner/node.go @@ -0,0 +1,81 @@ +package miner + +import ( + "io/ioutil" + "math/big" + + "github.com/ethereum/go-ethereum/accounts/keystore" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/node" +) + +// NewDevNode returns node with clieque engine and prefunded accounts. +func NewDevNode(faucet common.Address) (*node.Node, error) { + cfg := node.DefaultConfig + ipc, err := ioutil.TempFile("", "devnode-ipc-") + if err != nil { + return nil, err + } + cfg.IPCPath = ipc.Name() + cfg.HTTPModules = []string{"eth"} + cfg.DataDir = "" + cfg.NoUSB = true + cfg.P2P.MaxPeers = 0 + cfg.P2P.ListenAddr = ":0" + cfg.P2P.NoDiscovery = true + cfg.P2P.DiscoveryV5 = false + + stack, err := node.New(&cfg) + if err != nil { + return nil, err + } + + ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore) + // ensure that etherbase is added to an account manager + etherbase, err := crypto.GenerateKey() + if err != nil { + return nil, err + } + acc, err := ks.ImportECDSA(etherbase, "") + if err != nil { + return nil, err + } + err = ks.Unlock(acc, "") + if err != nil { + return nil, err + } + + ethcfg := eth.DefaultConfig + ethcfg.NetworkId = 1337 + // 0 - mine only if transaction pending + ethcfg.Genesis = core.DeveloperGenesisBlock(0, faucet) + extra := make([]byte, 32) // extraVanity + extra = append(extra, acc.Address[:]...) + extra = append(extra, make([]byte, 65)...) // extraSeal + ethcfg.Genesis.ExtraData = extra + ethcfg.MinerGasPrice = big.NewInt(1) + ethcfg.Etherbase = acc.Address + + return stack, stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { + return eth.New(ctx, ðcfg) + }) + +} + +// StartWithMiner starts node with eth service and a miner. +func StartWithMiner(stack *node.Node) error { + err := stack.Start() + if err != nil { + return err + } + var ethereum *eth.Ethereum + err = stack.Service(ðereum) + if err != nil { + return err + } + ethereum.TxPool().SetGasPrice(big.NewInt(1)) + return ethereum.StartMining(0) +} diff --git a/t/devtests/testchain/node.go b/t/devtests/testchain/node.go new file mode 100644 index 000000000..a26578db2 --- /dev/null +++ b/t/devtests/testchain/node.go @@ -0,0 +1,80 @@ +package testchain + +import ( + "crypto/ecdsa" + "math/big" + + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/params" +) + +type Backend struct { + Node *node.Node + Client *ethclient.Client + genesis *core.Genesis + Ethereum *eth.Ethereum + Faucet *ecdsa.PrivateKey + Signer types.Signer +} + +func NewBackend() (*Backend, error) { + faucet, err := crypto.GenerateKey() + if err != nil { + return nil, err + } + config := params.AllEthashProtocolChanges + genesis := &core.Genesis{ + Config: config, + Alloc: core.GenesisAlloc{crypto.PubkeyToAddress(faucet.PublicKey): {Balance: big.NewInt(1e18)}}, + ExtraData: []byte("test genesis"), + Timestamp: 9000, + } + var ethservice *eth.Ethereum + n, err := node.New(&node.Config{}) + if err != nil { + return nil, err + } + err = n.Register(func(ctx *node.ServiceContext) (node.Service, error) { + config := ð.Config{Genesis: genesis} + config.Ethash.PowMode = ethash.ModeFake + ethservice, err = eth.New(ctx, config) + return ethservice, err + }) + if err != nil { + return nil, err + } + + if err := n.Start(); err != nil { + return nil, err + } + client, err := n.Attach() + if err != nil { + return nil, err + } + return &Backend{ + Node: n, + Client: ethclient.NewClient(client), + Ethereum: ethservice, + Faucet: faucet, + Signer: types.NewEIP155Signer(config.ChainID), + genesis: genesis, + }, nil +} + +// GenerateBlocks generate n blocks starting from genesis. +func (b *Backend) GenerateBlocks(n int, start uint64, gen func(int, *core.BlockGen)) []*types.Block { + block := b.Ethereum.BlockChain().GetBlockByNumber(start) + engine := ethash.NewFaker() + blocks, _ := core.GenerateChain(b.genesis.Config, block, engine, b.Ethereum.ChainDb(), n, gen) + return blocks +} + +func (b *Backend) Stop() error { + return b.Node.Stop() +} diff --git a/t/devtests/tranfers_test.go b/t/devtests/tranfers_test.go new file mode 100644 index 000000000..097d5271f --- /dev/null +++ b/t/devtests/tranfers_test.go @@ -0,0 +1,114 @@ +package devtests + +import ( + "context" + "fmt" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/status-im/status-go/account" + "github.com/status-im/status-go/services/wallet" + "github.com/status-im/status-go/t/utils" + "github.com/stretchr/testify/suite" +) + +func TestTransfersSuite(t *testing.T) { + suite.Run(t, new(TransfersSuite)) +} + +type TransfersSuite struct { + DevNodeSuite + + Password string + Info account.Info + Address common.Address +} + +func (s *TransfersSuite) SelectAccount() { + s.Require().NoError(s.backend.SelectAccount(s.Info.WalletAddress, s.Info.ChatAddress, s.Password)) + _, err := s.backend.AccountManager().SelectedWalletAccount() + s.Require().NoError(err) +} + +func (s *TransfersSuite) SetupTest() { + s.DevNodeSuite.SetupTest() + s.Password = "test" + info, _, err := s.backend.AccountManager().CreateAccount(s.Password) + s.Require().NoError(err) + s.Info = info + s.Address = common.HexToAddress(info.WalletAddress) +} + +func (s *TransfersSuite) TearDownTest() { + s.Require().NoError(s.backend.Logout()) + s.DevNodeSuite.TearDownTest() +} + +func (s *TransfersSuite) getAllTranfers() (rst []wallet.Transfer, err error) { + return rst, s.Local.Call(&rst, "wallet_getTransfersByAddress", s.Address, (*hexutil.Big)(big.NewInt(0))) +} + +func (s *TransfersSuite) sendTx(nonce uint64, to common.Address) { + tx := types.NewTransaction(nonce, to, big.NewInt(1e18), 1e6, big.NewInt(10), nil) + // TODO move signer to DevNodeSuite + tx, err := types.SignTx(tx, types.NewEIP155Signer(big.NewInt(1337)), s.DevAccount) + s.Require().NoError(err) + s.Require().NoError(s.Eth.SendTransaction(context.Background(), tx)) + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + _, err = bind.WaitMined(timeout, s.Eth, tx) + cancel() + s.Require().NoError(err) +} + +func (s *TransfersSuite) TestNewTransfers() { + s.SelectAccount() + s.sendTx(0, s.Address) + s.Require().NoError(utils.Eventually(func() error { + all, err := s.getAllTranfers() + if err != nil { + return err + } + if len(all) != 1 { + return fmt.Errorf("waiting for one transfer") + } + return nil + }, 20*time.Second, 1*time.Second)) + + go func() { + for i := 1; i < 10; i++ { + s.sendTx(uint64(i), s.Address) + } + }() + s.Require().NoError(utils.Eventually(func() error { + all, err := s.getAllTranfers() + if err != nil { + return err + } + if len(all) != 10 { + return fmt.Errorf("waiting for 10 transfers") + } + return nil + }, 30*time.Second, 1*time.Second)) +} + +func (s *TransfersSuite) TestHistoricalTransfers() { + for i := 0; i < 30; i++ { + s.sendTx(uint64(i), s.Address) + } + s.SelectAccount() + s.Require().NoError(utils.Eventually(func() error { + all, err := s.getAllTranfers() + if err != nil { + return err + } + if len(all) >= 30 { + return fmt.Errorf("waiting for atleast 30 transfers") + } + return nil + }, 30*time.Second, 1*time.Second)) +}