Download transfers starting from latest block header (#1467)

Dmitry Shulyak 2019-06-14 13:16:30 +03:00 committed by GitHub
parent 804a109b26
commit 047c9b5263
40 changed files with 4018 additions and 88 deletions

View File

@ -6,6 +6,7 @@ run:
skip-dirs:
- static
skip-files:
- bindata.go
- .*_mock.go
- jail/doc.go

View File

@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"path"
"sync" "sync"
"time" "time"
@ -474,7 +475,19 @@ func (b *StatusBackend) Logout() error {
default:
return err
}
if b.statusNode.Config().WalletConfig.Enabled {
wallet, err := b.statusNode.WalletService()
switch err {
case node.ErrServiceUnknown:
case nil:
err = wallet.StopReactor()
if err != nil {
return err
}
default:
return err
}
}
b.AccountManager().Logout()
return nil
@ -497,7 +510,6 @@ func (b *StatusBackend) reSelectAccount() error {
default:
return err
}
return nil
}
@ -539,20 +551,38 @@ func (b *StatusBackend) SelectAccount(walletAddress, chatAddress, password strin
return err
}
}
return b.startWallet(password)
}
func (b *StatusBackend) startWallet(password string) error {
if !b.statusNode.Config().WalletConfig.Enabled {
return nil
}
wallet, err := b.statusNode.WalletService()
if err != nil {
return err
}
account, err := b.accountManager.SelectedWalletAccount()
if err != nil {
return err
}
path := path.Join(b.statusNode.Config().DataDir, fmt.Sprintf("wallet-%x.sql", account.Address))
return wallet.StartReactor(path, password,
b.statusNode.RPCClient().Ethclient(),
[]common.Address{account.Address},
new(big.Int).SetUint64(b.statusNode.Config().NetworkID))
}
// SendDataNotification sends data push notifications to users.
// dataPayloadJSON is a JSON string that looks like this:
// {
// "data": {
// "msg-v2": {
// "from": "0x2cea3bd5", // hash of sender (first 10 characters/4 bytes of sha3 hash)
// "to": "0xb1f89744", // hash of recipient (first 10 characters/4 bytes of sha3 hash)
// "id": "0x872653ad", // message ID hash (first 10 characters/4 bytes of sha3 hash)
// }
// }
// }
func (b *StatusBackend) SendDataNotification(dataPayloadJSON string, tokens ...string) error {
log.Debug("sending data push notification")

View File

@ -29,6 +29,7 @@ import (
"github.com/status-im/status-go/services/personal" "github.com/status-im/status-go/services/personal"
"github.com/status-im/status-go/services/shhext" "github.com/status-im/status-go/services/shhext"
"github.com/status-im/status-go/services/status" "github.com/status-im/status-go/services/status"
"github.com/status-im/status-go/services/wallet"
"github.com/status-im/status-go/static" "github.com/status-im/status-go/static"
"github.com/status-im/status-go/timesource" "github.com/status-im/status-go/timesource"
whisper "github.com/status-im/whisper/whisperv6" whisper "github.com/status-im/whisper/whisperv6"
@ -45,6 +46,7 @@ var (
ErrStatusServiceRegistrationFailure = errors.New("failed to register the Status service")
ErrPeerServiceRegistrationFailure = errors.New("failed to register the Peer service")
ErrIncentivisationServiceRegistrationFailure = errors.New("failed to register the Incentivisation service")
ErrWalletServiceRegistrationFailure = errors.New("failed to register the Wallet service")
)
// All general log messages in this package should be routed through this logger.
@ -118,6 +120,10 @@ func MakeNode(config *params.NodeConfig, db *leveldb.DB) (*node.Node, error) {
return nil, fmt.Errorf("%v: %v", ErrPeerServiceRegistrationFailure, err)
}
if err := activateWalletService(stack, config.WalletConfig); err != nil {
return nil, fmt.Errorf("%v: %v", ErrWalletServiceRegistrationFailure, err)
}
return stack, nil
}
@ -269,6 +275,16 @@ func activatePeerService(stack *node.Node) error {
})
}
func activateWalletService(stack *node.Node, config params.WalletConfig) error {
if !config.Enabled {
logger.Info("service.Wallet is disabled")
return nil
}
return stack.Register(func(*node.ServiceContext) (node.Service, error) {
return wallet.NewService(), nil
})
}
func registerMailServer(whisperService *whisper.Whisper, config *params.WhisperConfig) (err error) {
var mailServer mailserver.WMailServer
whisperService.RegisterServer(&mailServer)

View File

@ -32,6 +32,7 @@ import (
"github.com/status-im/status-go/services/peer" "github.com/status-im/status-go/services/peer"
"github.com/status-im/status-go/services/shhext" "github.com/status-im/status-go/services/shhext"
"github.com/status-im/status-go/services/status" "github.com/status-im/status-go/services/status"
"github.com/status-im/status-go/services/wallet"
) )
// tickerResolution is the delta to check blockchain sync progress. // tickerResolution is the delta to check blockchain sync progress.
@ -590,6 +591,17 @@ func (n *StatusNode) ShhExtService() (s *shhext.Service, err error) {
return
}
// WalletService returns wallet.Service instance if it is started.
func (n *StatusNode) WalletService() (s *wallet.Service, err error) {
n.mu.RLock()
defer n.mu.RUnlock()
err = n.gethService(&s)
if err == node.ErrServiceUnknown {
err = ErrServiceUnknown
}
return
}
// AccountManager exposes reference to node's accounts manager
func (n *StatusNode) AccountManager() (*accounts.Manager, error) {
n.mu.RLock()

View File

@ -340,9 +340,12 @@ type NodeConfig struct {
// IncentivisationConfig extra configuration for incentivisation service
IncentivisationConfig IncentivisationConfig `json:"IncentivisationConfig," validate:"structonly"`
// ShhextConfig extra configuration for service running under shhext namespace.
ShhextConfig ShhextConfig `json:"ShhextConfig," validate:"structonly"`
// WalletConfig extra configuration for wallet.Service.
WalletConfig WalletConfig
// SwarmConfig extra configuration for Swarm and ENS
SwarmConfig SwarmConfig `json:"SwarmConfig," validate:"structonly"`
@ -358,6 +361,11 @@ type NodeConfig struct {
MailServerRegistryAddress string
}
// WalletConfig extra configuration for wallet.Service.
type WalletConfig struct {
Enabled bool
}
// ShhextConfig defines options used by shhext service.
type ShhextConfig struct {
PFSEnabled bool
@ -523,7 +531,7 @@ func NewNodeConfig(dataDir string, networkID uint64) (*NodeConfig, error) {
HTTPPort: 8545,
HTTPVirtualHosts: []string{"localhost"},
ListenAddr: ":0",
APIModules: "eth,net,web3,peer,wallet",
MaxPeers: 25,
MaxPendingPeers: 0,
IPCFile: "geth.ipc",

View File

@ -9,6 +9,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
gethrpc "github.com/ethereum/go-ethereum/rpc" gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
@ -74,6 +75,14 @@ func NewClient(client *gethrpc.Client, upstream params.UpstreamRPCConfig) (*Clie
return &c, nil
}
// Ethclient returns ethclient.Client with upstream or local client.
func (c *Client) Ethclient() *ethclient.Client {
if c.upstreamEnabled {
return ethclient.NewClient(c.upstream)
}
return ethclient.NewClient(c.local)
}
// UpdateUpstreamURL changes the upstream RPC client URL, if the upstream is enabled.
func (c *Client) UpdateUpstreamURL(url string) error {
if c.upstream == nil {

services/wallet/README.md (new file, 199 lines)

@ -0,0 +1,199 @@
Wallet
==========
The wallet service starts a loop that watches for new transfers (ETH and ERC-20).
To start the service correctly, two values need to be set in the config:
1. Set `Enabled` to `true` in `WalletConfig`:
```json
{
  "WalletConfig": {
    "Enabled": true
  }
}
```
2. Expose the wallet API through `APIModules`:
```json
{
  "APIModules": "eth,net,web3,peer,wallet"
}
```
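When status-go is embedded as a library, the same two settings can also be applied directly on `params.NodeConfig`. The sketch below is only an illustration: the data directory and network ID are placeholder values, and the config would still be handed to the node/backend start-up as usual.

```go
package main

import (
	"log"

	"github.com/status-im/status-go/params"
)

func main() {
	// Placeholder data directory and network ID (1 = mainnet) for this sketch.
	config, err := params.NewNodeConfig("./statusd-data", 1)
	if err != nil {
		log.Fatal(err)
	}
	// The two settings described above: enable the wallet service and make
	// sure its RPC namespace is exposed.
	config.WalletConfig.Enabled = true
	config.APIModules = "eth,net,web3,peer,wallet"
	log.Printf("wallet enabled: %v, modules: %s", config.WalletConfig.Enabled, config.APIModules)
}
```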
API
----------
#### wallet_getTransfers
Returns available transfers in a given range.
##### Parameters
- `start`: `BIGINT` - start of the range
- `end`: `BIGINT` - end of the range. If nil, the query returns all transfers from `start`.
##### Examples
```json
{"jsonrpc":"2.0","id":14,"method":"wallet_getTransfers","params":[0,20]}
{"jsonrpc":"2.0","id":14,"method":"wallet_getTransfers","params":[0,null]}
{"jsonrpc":"2.0","id":13,"method":"wallet_getTransfers","params":[0]}
```
##### Returns
List of objects like:
```json
[
{
"type": "erc20",
"address": "0x5dc6108dc6296b052bbd33000553afe0ea576b5e",
"blockNumber": 5687981,
"blockhash": "0xcc4553f125be0bc6cc974518368145fcf1344f41e5de238205db0a1c185ea2fc",
"transaction": {
"nonce": "0x57",
"gasPrice": "0x3b9aca00",
"gas": "0x44ba8",
"to": "0xc55cf4b03948d7ebc8b9e8bad92643703811d162",
"value": "0x0",
"input": "0xcae9ca5100000000000000000000000039d16cdb56b5a6a89e1a397a13fe48034694316e0000000000000000000000000000000000000000000000015af1d78b58c40000000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000000449134709e00000000000000000000000000000000000000000000000000000000000000010000000000000000000000005dc6108dc6296b052bbd33000553afe0ea576b5e00000000000000000000000000000000000000000000000000000000",
"v": "0x29",
"r": "0x124587e9c1d16d8bd02fda1221aefbfca8e2f4cd6300ed2077ebf736789179ab",
"s": "0x4309fddc1226dacb877488221a439c4f97d77dc2c3f5c8ea51f34f42417d3bda",
"hash": "0x259dd45c9c4d52137f32b7787e6e1fb6c9faf70ba40b8137bf66ba03abc0da00"
},
"receipt": {
"root": "0x",
"status": "0x1",
"cumulativeGasUsed": "0x389e1e",
"logsBloom": "0x00000000000000000000000000000000000000000000000000000000000200000000020000000000000000000000000000000000004000000000000000200000000000000020000000000008000000000000000000000000000000000000000000000000020000000000002000000800000000100000000000000010000000000000000000400000000000000001000000000040000000400000000400000000020000000000000008000000000020000000010000000002000000000000020000000002000000000000000000000000000000000200000000000000000020000010000000000000000000000400000000000000000000000000000000000000",
"logs": [
{
"address": "0xc55cf4b03948d7ebc8b9e8bad92643703811d162",
"topics": [
"0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925",
"0x0000000000000000000000005dc6108dc6296b052bbd33000553afe0ea576b5e",
"0x00000000000000000000000039d16cdb56b5a6a89e1a397a13fe48034694316e"
],
"data": "0x0000000000000000000000000000000000000000000000015af1d78b58c40000",
"blockNumber": "0x56caad",
"transactionHash": "0x259dd45c9c4d52137f32b7787e6e1fb6c9faf70ba40b8137bf66ba03abc0da00",
"transactionIndex": "0x10",
"blockHash": "0xcc4553f125be0bc6cc974518368145fcf1344f41e5de238205db0a1c185ea2fc",
"logIndex": "0xd",
"removed": false
},
{
"address": "0xc55cf4b03948d7ebc8b9e8bad92643703811d162",
"topics": [
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
"0x0000000000000000000000005dc6108dc6296b052bbd33000553afe0ea576b5e",
"0x000000000000000000000000ee55b1661fd24c4760d92026cedb252a5a0f2a4e"
],
"data": "0x0000000000000000000000000000000000000000000000015af1d78b58c40000",
"blockNumber": "0x56caad",
"transactionHash": "0x259dd45c9c4d52137f32b7787e6e1fb6c9faf70ba40b8137bf66ba03abc0da00",
"transactionIndex": "0x10",
"blockHash": "0xcc4553f125be0bc6cc974518368145fcf1344f41e5de238205db0a1c185ea2fc",
"logIndex": "0xe",
"removed": false
},
{
"address": "0x39d16cdb56b5a6a89e1a397a13fe48034694316e",
"topics": [
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
"0x0000000000000000000000000000000000000000000000000000000000000000",
"0x0000000000000000000000005dc6108dc6296b052bbd33000553afe0ea576b5e",
"0x0000000000000000000000000000000000000000000000000000000000000044"
],
"data": "0x",
"blockNumber": "0x56caad",
"transactionHash": "0x259dd45c9c4d52137f32b7787e6e1fb6c9faf70ba40b8137bf66ba03abc0da00",
"transactionIndex": "0x10",
"blockHash": "0xcc4553f125be0bc6cc974518368145fcf1344f41e5de238205db0a1c185ea2fc",
"logIndex": "0xf",
"removed": false
}
],
"transactionHash": "0x259dd45c9c4d52137f32b7787e6e1fb6c9faf70ba40b8137bf66ba03abc0da00",
"contractAddress": "0x0000000000000000000000000000000000000000",
"gasUsed": "0x34f42"
}
}
]
```
#### wallet_getTransfersByAddress
Returns available transfers for a given address in a given range.
##### Parameters
- `address`: `HEX` - ethereum address encoded in hex
- `start`: `BIGINT` - start of the range
- `end`: `BIGINT` - end of the range. If nil, the query returns all transfers from `start`.
##### Examples
```json
{"jsonrpc":"2.0","id":7,"method":"wallet_getTransfersByAddress","params":["0xb81a6845649fa8c042dfaceb3f7a684873406993","0x0"]}
```
##### Returns
Objects in the same format as `wallet_getTransfers`.
Signals
-------
Two signals can be emitted:
1. `newblock` signal
Emitted when transfers from a new block were added to the database. In this case `blockNumber` is the number of that new block.
The client is expected to request transfers starting from the received block.
```json
{
"type": "wallet",
"event": {
"type": "newblock",
"blockNumber": 0,
"accounts": [
"0x42c8f505b4006d417dd4e0ba0e880692986adbd8",
"0x3129mdasmeo132128391fml1130410k312312mll"
]
}
}
```
2. `reorg` signal.
Emitted when a part of the chain was removed. All transfers starting from the given block number were removed.
The client is expected to request new transfers from the received block and replace the transfers that were received previously.
```json
{
"type": "wallet",
"event": {
"type": "reorg",
"blockNumber": 0,
"accounts": [
"0x42c8f505b4006d417dd4e0ba0e880692986adbd8"
]
}
}
```
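For Go clients, the envelope shown above can be decoded with a small helper. This is only a sketch: `walletSignal` mirrors the JSON shape from the examples and is not a type exported by status-go.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math/big"
)

// walletSignal mirrors the signal envelope shown above; it is a client-side
// helper for this sketch, not part of the wallet package.
type walletSignal struct {
	Type  string `json:"type"`
	Event struct {
		Type        string   `json:"type"`
		BlockNumber *big.Int `json:"blockNumber"`
		Accounts    []string `json:"accounts"`
	} `json:"event"`
}

func main() {
	raw := []byte(`{"type":"wallet","event":{"type":"newblock","blockNumber":0,"accounts":["0x42c8f505b4006d417dd4e0ba0e880692986adbd8"]}}`)
	var sig walletSignal
	if err := json.Unmarshal(raw, &sig); err != nil {
		panic(err)
	}
	// On "newblock" re-request transfers from sig.Event.BlockNumber onwards;
	// on "reorg" also drop previously stored transfers from that block.
	fmt.Println(sig.Event.Type, sig.Event.BlockNumber, sig.Event.Accounts)
}
```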

services/wallet/api.go (new file, 56 lines)

@ -0,0 +1,56 @@
package wallet
import (
"context"
"errors"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
)
func NewAPI(s *Service) *API {
return &API{s}
}
// API is a class with methods available over RPC.
type API struct {
s *Service
}
// GetTransfers returns transfers in a range of blocks. If `end` is nil, all transfers from `start` will be returned.
// TODO(dshulyak) benchmark loading many transfers from database. We can avoid json unmarshal/marshal if we
// read header, tx and receipt as raw json.
func (api *API) GetTransfers(ctx context.Context, start, end *hexutil.Big) ([]Transfer, error) {
log.Debug("call to get transfers", "start", start, "end", end)
if start == nil {
return nil, errors.New("start of the query must be provided. use 0 if you want to load all transfers")
}
if api.s.db == nil {
return nil, errors.New("wallet service is not initialized")
}
rst, err := api.s.db.GetTransfers((*big.Int)(start), (*big.Int)(end))
if err != nil {
return nil, err
}
log.Debug("result from database for transfers", "start", start, "end", end, "len", len(rst))
return rst, nil
}
// GetTransfersByAddress returns transfers for a single address between two blocks.
func (api *API) GetTransfersByAddress(ctx context.Context, address common.Address, start, end *hexutil.Big) ([]Transfer, error) {
log.Debug("call to get transfers for an address", "address", address, "start", start, "end", end)
if start == nil {
return nil, errors.New("start of the query must be provided. use 0 if you want to load all transfers")
}
if api.s.db == nil {
return nil, errors.New("wallet service is not initialized")
}
rst, err := api.s.db.GetTransfersByAddress(address, (*big.Int)(start), (*big.Int)(end))
if err != nil {
return nil, err
}
log.Debug("result from database for address", "address", address, "start", start, "end", end, "len", len(rst))
return rst, nil
}
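As a rough illustration of calling this API from Go, the snippet below uses go-ethereum's RPC client. It is a sketch only: the endpoint URL is a placeholder, and the result is decoded loosely instead of into the wallet package's `Transfer` type.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"math/big"
	"time"

	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	// Placeholder endpoint: the node's HTTP RPC address with the "wallet"
	// namespace exposed through APIModules.
	client, err := rpc.Dial("http://localhost:8545")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// wallet_getTransfers maps to API.GetTransfers above: start is required,
	// a nil end means "everything from start".
	var transfers []json.RawMessage
	start := (*hexutil.Big)(big.NewInt(0))
	if err := client.CallContext(ctx, &transfers, "wallet_getTransfers", start, nil); err != nil {
		log.Fatal(err)
	}
	fmt.Println("transfers:", len(transfers))
}
```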

services/wallet/async.go (new file, 152 lines)

@ -0,0 +1,152 @@
package wallet
import (
"context"
"sync"
"time"
)
type Command func(context.Context) error
// FiniteCommand terminates when error is nil.
type FiniteCommand struct {
Interval time.Duration
Runable func(context.Context) error
}
func (c FiniteCommand) Run(ctx context.Context) error {
ticker := time.NewTicker(c.Interval)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
err := c.Runable(ctx)
if err == nil {
return nil
}
}
}
}
// InfiniteCommand runs until context is closed.
type InfiniteCommand struct {
Interval time.Duration
Runable func(context.Context) error
}
func (c InfiniteCommand) Run(ctx context.Context) error {
ticker := time.NewTicker(c.Interval)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
_ = c.Runable(ctx)
}
}
}
func NewGroup(parent context.Context) *Group {
ctx, cancel := context.WithCancel(parent)
return &Group{
ctx: ctx,
cancel: cancel,
}
}
type Group struct {
ctx context.Context
cancel func()
wg sync.WaitGroup
}
func (g *Group) Add(cmd Command) {
g.wg.Add(1)
go func() {
_ = cmd(g.ctx)
g.wg.Done()
}()
}
func (g *Group) Stop() {
g.cancel()
}
func (g *Group) Wait() {
g.wg.Wait()
}
func (g *Group) WaitAsync() <-chan struct{} {
ch := make(chan struct{})
go func() {
g.Wait()
close(ch)
}()
return ch
}
func NewAtomicGroup(parent context.Context) *AtomicGroup {
ctx, cancel := context.WithCancel(parent)
return &AtomicGroup{ctx: ctx, cancel: cancel}
}
// AtomicGroup terminates as soon as the first goroutine terminates.
type AtomicGroup struct {
ctx context.Context
cancel func()
wg sync.WaitGroup
mu sync.Mutex
error error
}
// Add spawns the command in a goroutine and stores the first returned error.
func (d *AtomicGroup) Add(cmd Command) {
d.wg.Add(1)
go func() {
defer d.wg.Done()
err := cmd(d.ctx)
d.mu.Lock()
defer d.mu.Unlock()
if err != nil {
// do not overwrite original error by context errors
if d.error != nil {
return
}
d.error = err
d.cancel()
return
}
}()
}
// Wait blocks until all spawned commands finish.
func (d *AtomicGroup) Wait() {
d.wg.Wait()
if d.Error() == nil {
d.mu.Lock()
defer d.mu.Unlock()
d.cancel()
}
}
func (d *AtomicGroup) WaitAsync() <-chan struct{} {
ch := make(chan struct{})
go func() {
d.Wait()
close(ch)
}()
return ch
}
// Error returns the first error reported by any of the commands. Should be called after Wait.
func (d *AtomicGroup) Error() error {
d.mu.Lock()
defer d.mu.Unlock()
return d.error
}
func (d *AtomicGroup) Stop() {
d.cancel()
}
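A short usage sketch of these helpers (illustrative only, not taken from the package's tests): a `FiniteCommand` is retried on an interval inside a `Group` until it returns nil; the command body and interval below are made up for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/status-im/status-go/services/wallet"
)

func main() {
	group := wallet.NewGroup(context.Background())

	attempts := 0
	cmd := wallet.FiniteCommand{
		Interval: 100 * time.Millisecond,
		Runable: func(ctx context.Context) error {
			attempts++
			if attempts < 3 {
				return errors.New("not ready yet") // retried on the next tick
			}
			return nil // a nil error terminates the FiniteCommand
		},
	}
	group.Add(cmd.Run)

	<-group.WaitAsync() // closed once every added command has returned
	fmt.Println("finished after", attempts, "attempts")
}
```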

services/wallet/commands.go (new file, 462 lines)

@ -0,0 +1,462 @@
package wallet
import (
"context"
"errors"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
type ethHistoricalCommand struct {
db *Database
eth TransferDownloader
address common.Address
client reactorClient
feed *event.Feed
from, to *big.Int
}
func (c *ethHistoricalCommand) Command() Command {
return FiniteCommand{
Interval: 5 * time.Second,
Runable: c.Run,
}.Run
}
func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
if c.from == nil {
from, err := c.db.GetLatestSynced(c.address, ethSync)
if err != nil {
return err
}
if from == nil {
c.from = zero
} else {
c.from = from.Number
}
log.Debug("initialized downloader for eth historical transfers", "address", c.address, "starting at", c.from, "up to", c.to)
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
concurrent := NewConcurrentDownloader(ctx)
start := time.Now()
downloadEthConcurrently(concurrent, c.client, c.eth, c.address, c.from, c.to)
select {
case <-concurrent.WaitAsync():
case <-ctx.Done():
log.Error("eth downloader is stuck")
return errors.New("eth downloader is stuck")
}
if concurrent.Error() != nil {
log.Error("failed to dowload transfers using concurrent downloader", "error", err)
return concurrent.Error()
}
transfers := concurrent.Get()
log.Info("eth historical downloader finished successfully", "total transfers", len(transfers), "time", time.Since(start))
err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headersFromTransfers(transfers), nil, ethSync)
if err != nil {
log.Error("failed to save downloaded erc20 transfers", "error", err)
return err
}
if len(transfers) > 0 {
// we download all or nothing
c.feed.Send(Event{
Type: EventNewBlock,
BlockNumber: c.from,
Accounts: []common.Address{c.address},
})
}
log.Debug("eth transfers were persisted. command is closed")
return nil
}
type erc20HistoricalCommand struct {
db *Database
erc20 BatchDownloader
address common.Address
client reactorClient
feed *event.Feed
iterator *IterativeDownloader
to *DBHeader
}
func (c *erc20HistoricalCommand) Command() Command {
return FiniteCommand{
Interval: 5 * time.Second,
Runable: c.Run,
}.Run
}
func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {
if c.iterator == nil {
c.iterator, err = SetupIterativeDownloader(
c.db, c.client, c.address, erc20Sync,
c.erc20, erc20BatchSize, c.to)
if err != nil {
log.Error("failed to setup historical downloader for erc20")
return err
}
}
for !c.iterator.Finished() {
start := time.Now()
transfers, err := c.iterator.Next(ctx)
if err != nil {
log.Error("failed to get next batch", "error", err)
break
}
headers := headersFromTransfers(transfers)
headers = append(headers, c.iterator.Header())
err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headers, nil, erc20Sync)
if err != nil {
c.iterator.Revert()
log.Error("failed to save downloaded erc20 transfers", "error", err)
return err
}
if len(transfers) > 0 {
log.Debug("erc20 downloader imported transfers", "len", len(transfers), "time", time.Since(start))
c.feed.Send(Event{
Type: EventNewBlock,
BlockNumber: c.iterator.Header().Number,
Accounts: []common.Address{c.address},
})
}
}
log.Info("wallet historical downloader for erc20 transfers finished")
return nil
}
type newBlocksTransfersCommand struct {
db *Database
accounts []common.Address
chain *big.Int
erc20 *ERC20TransfersDownloader
eth *ETHTransferDownloader
client reactorClient
feed *event.Feed
from, to *DBHeader
}
func (c *newBlocksTransfersCommand) Command() Command {
// if both blocks are specified we will use this command to verify that the most recently synced blocks are still
// in the canonical chain
if c.to != nil && c.from != nil {
return FiniteCommand{
Interval: 5 * time.Second,
Runable: c.Verify,
}.Run
}
return InfiniteCommand{
Interval: pollingPeriodByChain(c.chain),
Runable: c.Run,
}.Run
}
func (c *newBlocksTransfersCommand) Verify(parent context.Context) (err error) {
if c.to == nil || c.from == nil {
return errors.New("`from` and `to` blocks must be specified")
}
for c.from.Number.Cmp(c.to.Number) != 0 {
err = c.Run(parent)
if err != nil {
return err
}
}
return nil
}
func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) {
if c.from == nil {
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
from, err := c.client.HeaderByNumber(ctx, nil)
cancel()
if err != nil {
log.Error("failed to get last known header", "error", err)
return err
}
c.from = toDBHeader(from)
log.Debug("initialized downloader for new blocks transfers", "starting at", c.from.Number)
}
num := new(big.Int).Add(c.from.Number, one)
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
latest, err := c.client.HeaderByNumber(ctx, num)
cancel()
if err != nil {
log.Warn("failed to get latest block", "number", num, "error", err)
return err
}
log.Debug("reactor received new block", "header", latest.Hash())
ctx, cancel = context.WithTimeout(parent, 10*time.Second)
added, removed, err := c.onNewBlock(ctx, c.from, latest)
cancel()
if err != nil {
log.Error("failed to process new header", "header", latest, "error", err)
return err
}
if len(added) == 0 && len(removed) == 0 {
log.Debug("new block already in the database", "block", latest.Number)
return nil
}
// for each added block get transfers from downloaders
all := []Transfer{}
for i := range added {
log.Debug("reactor get transfers", "block", added[i].Hash, "number", added[i].Number)
transfers, err := c.getTransfers(parent, added[i])
if err != nil {
log.Error("failed to get transfers", "header", added[i].Hash, "error", err)
continue
}
log.Debug("reactor adding transfers", "block", added[i].Hash, "number", added[i].Number, "len", len(transfers))
all = append(all, transfers...)
}
err = c.db.ProcessTranfers(all, c.accounts, added, removed, erc20Sync|ethSync)
if err != nil {
log.Error("failed to persist transfers", "error", err)
return err
}
c.from = toDBHeader(latest)
if len(added) == 1 && len(removed) == 0 {
c.feed.Send(Event{
Type: EventNewBlock,
BlockNumber: added[0].Number,
Accounts: uniqueAccountsFromTransfers(all),
})
}
if len(removed) != 0 {
lth := len(removed)
c.feed.Send(Event{
Type: EventReorg,
BlockNumber: removed[lth-1].Number,
Accounts: uniqueAccountsFromTransfers(all),
})
}
return nil
}
func (c *newBlocksTransfersCommand) onNewBlock(ctx context.Context, from *DBHeader, latest *types.Header) (added, removed []*DBHeader, err error) {
if from == nil {
// first node in the cache
return []*DBHeader{toHead(latest)}, nil, nil
}
if from.Hash == latest.ParentHash {
// parent matching from node in the cache. on the same chain.
return []*DBHeader{toHead(latest)}, nil, nil
}
exists, err := c.db.HeaderExists(latest.Hash())
if err != nil {
return nil, nil, err
}
if exists {
return nil, nil, nil
}
log.Debug("wallet reactor spotted reorg", "last header in db", from.Hash, "new parent", latest.ParentHash)
for from != nil && from.Hash != latest.ParentHash {
removed = append(removed, from)
added = append(added, toHead(latest))
latest, err = c.client.HeaderByHash(ctx, latest.ParentHash)
if err != nil {
return nil, nil, err
}
from, err = c.db.GetHeaderByNumber(new(big.Int).Sub(latest.Number, one))
if err != nil {
return nil, nil, err
}
}
added = append(added, toHead(latest))
return added, removed, nil
}
func (c *newBlocksTransfersCommand) getTransfers(parent context.Context, header *DBHeader) ([]Transfer, error) {
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
ethT, err := c.eth.GetTransfers(ctx, header)
cancel()
if err != nil {
return nil, err
}
ctx, cancel = context.WithTimeout(parent, 5*time.Second)
erc20T, err := c.erc20.GetTransfers(ctx, header)
cancel()
if err != nil {
return nil, err
}
return append(ethT, erc20T...), nil
}
// controlCommand implements the following procedure (the parts are executed sequentially):
// - verifies that the last header that was synced is still in the canonical chain
// - runs fast indexing for each account separately
// - starts listening to new blocks and watches for reorgs
type controlCommand struct {
accounts []common.Address
db *Database
eth *ETHTransferDownloader
erc20 *ERC20TransfersDownloader
chain *big.Int
client *ethclient.Client
feed *event.Feed
safetyDepth *big.Int
}
// run fast indexing for every account up to the canonical chain head minus the safety depth.
// every account runs it from its last synced header.
func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error {
start := time.Now()
group := NewGroup(ctx)
for _, address := range c.accounts {
erc20 := &erc20HistoricalCommand{
db: c.db,
erc20: NewERC20TransfersDownloader(c.client, []common.Address{address}),
client: c.client,
feed: c.feed,
address: address,
to: to,
}
group.Add(erc20.Command())
eth := &ethHistoricalCommand{
db: c.db,
client: c.client,
address: address,
eth: &ETHTransferDownloader{
client: c.client,
accounts: []common.Address{address},
signer: types.NewEIP155Signer(c.chain),
},
feed: c.feed,
to: to.Number,
}
group.Add(eth.Command())
}
select {
case <-ctx.Done():
return ctx.Err()
case <-group.WaitAsync():
log.Debug("fast indexer finished", "in", time.Since(start))
return nil
}
}
// verifyLastSynced verifies that the last header added to the database is still in the canonical chain.
// it is done by downloading the configured number of parents for the last header in the db.
func (c *controlCommand) verifyLastSynced(parent context.Context, last *DBHeader, head *types.Header) error {
log.Debug("verifying that previous header is still in canonical chan", "from", last.Number, "chain head", head.Number)
if new(big.Int).Sub(head.Number, last.Number).Cmp(c.safetyDepth) <= 0 {
log.Debug("no need to verify. last block is close enough to chain head")
return nil
}
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
header, err := c.client.HeaderByNumber(ctx, new(big.Int).Add(last.Number, c.safetyDepth))
cancel()
if err != nil {
return err
}
log.Debug("spawn reorg verifier", "from", last.Number, "to", header.Number)
// TODO(dshulyak) make a standalone command that
// doesn't manage transfers and has an upper limit
cmd := &newBlocksTransfersCommand{
db: c.db,
chain: c.chain,
client: c.client,
eth: c.eth,
erc20: c.erc20,
feed: c.feed,
from: last,
to: toDBHeader(header),
}
return cmd.Command()(parent)
}
func (c *controlCommand) Run(parent context.Context) error {
log.Debug("start control command")
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
head, err := c.client.HeaderByNumber(ctx, nil)
cancel()
if err != nil {
return err
}
log.Debug("current head is", "block number", head.Number)
last, err := c.db.GetLastHead()
if err != nil {
log.Error("failed to load last head from database", "error", err)
return err
}
if last != nil {
err = c.verifyLastSynced(parent, last, head)
if err != nil {
log.Error("failed verification for last header in canonical chain", "error", err)
return err
}
}
target := new(big.Int).Sub(head.Number, c.safetyDepth)
if target.Cmp(zero) <= 0 {
target = zero
}
ctx, cancel = context.WithTimeout(parent, 3*time.Second)
head, err = c.client.HeaderByNumber(ctx, target)
cancel()
if err != nil {
return err
}
log.Debug("run fast indexing for the transfers", "up to", head.Number)
err = c.fastIndex(parent, toDBHeader(head))
if err != nil {
return err
}
log.Debug("watching new blocks", "start from", head.Number)
cmd := &newBlocksTransfersCommand{
db: c.db,
chain: c.chain,
client: c.client,
accounts: c.accounts,
eth: c.eth,
erc20: c.erc20,
feed: c.feed,
from: toDBHeader(head),
}
return cmd.Command()(parent)
}
func (c *controlCommand) Command() Command {
return FiniteCommand{
Interval: 5 * time.Second,
Runable: c.Run,
}.Run
}
func headersFromTransfers(transfers []Transfer) []*DBHeader {
byHash := map[common.Hash]struct{}{}
rst := []*DBHeader{}
for i := range transfers {
_, exists := byHash[transfers[i].BlockHash]
if exists {
continue
}
rst = append(rst, &DBHeader{
Hash: transfers[i].BlockHash,
Number: transfers[i].BlockNumber,
})
}
return rst
}
func uniqueAccountsFromTransfers(transfers []Transfer) []common.Address {
accounts := []common.Address{}
unique := map[common.Address]struct{}{}
for i := range transfers {
_, exist := unique[transfers[i].Address]
if exist {
continue
}
unique[transfers[i].Address] = struct{}{}
accounts = append(accounts, transfers[i].Address)
}
return accounts
}

View File

@ -0,0 +1,231 @@
package wallet
import (
"context"
"math/big"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/t/devtests/testchain"
"github.com/stretchr/testify/suite"
)
func TestNewBlocksSuite(t *testing.T) {
suite.Run(t, new(NewBlocksSuite))
}
type NewBlocksSuite struct {
suite.Suite
backend *testchain.Backend
cmd *newBlocksTransfersCommand
address common.Address
db *Database
dbStop func()
feed *event.Feed
}
func (s *NewBlocksSuite) SetupTest() {
var err error
db, stop := setupTestDB(s.Suite.T())
s.db = db
s.dbStop = stop
s.backend, err = testchain.NewBackend()
s.Require().NoError(err)
account, err := crypto.GenerateKey()
s.Require().NoError(err)
s.address = crypto.PubkeyToAddress(account.PublicKey)
s.feed = &event.Feed{}
s.cmd = &newBlocksTransfersCommand{
db: s.db,
accounts: []common.Address{s.address},
erc20: NewERC20TransfersDownloader(s.backend.Client, []common.Address{s.address}),
eth: &ETHTransferDownloader{
client: s.backend.Client,
signer: s.backend.Signer,
accounts: []common.Address{s.address},
},
feed: s.feed,
client: s.backend.Client,
}
}
func (s *NewBlocksSuite) TearDownTest() {
s.dbStop()
s.Require().NoError(s.backend.Stop())
}
func (s *NewBlocksSuite) TestOneBlock() {
ctx := context.Background()
s.Require().EqualError(s.cmd.Run(ctx), "not found")
tx := types.NewTransaction(0, s.address, big.NewInt(1e17), 21000, big.NewInt(1), nil)
tx, err := types.SignTx(tx, s.backend.Signer, s.backend.Faucet)
s.Require().NoError(err)
blocks := s.backend.GenerateBlocks(1, 0, func(n int, gen *core.BlockGen) {
gen.AddTx(tx)
})
n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks)
s.Require().Equal(1, n)
s.Require().NoError(err)
events := make(chan Event, 1)
sub := s.feed.Subscribe(events)
defer sub.Unsubscribe()
s.Require().NoError(s.cmd.Run(ctx))
select {
case ev := <-events:
s.Require().Equal(ev.Type, EventNewBlock)
s.Require().Equal(ev.BlockNumber, big.NewInt(1))
default:
s.Require().FailNow("event wasn't emitted")
}
transfers, err := s.db.GetTransfers(big.NewInt(0), nil)
s.Require().NoError(err)
s.Require().Len(transfers, 1)
s.Require().Equal(tx.Hash(), transfers[0].ID)
}
func (s *NewBlocksSuite) genTx(nonce int) *types.Transaction {
tx := types.NewTransaction(uint64(nonce), s.address, big.NewInt(1e10), 21000, big.NewInt(1), nil)
tx, err := types.SignTx(tx, s.backend.Signer, s.backend.Faucet)
s.Require().NoError(err)
return tx
}
func (s *NewBlocksSuite) runCmdUntilError(ctx context.Context) (err error) {
for err == nil {
err = s.cmd.Run(ctx)
}
return err
}
func (s *NewBlocksSuite) TestReorg() {
blocks := s.backend.GenerateBlocks(20, 0, nil)
n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks)
s.Require().Equal(20, n)
s.Require().NoError(err)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
s.Require().EqualError(s.runCmdUntilError(ctx), "not found")
blocks = s.backend.GenerateBlocks(3, 20, func(n int, gen *core.BlockGen) {
gen.AddTx(s.genTx(n))
})
n, err = s.backend.Ethereum.BlockChain().InsertChain(blocks)
s.Require().Equal(3, n)
s.Require().NoError(err)
// `not found` returned when we query head+1 block
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
s.cmd.from = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(15))
s.Require().EqualError(s.runCmdUntilError(ctx), "not found")
transfers, err := s.db.GetTransfers(big.NewInt(0), nil)
s.Require().NoError(err)
s.Require().Len(transfers, 3)
blocks = s.backend.GenerateBlocks(10, 15, func(n int, gen *core.BlockGen) {
gen.AddTx(s.genTx(n))
})
n, err = s.backend.Ethereum.BlockChain().InsertChain(blocks)
s.Require().Equal(10, n)
s.Require().NoError(err)
// it will be less but even if something went wrong we can't get more
events := make(chan Event, 10)
sub := s.feed.Subscribe(events)
defer sub.Unsubscribe()
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
s.Require().EqualError(s.runCmdUntilError(ctx), "not found")
close(events)
expected := []Event{{Type: EventReorg, BlockNumber: big.NewInt(16)}, {Type: EventNewBlock, BlockNumber: big.NewInt(25)}}
i := 0
for ev := range events {
s.Require().Equal(expected[i].Type, ev.Type)
s.Require().Equal(expected[i].BlockNumber, ev.BlockNumber)
i++
}
transfers, err = s.db.GetTransfers(big.NewInt(0), nil)
s.Require().NoError(err)
s.Require().Len(transfers, 10)
}
func (s *NewBlocksSuite) downloadHistorical() {
blocks := s.backend.GenerateBlocks(40, 0, func(n int, gen *core.BlockGen) {
if n == 36 {
gen.AddTx(s.genTx(0))
} else if n == 39 {
gen.AddTx(s.genTx(1))
}
})
n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks)
s.Require().Equal(40, n)
s.Require().NoError(err)
eth := &ethHistoricalCommand{
db: s.db,
eth: &ETHTransferDownloader{
client: s.backend.Client,
signer: s.backend.Signer,
accounts: []common.Address{s.address},
},
feed: s.feed,
address: s.address,
client: s.backend.Client,
to: s.backend.Ethereum.BlockChain().CurrentBlock().Number(),
}
s.Require().NoError(eth.Run(context.Background()), "eth historical command failed to sync transfers")
transfers, err := s.db.GetTransfers(big.NewInt(0), nil)
s.Require().NoError(err)
s.Require().Len(transfers, 2)
}
func (s *NewBlocksSuite) reorgHistorical() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
s.Require().EqualError(s.runCmdUntilError(ctx), "not found")
blocks := s.backend.GenerateBlocks(10, 35, nil)
n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks)
s.Require().Equal(10, n)
s.Require().NoError(err)
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
s.Require().EqualError(s.runCmdUntilError(ctx), "not found")
}
func (s *NewBlocksSuite) TestSafetyBufferFailure() {
s.downloadHistorical()
s.reorgHistorical()
transfers, err := s.db.GetTransfers(big.NewInt(0), nil)
s.Require().NoError(err)
s.Require().Len(transfers, 1)
}
func (s *NewBlocksSuite) TestSafetyBufferSuccess() {
s.downloadHistorical()
safety := new(big.Int).Sub(s.backend.Ethereum.BlockChain().CurrentHeader().Number, big.NewInt(10))
s.cmd.from = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(safety.Uint64()))
s.reorgHistorical()
transfers, err := s.db.GetTransfers(big.NewInt(0), nil)
s.Require().NoError(err)
s.Require().Len(transfers, 0)
}

View File

@ -0,0 +1,78 @@
package wallet
import (
"context"
"math/big"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
// NewConcurrentDownloader creates ConcurrentDownloader instance.
func NewConcurrentDownloader(ctx context.Context) *ConcurrentDownloader {
runner := NewAtomicGroup(ctx)
result := &Result{}
return &ConcurrentDownloader{runner, result}
}
type ConcurrentDownloader struct {
*AtomicGroup
*Result
}
type Result struct {
mu sync.Mutex
transfers []Transfer
}
func (r *Result) Push(transfers ...Transfer) {
r.mu.Lock()
defer r.mu.Unlock()
r.transfers = append(r.transfers, transfers...)
}
func (r *Result) Get() []Transfer {
r.mu.Lock()
defer r.mu.Unlock()
rst := make([]Transfer, len(r.transfers))
copy(rst, r.transfers)
return rst
}
// TransferDownloader downloads transfers from single block using number.
type TransferDownloader interface {
GetTransfersByNumber(context.Context, *big.Int) ([]Transfer, error)
}
func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, downloader TransferDownloader, account common.Address, low, high *big.Int) {
c.Add(func(ctx context.Context) error {
log.Debug("eth transfers comparing blocks", "low", low, "high", high)
lb, err := client.BalanceAt(ctx, account, low)
if err != nil {
return err
}
hb, err := client.BalanceAt(ctx, account, high)
if err != nil {
return err
}
if lb.Cmp(hb) == 0 {
log.Debug("balances are equal", "low", low, "high", high)
return nil
}
if new(big.Int).Sub(high, low).Cmp(one) == 0 {
transfers, err := downloader.GetTransfersByNumber(ctx, high)
if err != nil {
return err
}
c.Push(transfers...)
return nil
}
mid := new(big.Int).Add(low, high)
mid = mid.Div(mid, two)
log.Debug("balances are not equal. spawn two concurrent downloaders", "low", low, "mid", mid, "high", high)
downloadEthConcurrently(c, client, downloader, account, low, mid)
downloadEthConcurrently(c, client, downloader, account, mid, high)
return nil
})
}

View File

@ -0,0 +1,126 @@
package wallet
import (
"context"
"errors"
"math/big"
"sort"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
func TestConcurrentErrorInterrupts(t *testing.T) {
concurrent := NewConcurrentDownloader(context.Background())
var interrupted bool
concurrent.Add(func(ctx context.Context) error {
select {
case <-ctx.Done():
interrupted = true
case <-time.After(10 * time.Second):
}
return nil
})
err := errors.New("interrupt")
concurrent.Add(func(ctx context.Context) error {
return err
})
concurrent.Wait()
require.True(t, interrupted)
require.Equal(t, err, concurrent.Error())
}
func TestConcurrentCollectsTransfers(t *testing.T) {
concurrent := NewConcurrentDownloader(context.Background())
concurrent.Add(func(context.Context) error {
concurrent.Push(Transfer{})
return nil
})
concurrent.Add(func(context.Context) error {
concurrent.Push(Transfer{})
return nil
})
concurrent.Wait()
require.Len(t, concurrent.Get(), 2)
}
type balancesFixture []*big.Int
func (f balancesFixture) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
index := int(blockNumber.Int64())
if index > len(f)-1 {
return nil, errors.New("balance unknown")
}
return f[index], nil
}
type batchesFixture [][]Transfer
func (f batchesFixture) GetTransfersByNumber(ctx context.Context, number *big.Int) (rst []Transfer, err error) {
index := int(number.Int64())
if index > len(f)-1 {
return nil, errors.New("unknown block")
}
return f[index], nil
}
func TestConcurrentEthDownloader(t *testing.T) {
type options struct {
balances balancesFixture
batches batchesFixture
result []Transfer
last *big.Int
}
type testCase struct {
desc string
options options
}
for _, tc := range []testCase{
{
desc: "NoBalances",
options: options{
last: big.NewInt(3),
balances: balancesFixture{big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0)},
},
},
{
desc: "LastBlock",
options: options{
last: big.NewInt(3),
balances: balancesFixture{big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(10)},
batches: batchesFixture{{}, {}, {}, {{BlockNumber: big.NewInt(3)}, {BlockNumber: big.NewInt(3)}}},
result: []Transfer{{BlockNumber: big.NewInt(3)}, {BlockNumber: big.NewInt(3)}},
},
},
{
desc: "ChangesInEveryBlock",
options: options{
last: big.NewInt(3),
balances: balancesFixture{big.NewInt(0), big.NewInt(3), big.NewInt(7), big.NewInt(10)},
batches: batchesFixture{{}, {{BlockNumber: big.NewInt(1)}}, {{BlockNumber: big.NewInt(2)}}, {{BlockNumber: big.NewInt(3)}}},
result: []Transfer{{BlockNumber: big.NewInt(1)}, {BlockNumber: big.NewInt(2)}, {BlockNumber: big.NewInt(3)}},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
concurrent := NewConcurrentDownloader(ctx)
downloadEthConcurrently(
concurrent, tc.options.balances, tc.options.batches,
common.Address{}, zero, tc.options.last)
concurrent.Wait()
require.NoError(t, concurrent.Error())
rst := concurrent.Get()
require.Len(t, rst, len(tc.options.result))
sort.Slice(rst, func(i, j int) bool {
return rst[i].BlockNumber.Cmp(rst[j].BlockNumber) < 0
})
for i := range rst {
require.Equal(t, tc.options.result[i].BlockNumber, rst[i].BlockNumber)
}
})
}
}

services/wallet/database.go (new file, 361 lines)

@ -0,0 +1,361 @@
package wallet
import (
"database/sql"
"database/sql/driver"
"encoding/json"
"errors"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/status-im/status-go/services/wallet/migrations"
"github.com/status-im/status-go/sqlite"
)
// DBHeader fields from header that are stored in database.
type DBHeader struct {
Number *big.Int
Hash common.Hash
// Head is true if the block was a head at the time it was pulled from chain.
Head bool
}
func toDBHeader(header *types.Header) *DBHeader {
return &DBHeader{
Hash: header.Hash(),
Number: header.Number,
}
}
func toHead(header *types.Header) *DBHeader {
return &DBHeader{
Hash: header.Hash(),
Number: header.Number,
Head: true,
}
}
// SyncOption is used to specify that the application processed transfers for that block.
type SyncOption uint
const (
// sync options
ethSync SyncOption = 1
erc20Sync SyncOption = 2
)
// InitializeDB creates db file at a given path and applies migrations.
func InitializeDB(path, password string) (*Database, error) {
db, err := sqlite.OpenDB(path, password)
if err != nil {
return nil, err
}
err = migrations.Migrate(db)
if err != nil {
return nil, err
}
return &Database{db: db}, nil
}
// SQLBigInt type for storing uint256 in the database.
// FIXME(dshulyak) SQL big int is max 64 bits. Maybe store as bytes in big endian and hope
// that lexicographical sorting will work.
type SQLBigInt big.Int
// Scan implements interface.
func (i *SQLBigInt) Scan(value interface{}) error {
val, ok := value.(int64)
if !ok {
return errors.New("not an integer")
}
(*big.Int)(i).SetInt64(val)
return nil
}
// Value implements interface.
func (i *SQLBigInt) Value() (driver.Value, error) {
if !(*big.Int)(i).IsInt64() {
return nil, errors.New("not at int64")
}
return (*big.Int)(i).Int64(), nil
}
// JSONBlob type for marshaling/unmarshaling inner type to json.
type JSONBlob struct {
data interface{}
}
// Scan implements interface.
func (blob *JSONBlob) Scan(value interface{}) error {
bytes, ok := value.([]byte)
if !ok {
return errors.New("not a byte slice")
}
if len(bytes) == 0 {
return nil
}
err := json.Unmarshal(bytes, blob.data)
return err
}
// Value implements interface.
func (blob *JSONBlob) Value() (driver.Value, error) {
return json.Marshal(blob.data)
}
// Database sql wrapper for operations with wallet objects.
type Database struct {
db *sql.DB
}
// Close closes database.
func (db Database) Close() error {
return db.db.Close()
}
// ProcessTranfers atomically adds/removes blocks and adds new transfers.
func (db Database) ProcessTranfers(transfers []Transfer, accounts []common.Address, added, removed []*DBHeader, option SyncOption) (err error) {
var (
tx *sql.Tx
)
tx, err = db.db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
_ = tx.Rollback()
}()
err = deleteHeaders(tx, removed)
if err != nil {
return
}
err = insertHeaders(tx, added)
if err != nil {
return
}
err = insertTransfers(tx, transfers)
if err != nil {
return
}
err = updateAccounts(tx, accounts, added, option)
return
}
// GetTransfersByAddress loads transfers for a given address between two blocks.
func (db *Database) GetTransfersByAddress(address common.Address, start, end *big.Int) (rst []Transfer, err error) {
query := newTransfersQuery().FilterAddress(address).FilterStart(start).FilterEnd(end)
rows, err := db.db.Query(query.String(), query.Args()...)
if err != nil {
return
}
defer rows.Close()
return query.Scan(rows)
}
// GetTransfers loads transfers between two blocks.
func (db *Database) GetTransfers(start, end *big.Int) (rst []Transfer, err error) {
query := newTransfersQuery().FilterStart(start).FilterEnd(end)
rows, err := db.db.Query(query.String(), query.Args()...)
if err != nil {
return
}
defer rows.Close()
return query.Scan(rows)
}
// SaveHeader stores a single header.
func (db *Database) SaveHeader(header *types.Header) error {
_, err := db.db.Exec("INSERT INTO blocks(number, hash) VALUES (?, ?)", (*SQLBigInt)(header.Number), header.Hash())
return err
}
// SaveHeaders stores a list of headers atomically.
func (db *Database) SaveHeaders(headers []*types.Header) (err error) {
var (
tx *sql.Tx
insert *sql.Stmt
)
tx, err = db.db.Begin()
if err != nil {
return
}
insert, err = tx.Prepare("INSERT INTO blocks(number, hash) VALUES (?,?)")
if err != nil {
return
}
defer func() {
if err == nil {
err = tx.Commit()
} else {
_ = tx.Rollback()
}
}()
for _, h := range headers {
_, err = insert.Exec((*SQLBigInt)(h.Number), h.Hash())
if err != nil {
return
}
}
return
}
func (db *Database) SaveSyncedHeader(address common.Address, header *types.Header, option SyncOption) (err error) {
var (
tx *sql.Tx
insert *sql.Stmt
)
tx, err = db.db.Begin()
if err != nil {
return
}
insert, err = tx.Prepare("INSERT INTO accounts_to_blocks(address, blk_number, sync) VALUES (?,?,?)")
if err != nil {
return
}
defer func() {
if err == nil {
err = tx.Commit()
} else {
_ = tx.Rollback()
}
}()
_, err = insert.Exec(address, (*SQLBigInt)(header.Number), option)
if err != nil {
return
}
return err
}
// HeaderExists checks if header with hash exists in db.
func (db *Database) HeaderExists(hash common.Hash) (bool, error) {
var val sql.NullBool
err := db.db.QueryRow("SELECT EXISTS (SELECT hash FROM blocks WHERE hash = ?)", hash).Scan(&val)
if err != nil {
return false, err
}
return val.Bool, nil
}
// GetHeaderByNumber selects header using block number.
func (db *Database) GetHeaderByNumber(number *big.Int) (header *DBHeader, err error) {
header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
err = db.db.QueryRow("SELECT hash,number FROM blocks WHERE number = ?", (*SQLBigInt)(number)).Scan(&header.Hash, (*SQLBigInt)(header.Number))
if err == nil {
return header, nil
}
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
func (db *Database) GetLastHead() (header *DBHeader, err error) {
header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
err = db.db.QueryRow("SELECT hash,number FROM blocks WHERE head = 1 AND number = (SELECT MAX(number) FROM blocks)").Scan(&header.Hash, (*SQLBigInt)(header.Number))
if err == nil {
return header, nil
}
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
// GetLatestSynced returns the last synced block header for a given sync option.
func (db *Database) GetLatestSynced(address common.Address, option SyncOption) (header *DBHeader, err error) {
header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
err = db.db.QueryRow(`
SELECT blocks.hash, blk_number FROM accounts_to_blocks JOIN blocks ON blk_number = blocks.number WHERE address = $1 AND blk_number
= (SELECT MAX(blk_number) FROM accounts_to_blocks WHERE address = $1 AND sync & $2 = $2)`, address, option).Scan(&header.Hash, (*SQLBigInt)(header.Number))
if err == nil {
return header, nil
}
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
// statementCreator allows passing either a transaction or a database to the consumer.
type statementCreator interface {
Prepare(query string) (*sql.Stmt, error)
}
func deleteHeaders(creator statementCreator, headers []*DBHeader) error {
delete, err := creator.Prepare("DELETE FROM blocks WHERE hash = ?")
if err != nil {
return err
}
for _, h := range headers {
_, err = delete.Exec(h.Hash)
if err != nil {
return err
}
}
return nil
}
func insertHeaders(creator statementCreator, headers []*DBHeader) error {
insert, err := creator.Prepare("INSERT OR IGNORE INTO blocks(hash, number, head) VALUES (?, ?, ?)")
if err != nil {
return err
}
for _, h := range headers {
_, err = insert.Exec(h.Hash, (*SQLBigInt)(h.Number), h.Head)
if err != nil {
return err
}
}
return nil
}
func insertTransfers(creator statementCreator, transfers []Transfer) error {
insert, err := creator.Prepare("INSERT OR IGNORE INTO transfers(hash, blk_hash, address, tx, receipt, type) VALUES (?, ?, ?, ?, ?, ?)")
if err != nil {
return err
}
for _, t := range transfers {
_, err = insert.Exec(t.ID, t.BlockHash, t.Address, &JSONBlob{t.Transaction}, &JSONBlob{t.Receipt}, t.Type)
if err != nil {
return err
}
}
return nil
}
func updateAccounts(creator statementCreator, accounts []common.Address, headers []*DBHeader, option SyncOption) error {
update, err := creator.Prepare("UPDATE accounts_to_blocks SET sync=sync|? WHERE address=? AND blk_number=?")
if err != nil {
return err
}
insert, err := creator.Prepare("INSERT OR IGNORE INTO accounts_to_blocks(address,blk_number,sync) VALUES(?,?,?)")
if err != nil {
return err
}
for _, acc := range accounts {
for _, h := range headers {
rst, err := update.Exec(option, acc, (*SQLBigInt)(h.Number))
if err != nil {
return err
}
affected, err := rst.RowsAffected()
if err != nil {
return err
}
if affected > 0 {
continue
}
_, err = insert.Exec(acc, (*SQLBigInt)(h.Number), option)
if err != nil {
return err
}
}
}
return nil
}

View File

@ -0,0 +1,235 @@
package wallet
import (
"io/ioutil"
"math/big"
"os"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)
func setupTestDB(t *testing.T) (*Database, func()) {
tmpfile, err := ioutil.TempFile("", "wallet-tests-")
require.NoError(t, err)
db, err := InitializeDB(tmpfile.Name(), "wallet-tests")
require.NoError(t, err)
return db, func() {
require.NoError(t, db.Close())
require.NoError(t, os.Remove(tmpfile.Name()))
}
}
func TestDBGetHeaderByNumber(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
header := &types.Header{
Number: big.NewInt(10),
Difficulty: big.NewInt(1),
Time: 1,
}
require.NoError(t, db.SaveHeader(header))
rst, err := db.GetHeaderByNumber(header.Number)
require.NoError(t, err)
require.Equal(t, header.Hash(), rst.Hash)
}
func TestDBGetHeaderByNumberNoRows(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
rst, err := db.GetHeaderByNumber(big.NewInt(1))
require.NoError(t, err)
require.Nil(t, rst)
}
func TestDBHeaderExists(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
header := &types.Header{
Number: big.NewInt(10),
Difficulty: big.NewInt(1),
Time: 1,
}
require.NoError(t, db.SaveHeader(header))
rst, err := db.HeaderExists(header.Hash())
require.NoError(t, err)
require.True(t, rst)
}
func TestDBHeaderDoesntExist(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
rst, err := db.HeaderExists(common.Hash{1})
require.NoError(t, err)
require.False(t, rst)
}
func TestDBProcessTransfer(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
header := &DBHeader{
Number: big.NewInt(1),
Hash: common.Hash{1},
}
tx := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil)
transfers := []Transfer{
{
ID: common.Hash{1},
Type: ethTransfer,
BlockHash: header.Hash,
BlockNumber: header.Number,
Transaction: tx,
Receipt: types.NewReceipt(nil, false, 100),
},
}
require.NoError(t, db.ProcessTranfers(transfers, nil, []*DBHeader{header}, nil, 0))
}
func TestDBReorgTransfers(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
rcpt := types.NewReceipt(nil, false, 100)
rcpt.Logs = []*types.Log{}
original := &DBHeader{
Number: big.NewInt(1),
Hash: common.Hash{1},
}
replaced := &DBHeader{
Number: big.NewInt(1),
Hash: common.Hash{2},
}
originalTX := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil)
replacedTX := types.NewTransaction(2, common.Address{1}, nil, 10, big.NewInt(10), nil)
require.NoError(t, db.ProcessTranfers([]Transfer{
{ethTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, originalTX, rcpt},
}, nil, []*DBHeader{original}, nil, 0))
require.NoError(t, db.ProcessTranfers([]Transfer{
{ethTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, replacedTX, rcpt},
}, nil, []*DBHeader{replaced}, []*DBHeader{original}, 0))
all, err := db.GetTransfers(big.NewInt(0), nil)
require.NoError(t, err)
require.Len(t, all, 1)
require.Equal(t, replacedTX.Hash(), all[0].Transaction.Hash())
}
func TestDBGetTransfersFromBlock(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
headers := []*DBHeader{}
transfers := []Transfer{}
for i := 1; i < 10; i++ {
header := &DBHeader{
Number: big.NewInt(int64(i)),
Hash: common.Hash{byte(i)},
}
headers = append(headers, header)
tx := types.NewTransaction(uint64(i), common.Address{1}, nil, 10, big.NewInt(10), nil)
receipt := types.NewReceipt(nil, false, 100)
receipt.Logs = []*types.Log{}
transfer := Transfer{
ID: tx.Hash(),
Type: ethTransfer,
BlockNumber: header.Number,
BlockHash: header.Hash,
Transaction: tx,
Receipt: receipt,
}
transfers = append(transfers, transfer)
}
require.NoError(t, db.ProcessTranfers(transfers, nil, headers, nil, 0))
rst, err := db.GetTransfers(big.NewInt(7), nil)
require.NoError(t, err)
require.Len(t, rst, 3)
rst, err = db.GetTransfers(big.NewInt(2), big.NewInt(5))
require.NoError(t, err)
require.Len(t, rst, 4)
}
func TestDBLatestSynced(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
address := common.Address{1}
h1 := &types.Header{
Number: big.NewInt(10),
Difficulty: big.NewInt(1),
Time: 1,
}
h2 := &types.Header{
Number: big.NewInt(9),
Difficulty: big.NewInt(1),
Time: 1,
}
require.NoError(t, db.SaveHeader(h1))
require.NoError(t, db.SaveHeader(h2))
require.NoError(t, db.SaveSyncedHeader(address, h1, ethSync))
require.NoError(t, db.SaveSyncedHeader(address, h2, ethSync))
latest, err := db.GetLatestSynced(address, ethSync)
require.NoError(t, err)
require.NotNil(t, latest)
require.Equal(t, h1.Number, latest.Number)
require.Equal(t, h1.Hash(), latest.Hash)
}
func TestDBLatestSyncedDoesntExist(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
latest, err := db.GetLatestSynced(common.Address{1}, ethSync)
require.NoError(t, err)
require.Nil(t, latest)
}
func TestDBProcessTransfersUpdate(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
address := common.Address{1}
header := &DBHeader{
Number: big.NewInt(10),
Hash: common.Hash{1},
}
transfer := Transfer{
ID: common.Hash{1},
BlockNumber: header.Number,
BlockHash: header.Hash,
Transaction: types.NewTransaction(0, common.Address{}, nil, 0, nil, nil),
Address: address,
}
require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, ethSync))
require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, erc20Sync))
latest, err := db.GetLatestSynced(address, ethSync|erc20Sync)
require.NoError(t, err)
require.Equal(t, header.Hash, latest.Hash)
}
func TestDBLastHeadExist(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
headers := []*DBHeader{
{Number: big.NewInt(1), Hash: common.Hash{1}, Head: true},
{Number: big.NewInt(2), Hash: common.Hash{2}, Head: true},
{Number: big.NewInt(3), Hash: common.Hash{3}, Head: true},
}
require.NoError(t, db.ProcessTranfers(nil, nil, headers, nil, 0))
last, err := db.GetLastHead()
require.NoError(t, err)
require.Equal(t, headers[2].Hash, last.Hash)
}
func TestDBLastHeadDoesntExist(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
last, err := db.GetLastHead()
require.NoError(t, err)
require.Nil(t, last)
}

View File

@ -0,0 +1,304 @@
package wallet
import (
"context"
"encoding/binary"
"errors"
"math/big"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
// TransferType type of the asset that was transferred.
type TransferType string
const (
ethTransfer TransferType = "eth"
erc20Transfer TransferType = "erc20"
erc20TransferEventSignature = "Transfer(address,address,uint256)"
)
var (
zero = big.NewInt(0)
one = big.NewInt(1)
two = big.NewInt(2)
)
// Transfer stores information about transfer.
type Transfer struct {
Type TransferType `json:"type"`
ID common.Hash `json:"-"`
Address common.Address `json:"address"`
BlockNumber *big.Int `json:"blockNumber"`
BlockHash common.Hash `json:"blockhash"`
Transaction *types.Transaction `json:"transaction"`
Receipt *types.Receipt `json:"receipt"`
}
// ETHTransferDownloader downloads regular eth transfers.
type ETHTransferDownloader struct {
client *ethclient.Client
accounts []common.Address
signer types.Signer
}
// GetTransfers checks whether the balance changed between two consecutive blocks.
// If it did, it downloads the transactions in that block that transfer ether.
func (d *ETHTransferDownloader) GetTransfers(ctx context.Context, header *DBHeader) (rst []Transfer, err error) {
// TODO(dshulyak) consider caching balance and reset it on reorg
num := new(big.Int).Sub(header.Number, one)
changed := []common.Address{}
for _, address := range d.accounts {
balance, err := d.client.BalanceAt(ctx, address, num)
if err != nil {
return nil, err
}
current, err := d.client.BalanceAt(ctx, address, header.Number)
if err != nil {
return nil, err
}
if current.Cmp(balance) != 0 {
changed = append(changed, address)
}
}
if len(changed) == 0 {
return nil, nil
}
blk, err := d.client.BlockByHash(ctx, header.Hash)
if err != nil {
return nil, err
}
rst, err = d.getTransfersInBlock(ctx, blk, changed)
if err != nil {
return nil, err
}
return rst, nil
}
func (d *ETHTransferDownloader) GetTransfersByNumber(ctx context.Context, number *big.Int) ([]Transfer, error) {
blk, err := d.client.BlockByNumber(ctx, number)
if err != nil {
return nil, err
}
rst, err := d.getTransfersInBlock(ctx, blk, d.accounts)
if err != nil {
return nil, err
}
return rst, err
}
func (d *ETHTransferDownloader) getTransfersInBlock(ctx context.Context, blk *types.Block, accounts []common.Address) (rst []Transfer, err error) {
for _, tx := range blk.Transactions() {
var address *common.Address
from, err := types.Sender(d.signer, tx)
if err != nil {
return nil, err
}
if any(from, accounts) {
address = &from
} else if tx.To() != nil && any(*tx.To(), accounts) {
address = tx.To()
}
if address != nil {
receipt, err := d.client.TransactionReceipt(ctx, tx.Hash())
if err != nil {
return nil, err
}
if isTokenTransfer(receipt.Logs) {
log.Debug("eth downloader found token transfer", "hash", tx.Hash())
continue
}
rst = append(rst, Transfer{
Type: ethTransfer,
ID: tx.Hash(),
Address: *address,
BlockNumber: blk.Number(),
BlockHash: blk.Hash(),
Transaction: tx, Receipt: receipt})
}
}
// TODO(dshulyak) test that balance difference was covered by transactions
return rst, nil
}
// NewERC20TransfersDownloader returns new instance.
func NewERC20TransfersDownloader(client *ethclient.Client, accounts []common.Address) *ERC20TransfersDownloader {
signature := crypto.Keccak256Hash([]byte(erc20TransferEventSignature))
return &ERC20TransfersDownloader{
client: client,
accounts: accounts,
signature: signature,
}
}
// ERC20TransfersDownloader is a downloader for erc20 tokens transfers.
type ERC20TransfersDownloader struct {
client *ethclient.Client
accounts []common.Address
// hash of the Transfer event signature
signature common.Hash
}
func (d *ERC20TransfersDownloader) paddedAddress(address common.Address) common.Hash {
rst := common.Hash{}
copy(rst[12:], address[:])
return rst
}
func (d *ERC20TransfersDownloader) inboundTopics(address common.Address) [][]common.Hash {
return [][]common.Hash{{d.signature}, {}, {d.paddedAddress(address)}}
}
func (d *ERC20TransfersDownloader) outboundTopics(address common.Address) [][]common.Hash {
return [][]common.Hash{{d.signature}, {d.paddedAddress(address)}, {}}
}
func (d *ERC20TransfersDownloader) transferFromLog(parent context.Context, log types.Log, address common.Address) (Transfer, error) {
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
tx, _, err := d.client.TransactionByHash(ctx, log.TxHash)
cancel()
if err != nil {
return Transfer{}, err
}
ctx, cancel = context.WithTimeout(parent, 3*time.Second)
receipt, err := d.client.TransactionReceipt(ctx, log.TxHash)
cancel()
if err != nil {
return Transfer{}, err
}
// TODO(dshulyak) what is the max number of logs?
index := [4]byte{}
binary.BigEndian.PutUint32(index[:], uint32(log.Index))
id := crypto.Keccak256Hash(log.TxHash.Bytes(), index[:])
return Transfer{
Address: address,
ID: id,
Type: erc20Transfer,
BlockNumber: new(big.Int).SetUint64(log.BlockNumber),
BlockHash: log.BlockHash,
Transaction: tx,
Receipt: receipt,
}, nil
}
func (d *ERC20TransfersDownloader) transfersFromLogs(parent context.Context, logs []types.Log, address common.Address) ([]Transfer, error) {
concurrent := NewConcurrentDownloader(parent)
for i := range logs {
l := logs[i]
concurrent.Add(func(ctx context.Context) error {
transfer, err := d.transferFromLog(ctx, l, address)
if err != nil {
return err
}
concurrent.Push(transfer)
return nil
})
}
select {
case <-concurrent.WaitAsync():
case <-parent.Done():
return nil, errors.New("logs downloader stuck")
}
return concurrent.Get(), nil
}
// GetTransfers for erc20 uses the eth_getLogs RPC with the Transfer event signature and our account address.
func (d *ERC20TransfersDownloader) GetTransfers(ctx context.Context, header *DBHeader) ([]Transfer, error) {
hash := header.Hash
transfers := []Transfer{}
for _, address := range d.accounts {
outbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{
BlockHash: &hash,
Topics: d.outboundTopics(address),
})
if err != nil {
return nil, err
}
inbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{
BlockHash: &hash,
Topics: d.inboundTopics(address),
})
if err != nil {
return nil, err
}
logs := append(outbound, inbound...)
if len(logs) == 0 {
continue
}
rst, err := d.transfersFromLogs(ctx, logs, address)
if err != nil {
return nil, err
}
transfers = append(transfers, rst...)
}
return transfers, nil
}
// GetTransfersInRange returns transfers between two blocks.
// Getting logs for 100000 blocks took 1.144686979s with 249 events in the result set.
func (d *ERC20TransfersDownloader) GetTransfersInRange(parent context.Context, from, to *big.Int) ([]Transfer, error) {
start := time.Now()
log.Debug("get erc20 transfers in range", "from", from, "to", to)
transfers := []Transfer{}
for _, address := range d.accounts {
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
outbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{
FromBlock: from,
ToBlock: to,
Topics: d.outboundTopics(address),
})
cancel()
if err != nil {
return nil, err
}
ctx, cancel = context.WithTimeout(parent, 5*time.Second)
inbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{
FromBlock: from,
ToBlock: to,
Topics: d.inboundTopics(address),
})
cancel()
if err != nil {
return nil, err
}
logs := append(outbound, inbound...)
if len(logs) == 0 {
continue
}
rst, err := d.transfersFromLogs(parent, logs, address)
if err != nil {
return nil, err
}
transfers = append(transfers, rst...)
}
log.Debug("found erc20 transfers between two blocks", "from", from, "to", to, "lth", len(transfers), "took", time.Since(start))
return transfers, nil
}
func any(address common.Address, compare []common.Address) bool {
for _, c := range compare {
if c == address {
return true
}
}
return false
}
func isTokenTransfer(logs []*types.Log) bool {
signature := crypto.Keccak256Hash([]byte(erc20TransferEventSignature))
for _, l := range logs {
if len(l.Topics) > 0 && l.Topics[0] == signature {
return true
}
}
return false
}
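For orientation, a minimal usage sketch assuming an already connected *ethclient.Client and a single watched address; latestERC20Transfers is a hypothetical helper living in the wallet package and reusing toDBHeader from this package.

// Hypothetical helper: download ERC20 transfers touching the latest block for one account.
func latestERC20Transfers(ctx context.Context, client *ethclient.Client, account common.Address) ([]Transfer, error) {
	header, err := client.HeaderByNumber(ctx, nil) // nil selects the latest header
	if err != nil {
		return nil, err
	}
	downloader := NewERC20TransfersDownloader(client, []common.Address{account})
	return downloader.GetTransfers(ctx, toDBHeader(header))
}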

View File

@ -0,0 +1,233 @@
package wallet
import (
"context"
"crypto/ecdsa"
"math/big"
"testing"
"time"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/status-im/status-go/services/wallet/erc20"
"github.com/status-im/status-go/t/devtests/miner"
"github.com/stretchr/testify/suite"
)
func TestETHTransfers(t *testing.T) {
suite.Run(t, new(ETHTransferSuite))
}
type ETHTransferSuite struct {
suite.Suite
ethclient *ethclient.Client
identity *ecdsa.PrivateKey
faucet *ecdsa.PrivateKey
signer types.Signer
downloader *ETHTransferDownloader
}
func (s *ETHTransferSuite) SetupTest() {
var err error
s.identity, err = crypto.GenerateKey()
s.Require().NoError(err)
s.faucet, err = crypto.GenerateKey()
s.Require().NoError(err)
node, err := miner.NewDevNode(crypto.PubkeyToAddress(s.faucet.PublicKey))
s.Require().NoError(err)
s.Require().NoError(miner.StartWithMiner(node))
client, err := node.Attach()
s.Require().NoError(err)
s.ethclient = ethclient.NewClient(client)
s.signer = types.NewEIP155Signer(big.NewInt(1337))
s.downloader = &ETHTransferDownloader{
signer: s.signer,
client: s.ethclient,
accounts: []common.Address{crypto.PubkeyToAddress(s.identity.PublicKey)},
}
}
func (s *ETHTransferSuite) TestNoBalance() {
ctx := context.TODO()
tx := types.NewTransaction(0, common.Address{1}, big.NewInt(1e18), 1e6, big.NewInt(10), nil)
tx, err := types.SignTx(tx, s.signer, s.faucet)
s.Require().NoError(err)
s.Require().NoError(s.ethclient.SendTransaction(ctx, tx))
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = bind.WaitMined(timeout, s.ethclient, tx)
s.Require().NoError(err)
header, err := s.ethclient.HeaderByNumber(ctx, nil)
s.Require().NoError(err)
transfers, err := s.downloader.GetTransfers(ctx, toDBHeader(header))
s.Require().NoError(err)
s.Require().Empty(transfers)
}
func (s *ETHTransferSuite) TestBalanceUpdatedOnInbound() {
ctx := context.TODO()
tx := types.NewTransaction(0, crypto.PubkeyToAddress(s.identity.PublicKey), big.NewInt(1e18), 1e6, big.NewInt(10), nil)
tx, err := types.SignTx(tx, s.signer, s.faucet)
s.Require().NoError(err)
s.Require().NoError(s.ethclient.SendTransaction(ctx, tx))
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = bind.WaitMined(timeout, s.ethclient, tx)
s.Require().NoError(err)
header, err := s.ethclient.HeaderByNumber(ctx, nil)
s.Require().NoError(err)
s.Require().Equal(big.NewInt(1), header.Number)
transfers, err := s.downloader.GetTransfers(ctx, toDBHeader(header))
s.Require().NoError(err)
s.Require().Len(transfers, 1)
}
func (s *ETHTransferSuite) TestBalanceUpdatedOnOutbound() {
ctx := context.TODO()
tx := types.NewTransaction(0, crypto.PubkeyToAddress(s.identity.PublicKey), big.NewInt(1e18), 1e6, big.NewInt(10), nil)
tx, err := types.SignTx(tx, s.signer, s.faucet)
s.Require().NoError(err)
s.Require().NoError(s.ethclient.SendTransaction(ctx, tx))
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, err = bind.WaitMined(timeout, s.ethclient, tx)
cancel()
s.Require().NoError(err)
tx = types.NewTransaction(0, common.Address{1}, big.NewInt(5e17), 1e6, big.NewInt(10), nil)
tx, err = types.SignTx(tx, s.signer, s.identity)
s.Require().NoError(err)
s.Require().NoError(s.ethclient.SendTransaction(ctx, tx))
timeout, cancel = context.WithTimeout(context.Background(), 5*time.Second)
_, err = bind.WaitMined(timeout, s.ethclient, tx)
cancel()
s.Require().NoError(err)
header, err := s.ethclient.HeaderByNumber(ctx, nil)
s.Require().NoError(err)
s.Require().Equal(big.NewInt(2), header.Number)
transfers, err := s.downloader.GetTransfers(ctx, toDBHeader(header))
s.Require().NoError(err)
s.Require().Len(transfers, 1)
}
func TestERC20Transfers(t *testing.T) {
suite.Run(t, new(ERC20TransferSuite))
}
type ERC20TransferSuite struct {
suite.Suite
ethclient *ethclient.Client
identity *ecdsa.PrivateKey
faucet *ecdsa.PrivateKey
signer types.Signer
downloader *ERC20TransfersDownloader
contract *erc20.ERC20Transfer
}
func (s *ERC20TransferSuite) SetupTest() {
var err error
s.identity, err = crypto.GenerateKey()
s.Require().NoError(err)
s.faucet, err = crypto.GenerateKey()
s.Require().NoError(err)
node, err := miner.NewDevNode(crypto.PubkeyToAddress(s.faucet.PublicKey))
s.Require().NoError(err)
s.Require().NoError(miner.StartWithMiner(node))
client, err := node.Attach()
s.Require().NoError(err)
s.ethclient = ethclient.NewClient(client)
s.downloader = NewERC20TransfersDownloader(s.ethclient, []common.Address{crypto.PubkeyToAddress(s.identity.PublicKey)})
_, tx, contract, err := erc20.DeployERC20Transfer(bind.NewKeyedTransactor(s.faucet), s.ethclient)
s.Require().NoError(err)
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = bind.WaitMined(timeout, s.ethclient, tx)
s.Require().NoError(err)
s.contract = contract
s.signer = types.NewEIP155Signer(big.NewInt(1337))
}
func (s *ERC20TransferSuite) TestNoEvents() {
header, err := s.ethclient.HeaderByNumber(context.TODO(), nil)
s.Require().NoError(err)
transfers, err := s.downloader.GetTransfers(context.TODO(), toDBHeader(header))
s.Require().NoError(err)
s.Require().Empty(transfers)
}
func (s *ERC20TransferSuite) TestInboundEvent() {
tx, err := s.contract.Transfer(bind.NewKeyedTransactor(s.faucet), crypto.PubkeyToAddress(s.identity.PublicKey),
big.NewInt(100))
s.Require().NoError(err)
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = bind.WaitMined(timeout, s.ethclient, tx)
s.Require().NoError(err)
header, err := s.ethclient.HeaderByNumber(context.TODO(), nil)
s.Require().NoError(err)
transfers, err := s.downloader.GetTransfers(context.TODO(), toDBHeader(header))
s.Require().NoError(err)
s.Require().Len(transfers, 1)
}
func (s *ERC20TransferSuite) TestOutboundEvent() {
// give some eth to pay for gas
ctx := context.TODO()
// nonce is 1 - the contract with nonce 0 was deployed in SetupTest
// FIXME request nonce
tx := types.NewTransaction(1, crypto.PubkeyToAddress(s.identity.PublicKey), big.NewInt(1e18), 1e6, big.NewInt(10), nil)
tx, err := types.SignTx(tx, s.signer, s.faucet)
s.Require().NoError(err)
s.Require().NoError(s.ethclient.SendTransaction(ctx, tx))
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, err = bind.WaitMined(timeout, s.ethclient, tx)
cancel()
s.Require().NoError(err)
tx, err = s.contract.Transfer(bind.NewKeyedTransactor(s.identity), common.Address{1}, big.NewInt(100))
s.Require().NoError(err)
timeout, cancel = context.WithTimeout(context.Background(), 5*time.Second)
_, err = bind.WaitMined(timeout, s.ethclient, tx)
cancel()
s.Require().NoError(err)
header, err := s.ethclient.HeaderByNumber(context.TODO(), nil)
s.Require().NoError(err)
transfers, err := s.downloader.GetTransfers(context.TODO(), toDBHeader(header))
s.Require().NoError(err)
s.Require().Len(transfers, 1)
}
func (s *ERC20TransferSuite) TestInRange() {
for i := 0; i < 5; i++ {
tx, err := s.contract.Transfer(bind.NewKeyedTransactor(s.faucet), crypto.PubkeyToAddress(s.identity.PublicKey),
big.NewInt(100))
s.Require().NoError(err)
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = bind.WaitMined(timeout, s.ethclient, tx)
s.Require().NoError(err)
}
transfers, err := s.downloader.GetTransfersInRange(context.TODO(), big.NewInt(1), nil)
s.Require().NoError(err)
s.Require().Len(transfers, 5)
}

View File

@ -0,0 +1,4 @@
Test contract for Transfer events
=================================
Emits the Transfer event defined by the ERC20 standard.
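For context, a short sketch of how the wallet downloader above filters for this event via eth_getLogs: the Keccak hash of the event signature goes into topics[0], and the watched address, left-padded to 32 bytes, into topics[1] (outbound) or topics[2] (inbound). transferTopics is a hypothetical name; the real code uses paddedAddress, inboundTopics and outboundTopics, and imports of crypto and common are assumed.

// Sketch: build the eth_getLogs topic filters for one address.
func transferTopics(address common.Address) (outbound, inbound [][]common.Hash) {
	sig := crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)")) // topics[0]
	var padded common.Hash
	copy(padded[12:], address[:]) // 20-byte address left-padded to 32 bytes
	outbound = [][]common.Hash{{sig}, {padded}, {}} // "from" == address
	inbound = [][]common.Hash{{sig}, {}, {padded}}  // "to" == address
	return outbound, inbound
}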

View File

@ -0,0 +1,3 @@
package erc20
//go:generate abigen -sol erc20.sol -pkg erc20 -out erc20.go

View File

@ -0,0 +1,352 @@
// Code generated - DO NOT EDIT.
// This file is a generated binding and any manual changes will be lost.
package erc20
import (
"math/big"
"strings"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
)
// Reference imports to suppress errors if they are not otherwise used.
var (
_ = big.NewInt
_ = strings.NewReader
_ = ethereum.NotFound
_ = abi.U256
_ = bind.Bind
_ = common.Big1
_ = types.BloomLookup
_ = event.NewSubscription
)
// ERC20TransferABI is the input ABI used to generate the binding from.
const ERC20TransferABI = "[{\"constant\":false,\"inputs\":[{\"name\":\"to\",\"type\":\"address\"},{\"name\":\"value\",\"type\":\"uint256\"}],\"name\":\"transfer\",\"outputs\":[],\"payable\":false,\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[],\"payable\":false,\"stateMutability\":\"nonpayable\",\"type\":\"constructor\"},{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"name\":\"from\",\"type\":\"address\"},{\"indexed\":true,\"name\":\"to\",\"type\":\"address\"},{\"indexed\":false,\"name\":\"value\",\"type\":\"uint256\"}],\"name\":\"Transfer\",\"type\":\"event\"}]"
// ERC20TransferBin is the compiled bytecode used for deploying new contracts.
const ERC20TransferBin = `0x6080604052348015600f57600080fd5b5060c88061001e6000396000f3fe6080604052348015600f57600080fd5b506004361060285760003560e01c8063a9059cbb14602d575b600080fd5b605660048036036040811015604157600080fd5b506001600160a01b0381351690602001356058565b005b6040805182815290516001600160a01b0384169133917fddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef9181900360200190a3505056fea165627a7a72305820821ecaa5ed56957610817ecaabbb288ac338db81a0f09fc3ce5c7ddfa17351b60029`
// DeployERC20Transfer deploys a new Ethereum contract, binding an instance of ERC20Transfer to it.
func DeployERC20Transfer(auth *bind.TransactOpts, backend bind.ContractBackend) (common.Address, *types.Transaction, *ERC20Transfer, error) {
parsed, err := abi.JSON(strings.NewReader(ERC20TransferABI))
if err != nil {
return common.Address{}, nil, nil, err
}
address, tx, contract, err := bind.DeployContract(auth, parsed, common.FromHex(ERC20TransferBin), backend)
if err != nil {
return common.Address{}, nil, nil, err
}
return address, tx, &ERC20Transfer{ERC20TransferCaller: ERC20TransferCaller{contract: contract}, ERC20TransferTransactor: ERC20TransferTransactor{contract: contract}, ERC20TransferFilterer: ERC20TransferFilterer{contract: contract}}, nil
}
// ERC20Transfer is an auto generated Go binding around an Ethereum contract.
type ERC20Transfer struct {
ERC20TransferCaller // Read-only binding to the contract
ERC20TransferTransactor // Write-only binding to the contract
ERC20TransferFilterer // Log filterer for contract events
}
// ERC20TransferCaller is an auto generated read-only Go binding around an Ethereum contract.
type ERC20TransferCaller struct {
contract *bind.BoundContract // Generic contract wrapper for the low level calls
}
// ERC20TransferTransactor is an auto generated write-only Go binding around an Ethereum contract.
type ERC20TransferTransactor struct {
contract *bind.BoundContract // Generic contract wrapper for the low level calls
}
// ERC20TransferFilterer is an auto generated log filtering Go binding around an Ethereum contract events.
type ERC20TransferFilterer struct {
contract *bind.BoundContract // Generic contract wrapper for the low level calls
}
// ERC20TransferSession is an auto generated Go binding around an Ethereum contract,
// with pre-set call and transact options.
type ERC20TransferSession struct {
Contract *ERC20Transfer // Generic contract binding to set the session for
CallOpts bind.CallOpts // Call options to use throughout this session
TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session
}
// ERC20TransferCallerSession is an auto generated read-only Go binding around an Ethereum contract,
// with pre-set call options.
type ERC20TransferCallerSession struct {
Contract *ERC20TransferCaller // Generic contract caller binding to set the session for
CallOpts bind.CallOpts // Call options to use throughout this session
}
// ERC20TransferTransactorSession is an auto generated write-only Go binding around an Ethereum contract,
// with pre-set transact options.
type ERC20TransferTransactorSession struct {
Contract *ERC20TransferTransactor // Generic contract transactor binding to set the session for
TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session
}
// ERC20TransferRaw is an auto generated low-level Go binding around an Ethereum contract.
type ERC20TransferRaw struct {
Contract *ERC20Transfer // Generic contract binding to access the raw methods on
}
// ERC20TransferCallerRaw is an auto generated low-level read-only Go binding around an Ethereum contract.
type ERC20TransferCallerRaw struct {
Contract *ERC20TransferCaller // Generic read-only contract binding to access the raw methods on
}
// ERC20TransferTransactorRaw is an auto generated low-level write-only Go binding around an Ethereum contract.
type ERC20TransferTransactorRaw struct {
Contract *ERC20TransferTransactor // Generic write-only contract binding to access the raw methods on
}
// NewERC20Transfer creates a new instance of ERC20Transfer, bound to a specific deployed contract.
func NewERC20Transfer(address common.Address, backend bind.ContractBackend) (*ERC20Transfer, error) {
contract, err := bindERC20Transfer(address, backend, backend, backend)
if err != nil {
return nil, err
}
return &ERC20Transfer{ERC20TransferCaller: ERC20TransferCaller{contract: contract}, ERC20TransferTransactor: ERC20TransferTransactor{contract: contract}, ERC20TransferFilterer: ERC20TransferFilterer{contract: contract}}, nil
}
// NewERC20TransferCaller creates a new read-only instance of ERC20Transfer, bound to a specific deployed contract.
func NewERC20TransferCaller(address common.Address, caller bind.ContractCaller) (*ERC20TransferCaller, error) {
contract, err := bindERC20Transfer(address, caller, nil, nil)
if err != nil {
return nil, err
}
return &ERC20TransferCaller{contract: contract}, nil
}
// NewERC20TransferTransactor creates a new write-only instance of ERC20Transfer, bound to a specific deployed contract.
func NewERC20TransferTransactor(address common.Address, transactor bind.ContractTransactor) (*ERC20TransferTransactor, error) {
contract, err := bindERC20Transfer(address, nil, transactor, nil)
if err != nil {
return nil, err
}
return &ERC20TransferTransactor{contract: contract}, nil
}
// NewERC20TransferFilterer creates a new log filterer instance of ERC20Transfer, bound to a specific deployed contract.
func NewERC20TransferFilterer(address common.Address, filterer bind.ContractFilterer) (*ERC20TransferFilterer, error) {
contract, err := bindERC20Transfer(address, nil, nil, filterer)
if err != nil {
return nil, err
}
return &ERC20TransferFilterer{contract: contract}, nil
}
// bindERC20Transfer binds a generic wrapper to an already deployed contract.
func bindERC20Transfer(address common.Address, caller bind.ContractCaller, transactor bind.ContractTransactor, filterer bind.ContractFilterer) (*bind.BoundContract, error) {
parsed, err := abi.JSON(strings.NewReader(ERC20TransferABI))
if err != nil {
return nil, err
}
return bind.NewBoundContract(address, parsed, caller, transactor, filterer), nil
}
// Call invokes the (constant) contract method with params as input values and
// sets the output to result. The result type might be a single field for simple
// returns, a slice of interfaces for anonymous returns and a struct for named
// returns.
func (_ERC20Transfer *ERC20TransferRaw) Call(opts *bind.CallOpts, result interface{}, method string, params ...interface{}) error {
return _ERC20Transfer.Contract.ERC20TransferCaller.contract.Call(opts, result, method, params...)
}
// Transfer initiates a plain transaction to move funds to the contract, calling
// its default method if one is available.
func (_ERC20Transfer *ERC20TransferRaw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) {
return _ERC20Transfer.Contract.ERC20TransferTransactor.contract.Transfer(opts)
}
// Transact invokes the (paid) contract method with params as input values.
func (_ERC20Transfer *ERC20TransferRaw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) {
return _ERC20Transfer.Contract.ERC20TransferTransactor.contract.Transact(opts, method, params...)
}
// Call invokes the (constant) contract method with params as input values and
// sets the output to result. The result type might be a single field for simple
// returns, a slice of interfaces for anonymous returns and a struct for named
// returns.
func (_ERC20Transfer *ERC20TransferCallerRaw) Call(opts *bind.CallOpts, result interface{}, method string, params ...interface{}) error {
return _ERC20Transfer.Contract.contract.Call(opts, result, method, params...)
}
// Transfer initiates a plain transaction to move funds to the contract, calling
// its default method if one is available.
func (_ERC20Transfer *ERC20TransferTransactorRaw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) {
return _ERC20Transfer.Contract.contract.Transfer(opts)
}
// Transact invokes the (paid) contract method with params as input values.
func (_ERC20Transfer *ERC20TransferTransactorRaw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) {
return _ERC20Transfer.Contract.contract.Transact(opts, method, params...)
}
// Transfer is a paid mutator transaction binding the contract method 0xa9059cbb.
//
// Solidity: function transfer(address to, uint256 value) returns()
func (_ERC20Transfer *ERC20TransferTransactor) Transfer(opts *bind.TransactOpts, to common.Address, value *big.Int) (*types.Transaction, error) {
return _ERC20Transfer.contract.Transact(opts, "transfer", to, value)
}
// Transfer is a paid mutator transaction binding the contract method 0xa9059cbb.
//
// Solidity: function transfer(address to, uint256 value) returns()
func (_ERC20Transfer *ERC20TransferSession) Transfer(to common.Address, value *big.Int) (*types.Transaction, error) {
return _ERC20Transfer.Contract.Transfer(&_ERC20Transfer.TransactOpts, to, value)
}
// Transfer is a paid mutator transaction binding the contract method 0xa9059cbb.
//
// Solidity: function transfer(address to, uint256 value) returns()
func (_ERC20Transfer *ERC20TransferTransactorSession) Transfer(to common.Address, value *big.Int) (*types.Transaction, error) {
return _ERC20Transfer.Contract.Transfer(&_ERC20Transfer.TransactOpts, to, value)
}
// ERC20TransferTransferIterator is returned from FilterTransfer and is used to iterate over the raw logs and unpacked data for Transfer events raised by the ERC20Transfer contract.
type ERC20TransferTransferIterator struct {
Event *ERC20TransferTransfer // Event containing the contract specifics and raw log
contract *bind.BoundContract // Generic contract to use for unpacking event data
event string // Event name to use for unpacking event data
logs chan types.Log // Log channel receiving the found contract events
sub ethereum.Subscription // Subscription for errors, completion and termination
done bool // Whether the subscription completed delivering logs
fail error // Occurred error to stop iteration
}
// Next advances the iterator to the subsequent event, returning whether there
// are any more events found. In case of a retrieval or parsing error, false is
// returned and Error() can be queried for the exact failure.
func (it *ERC20TransferTransferIterator) Next() bool {
// If the iterator failed, stop iterating
if it.fail != nil {
return false
}
// If the iterator completed, deliver directly whatever's available
if it.done {
select {
case log := <-it.logs:
it.Event = new(ERC20TransferTransfer)
if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil {
it.fail = err
return false
}
it.Event.Raw = log
return true
default:
return false
}
}
// Iterator still in progress, wait for either a data or an error event
select {
case log := <-it.logs:
it.Event = new(ERC20TransferTransfer)
if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil {
it.fail = err
return false
}
it.Event.Raw = log
return true
case err := <-it.sub.Err():
it.done = true
it.fail = err
return it.Next()
}
}
// Error returns any retrieval or parsing error occurred during filtering.
func (it *ERC20TransferTransferIterator) Error() error {
return it.fail
}
// Close terminates the iteration process, releasing any pending underlying
// resources.
func (it *ERC20TransferTransferIterator) Close() error {
it.sub.Unsubscribe()
return nil
}
// ERC20TransferTransfer represents a Transfer event raised by the ERC20Transfer contract.
type ERC20TransferTransfer struct {
From common.Address
To common.Address
Value *big.Int
Raw types.Log // Blockchain specific contextual infos
}
// FilterTransfer is a free log retrieval operation binding the contract event 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef.
//
// Solidity: event Transfer(address indexed from, address indexed to, uint256 value)
func (_ERC20Transfer *ERC20TransferFilterer) FilterTransfer(opts *bind.FilterOpts, from []common.Address, to []common.Address) (*ERC20TransferTransferIterator, error) {
var fromRule []interface{}
for _, fromItem := range from {
fromRule = append(fromRule, fromItem)
}
var toRule []interface{}
for _, toItem := range to {
toRule = append(toRule, toItem)
}
logs, sub, err := _ERC20Transfer.contract.FilterLogs(opts, "Transfer", fromRule, toRule)
if err != nil {
return nil, err
}
return &ERC20TransferTransferIterator{contract: _ERC20Transfer.contract, event: "Transfer", logs: logs, sub: sub}, nil
}
// WatchTransfer is a free log subscription operation binding the contract event 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef.
//
// Solidity: event Transfer(address indexed from, address indexed to, uint256 value)
func (_ERC20Transfer *ERC20TransferFilterer) WatchTransfer(opts *bind.WatchOpts, sink chan<- *ERC20TransferTransfer, from []common.Address, to []common.Address) (event.Subscription, error) {
var fromRule []interface{}
for _, fromItem := range from {
fromRule = append(fromRule, fromItem)
}
var toRule []interface{}
for _, toItem := range to {
toRule = append(toRule, toItem)
}
logs, sub, err := _ERC20Transfer.contract.WatchLogs(opts, "Transfer", fromRule, toRule)
if err != nil {
return nil, err
}
return event.NewSubscription(func(quit <-chan struct{}) error {
defer sub.Unsubscribe()
for {
select {
case log := <-logs:
// New log arrived, parse the event and forward to the user
event := new(ERC20TransferTransfer)
if err := _ERC20Transfer.contract.UnpackLog(event, "Transfer", log); err != nil {
return err
}
event.Raw = log
select {
case sink <- event:
case err := <-sub.Err():
return err
case <-quit:
return nil
}
case err := <-sub.Err():
return err
case <-quit:
return nil
}
}
}), nil
}

View File

@ -0,0 +1,12 @@
pragma solidity ^0.5.0;
contract ERC20Transfer {
constructor() public {}
event Transfer(address indexed from, address indexed to, uint256 value);
function transfer(address to, uint256 value) public {
emit Transfer(msg.sender, to, value);
}
}

26
services/wallet/events.go Normal file
View File

@ -0,0 +1,26 @@
package wallet
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
)
// EventType is the type of a wallet event.
type EventType string
const (
// EventNewBlock emitted when a new block was added to the same canonical chain.
EventNewBlock EventType = "newblock"
// EventReorg emitted when the canonical chain was changed. In this case, BlockNumber will be the earliest added block.
EventReorg EventType = "reorg"
// EventNewHistory emitted when a transfer from an older block was added.
EventNewHistory EventType = "history"
)
// Event is a type for wallet events.
type Event struct {
Type EventType `json:"type"`
BlockNumber *big.Int `json:"blockNumber"`
Accounts []common.Address `json:"accounts"`
}
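These events are broadcast through an event.Feed shared with the reactor (see reactor.go and the signals transmitter later in this diff). A minimal consumer sketch, assuming access to that feed and the go-ethereum event import; consumeWalletEvents is a hypothetical name.

// Hypothetical consumer: react to wallet events published on the shared event.Feed.
func consumeWalletEvents(feed *event.Feed, quit <-chan struct{}) {
	events := make(chan Event, 10)
	sub := feed.Subscribe(events)
	defer sub.Unsubscribe()
	for {
		select {
		case ev := <-events:
			switch ev.Type {
			case EventNewBlock:
				// a new canonical block was processed; ev.BlockNumber is its number
			case EventReorg:
				// the canonical chain changed; ev.BlockNumber is the earliest re-added block
			case EventNewHistory:
				// transfers from an older block were discovered for ev.Accounts
			}
		case <-quit:
			return
		}
	}
}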

View File

@ -0,0 +1,92 @@
package wallet
import (
"context"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
// SetupIterativeDownloader configures IterativeDownloader with last known synced block.
func SetupIterativeDownloader(
db *Database, client HeaderReader, address common.Address, option SyncOption,
downloader BatchDownloader, size *big.Int, to *DBHeader) (*IterativeDownloader, error) {
from, err := db.GetLatestSynced(address, option)
if err != nil {
log.Error("failed to get latest synced block", "error", err)
return nil, err
}
if from == nil {
from = &DBHeader{Number: zero}
}
log.Debug("iterative downloader", "address", address, "from", from.Number, "to", to.Number)
d := &IterativeDownloader{
client: client,
batchSize: size,
downloader: downloader,
from: from,
to: to,
}
return d, nil
}
// BatchDownloader is an interface for loading transfers in batches over a specified range of blocks.
type BatchDownloader interface {
GetTransfersInRange(ctx context.Context, from, to *big.Int) ([]Transfer, error)
}
// IterativeDownloader downloads batches of transfers in a specified size.
type IterativeDownloader struct {
client HeaderReader
batchSize *big.Int
downloader BatchDownloader
from, to *DBHeader
previous *DBHeader
}
// Finished returns true when the downloader has reached the target block.
func (d *IterativeDownloader) Finished() bool {
return d.from.Number.Cmp(d.to.Number) == 0
}
// Header returns the last synced header.
func (d *IterativeDownloader) Header() *DBHeader {
return d.previous
}
// Next moves closer to the end on every new iteration.
func (d *IterativeDownloader) Next(parent context.Context) ([]Transfer, error) {
to := new(big.Int).Add(d.from.Number, d.batchSize)
// cap the batch upper bound at the target block
if to.Cmp(d.to.Number) == 1 {
to = d.to.Number
}
transfers, err := d.downloader.GetTransfersInRange(parent, d.from.Number, to)
if err != nil {
log.Error("failed to get transfer in between two bloks", "from", d.from.Number, "to", to, "error", err)
return nil, err
}
// use integers instead of DBHeader
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
header, err := d.client.HeaderByNumber(ctx, to)
cancel()
if err != nil {
log.Error("failed to get header by number", "from", d.from.Number, "to", to, "error", err)
return nil, err
}
d.previous, d.from = d.from, toDBHeader(header)
return transfers, nil
}
// Revert rolls back the progress of the last step. It should be used if the application failed to process
// the downloaded transfers, for example failed to persist them.
func (d *IterativeDownloader) Revert() {
if d.previous != nil {
d.from = d.previous
}
}
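A hedged sketch of how the iterative downloader is meant to be driven; the real control loop lives in the reactor's command code, which is not shown in this section. downloadERC20History and persistTransfers are hypothetical names, while erc20Sync and erc20BatchSize are taken from the wallet package.

// Hypothetical drive loop; persistTransfers is a placeholder for storing a downloaded batch.
func downloadERC20History(ctx context.Context, db *Database, client HeaderReader, downloader BatchDownloader,
	address common.Address, to *DBHeader) error {
	iter, err := SetupIterativeDownloader(db, client, address, erc20Sync, downloader, erc20BatchSize, to)
	if err != nil {
		return err
	}
	for !iter.Finished() {
		transfers, err := iter.Next(ctx)
		if err != nil {
			return err
		}
		if err := persistTransfers(db, address, transfers); err != nil {
			iter.Revert() // undo the iterator step so the batch can be retried
			return err
		}
	}
	return nil
}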

View File

@ -0,0 +1,117 @@
package wallet
import (
"context"
"errors"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)
type transfersFixture []Transfer
func (f transfersFixture) GetTransfersInRange(ctx context.Context, from, to *big.Int) ([]Transfer, error) {
rst := []Transfer{}
for _, t := range f {
if t.BlockNumber.Cmp(from) >= 0 && t.BlockNumber.Cmp(to) <= 0 {
rst = append(rst, t)
}
}
return rst, nil
}
func TestIterFinished(t *testing.T) {
iterator := IterativeDownloader{
from: &DBHeader{Number: big.NewInt(10)},
to: &DBHeader{Number: big.NewInt(10)},
}
require.True(t, iterator.Finished())
}
func TestIterNotFinished(t *testing.T) {
iterator := IterativeDownloader{
from: &DBHeader{Number: big.NewInt(2)},
to: &DBHeader{Number: big.NewInt(5)},
}
require.False(t, iterator.Finished())
}
func TestIterRevert(t *testing.T) {
iterator := IterativeDownloader{
from: &DBHeader{Number: big.NewInt(12)},
to: &DBHeader{Number: big.NewInt(12)},
previous: &DBHeader{Number: big.NewInt(9)},
}
require.True(t, iterator.Finished())
iterator.Revert()
require.False(t, iterator.Finished())
}
func TestIterProgress(t *testing.T) {
var (
chain headers = genHeadersChain(10, 1)
transfers = make(transfersFixture, 10)
)
for i := range transfers {
transfers[i] = Transfer{
BlockNumber: chain[i].Number,
BlockHash: chain[i].Hash(),
}
}
iter := &IterativeDownloader{
client: chain,
downloader: transfers,
batchSize: big.NewInt(5),
from: &DBHeader{Number: big.NewInt(0)},
to: &DBHeader{Number: big.NewInt(9)},
}
batch, err := iter.Next(context.TODO())
require.NoError(t, err)
require.Len(t, batch, 6)
batch, err = iter.Next(context.TODO())
require.NoError(t, err)
require.Len(t, batch, 5)
require.True(t, iter.Finished())
}
type headers []*types.Header
func (h headers) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
for _, item := range h {
if item.Hash() == hash {
return item, nil
}
}
return nil, errors.New("not found")
}
func (h headers) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
for _, item := range h {
if item.Number.Cmp(number) == 0 {
return item, nil
}
}
return nil, errors.New("not found")
}
func (h headers) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
return nil, errors.New("not implemented")
}
func genHeadersChain(size, difficulty int) []*types.Header {
rst := make([]*types.Header, size)
for i := 0; i < size; i++ {
rst[i] = &types.Header{
Number: big.NewInt(int64(i)),
Difficulty: big.NewInt(int64(difficulty)),
Time: 1,
}
if i != 0 {
rst[i].ParentHash = rst[i-1].Hash()
}
}
return rst
}

View File

@ -0,0 +1,127 @@
package migrations
import (
"bytes"
"compress/gzip"
"fmt"
"io"
"strings"
)
func bindata_read(data []byte, name string) ([]byte, error) {
gz, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
return nil, fmt.Errorf("Read %q: %v", name, err)
}
var buf bytes.Buffer
_, err = io.Copy(&buf, gz)
gz.Close()
if err != nil {
return nil, fmt.Errorf("Read %q: %v", name, err)
}
return buf.Bytes(), nil
}
var __0001_transfers_down_db_sql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\x28\x29\x4a\xcc\x2b\x4e\x4b\x2d\x2a\xb6\xe6\x42\x12\x4d\xca\xc9\x4f\xce\x2e\xb6\xe6\x02\x04\x00\x00\xff\xff\x27\x4d\x7a\xa1\x29\x00\x00\x00")
func _0001_transfers_down_db_sql() ([]byte, error) {
return bindata_read(
__0001_transfers_down_db_sql,
"0001_transfers.down.db.sql",
)
}
var __0001_transfers_up_db_sql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x8c\x91\xd1\x72\xaa\x30\x10\x86\xef\xf3\x14\x7b\x29\x33\x79\x83\x73\x15\x60\xd1\xcc\xc9\x49\x4e\x43\xa8\xf5\x8a\x41\x4c\xab\xa3\x06\x4a\x60\xa6\xbe\x7d\x07\x01\xad\xad\xe3\xf4\x32\xbb\xd9\xdd\xef\xff\xff\x48\x23\x33\x08\x86\x85\x02\x81\x27\x20\x95\x01\x7c\xe1\xa9\x49\xa1\x6d\x0a\xe7\x5f\x6d\xe3\x61\x46\xb6\x85\xdf\xc2\x33\xd3\xd1\x82\x69\xc8\x24\x7f\xca\x90\x92\x62\xb3\x69\xac\xf7\x97\x7a\x3f\x2b\x33\x21\x28\x59\x1f\xf6\xf9\xcd\xc8\xb5\xd5\x7e\x40\x28\x54\x48\x49\x63\x4b\xbb\xab\xdb\xf1\xd5\x9e\x6a\x7b\xe7\x77\xa2\x34\xf2\xb9\x84\xbf\xb8\x9a\x4d\x4b\x03\xd0\x98\xa0\x46\x19\x61\x0a\xeb\x43\x55\xee\xfd\x6c\xa8\x2b\x09\x31\x0a\x34\x08\x11\x4b\x23\x16\x23\x25\x91\x92\xa9\xd1\x8c\x4b\x03\x9d\xdb\xbd\x77\x36\x9f\x64\xe5\x95\x3b\xaf\xcb\x27\x19\x83\x2c\x38\xef\xa2\x63\x31\x20\xc1\x1f\x42\x1e\x98\x34\xdc\xff\xee\xd0\x7f\xcd\xff\x31\xbd\xea\xb1\x29\x71\xdd\x71\x6d\x1b\x08\xf9\xbc\xa7\x18\xaf\x5c\x25\x6e\x6d\xb1\x81\x50\x29\x01\x31\x26\x2c\x13\x06\x12\x26\x52\x24\x01\x2c\xb9\x59\xa8\xcc\x80\x56\x4b\x1e\x3f\xc6\x28\xca\xb2\xea\x5c\xeb\xf3\xb6\xca\x2f\x48\x8f\xf3\xb9\xc5\xba\xf6\xfc\xc9\x95\xc0\xa5\xf9\x69\xfe\x30\x71\xcf\xfe\xa9\xf3\xab\x00\x8e\x45\x5d\xef\xdc\x5b\xef\xff\x48\x38\x20\x4f\x44\x53\x0e\x63\x93\x7e\x39\xdd\xa7\xf1\x19\x00\x00\xff\xff\xfc\x91\xad\x60\xb2\x02\x00\x00")
func _0001_transfers_up_db_sql() ([]byte, error) {
return bindata_read(
__0001_transfers_up_db_sql,
"0001_transfers.up.db.sql",
)
}
var _doc_go = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00")
func doc_go() ([]byte, error) {
return bindata_read(
_doc_go,
"doc.go",
)
}
// Asset loads and returns the asset for the given name.
// It returns an error if the asset could not be found or
// could not be loaded.
func Asset(name string) ([]byte, error) {
cannonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[cannonicalName]; ok {
return f()
}
return nil, fmt.Errorf("Asset %s not found", name)
}
// AssetNames returns the names of the assets.
func AssetNames() []string {
names := make([]string, 0, len(_bindata))
for name := range _bindata {
names = append(names, name)
}
return names
}
// _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() ([]byte, error){
"0001_transfers.down.db.sql": _0001_transfers_down_db_sql,
"0001_transfers.up.db.sql": _0001_transfers_up_db_sql,
"doc.go": doc_go,
}
// AssetDir returns the file names below a certain
// directory embedded in the file by go-bindata.
// For example if you run go-bindata on data/... and data contains the
// following hierarchy:
// data/
// foo.txt
// img/
// a.png
// b.png
// then AssetDir("data") would return []string{"foo.txt", "img"}
// AssetDir("data/img") would return []string{"a.png", "b.png"}
// AssetDir("foo.txt") and AssetDir("notexist") would return an error
// AssetDir("") will return []string{"data"}.
func AssetDir(name string) ([]string, error) {
node := _bintree
if len(name) != 0 {
cannonicalName := strings.Replace(name, "\\", "/", -1)
pathList := strings.Split(cannonicalName, "/")
for _, p := range pathList {
node = node.Children[p]
if node == nil {
return nil, fmt.Errorf("Asset %s not found", name)
}
}
}
if node.Func != nil {
return nil, fmt.Errorf("Asset %s not found", name)
}
rv := make([]string, 0, len(node.Children))
for name := range node.Children {
rv = append(rv, name)
}
return rv, nil
}
type _bintree_t struct {
Func func() ([]byte, error)
Children map[string]*_bintree_t
}
var _bintree = &_bintree_t{nil, map[string]*_bintree_t{
"0001_transfers.down.db.sql": &_bintree_t{_0001_transfers_down_db_sql, map[string]*_bintree_t{
}},
"0001_transfers.up.db.sql": &_bintree_t{_0001_transfers_up_db_sql, map[string]*_bintree_t{
}},
"doc.go": &_bintree_t{doc_go, map[string]*_bintree_t{
}},
}}

View File

@ -0,0 +1,43 @@
package migrations
import (
"database/sql"
"github.com/status-im/migrate/v4"
"github.com/status-im/migrate/v4/database/sqlcipher"
bindata "github.com/status-im/migrate/v4/source/go_bindata"
)
// Migrate applies migrations.
func Migrate(db *sql.DB) error {
resources := bindata.Resource(
AssetNames(),
func(name string) ([]byte, error) {
return Asset(name)
},
)
source, err := bindata.WithInstance(resources)
if err != nil {
return err
}
driver, err := sqlcipher.WithInstance(db, &sqlcipher.Config{})
if err != nil {
return err
}
m, err := migrate.NewWithInstance(
"go-bindata",
source,
"sqlcipher",
driver)
if err != nil {
return err
}
if err = m.Up(); err != migrate.ErrNoChange {
return err
}
return nil
}
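A wiring sketch, presumably close to what the wallet's InitializeDB does: open the sqlcipher-backed store with the helper added later in this diff, then apply these migrations. openAndMigrate is a hypothetical name; imports of database/sql and status-go's sqlite package are assumed.

// Hypothetical wiring: open the encrypted store, then bring the schema up to date.
func openAndMigrate(path, password string) (*sql.DB, error) {
	db, err := sqlite.OpenDB(path, password) // sqlcipher-backed helper added later in this diff
	if err != nil {
		return nil, err
	}
	if err := migrations.Migrate(db); err != nil {
		db.Close()
		return nil, err
	}
	return db, nil
}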

View File

@ -0,0 +1,3 @@
DROP TABLE transfers;
DROP TABLE blocks;
DROP TABLE accounts_to_blocks;

View File

@ -0,0 +1,24 @@
CREATE TABLE IF NOT EXISTS transfers (
hash VARCHAR UNIQUE,
address VARCHAR NOT NULL,
blk_hash VARCHAR NOT NULL,
tx BLOB,
receipt BLOB,
type VARCHAR NOT NULL,
FOREIGN KEY(blk_hash) REFERENCES blocks(hash) ON DELETE CASCADE,
CONSTRAINT unique_transfer_on_hash_address UNIQUE (hash,address)
);
CREATE TABLE IF NOT EXISTS blocks (
hash VARCHAR PRIMARY KEY,
number BIGINT UNIQUE NOT NULL,
head BOOL DEFAULT FALSE
) WITHOUT ROWID;
CREATE TABLE IF NOT EXISTS accounts_to_blocks (
address VARCHAR NOT NULL,
blk_number BIGINT NOT NULL,
sync INT,
FOREIGN KEY(blk_number) REFERENCES blocks(number) ON DELETE CASCADE,
CONSTRAINT unique_mapping_on_address_block_number UNIQUE (address,blk_number)
);

View File

@ -0,0 +1,3 @@
package sql
//go:generate go-bindata -pkg migrations -o ../bindata.go ./

86
services/wallet/query.go Normal file
View File

@ -0,0 +1,86 @@
package wallet
import (
"bytes"
"database/sql"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
const baseTransfersQuery = "SELECT transfers.hash, type, blocks.hash, blocks.number, address, tx, receipt FROM transfers JOIN blocks ON blk_hash = blocks.hash"
func newTransfersQuery() *transfersQuery {
buf := bytes.NewBuffer(nil)
buf.WriteString(baseTransfersQuery)
return &transfersQuery{buf: buf}
}
type transfersQuery struct {
buf *bytes.Buffer
args []interface{}
added bool
}
func (q *transfersQuery) andOrWhere() {
if q.added {
q.buf.WriteString(" AND")
} else {
q.buf.WriteString(" WHERE")
}
}
func (q *transfersQuery) FilterStart(start *big.Int) *transfersQuery {
if start != nil {
q.andOrWhere()
q.added = true
q.buf.WriteString(" blocks.number >= ?")
q.args = append(q.args, (*SQLBigInt)(start))
}
return q
}
func (q *transfersQuery) FilterEnd(end *big.Int) *transfersQuery {
if end != nil {
q.andOrWhere()
q.added = true
q.buf.WriteString(" blocks.number <= ?")
q.args = append(q.args, (*SQLBigInt)(end))
}
return q
}
func (q *transfersQuery) FilterAddress(address common.Address) *transfersQuery {
q.andOrWhere()
q.added = true
q.buf.WriteString(" address = ?")
q.args = append(q.args, address)
return q
}
func (q *transfersQuery) String() string {
return q.buf.String()
}
func (q *transfersQuery) Args() []interface{} {
return q.args
}
func (q *transfersQuery) Scan(rows *sql.Rows) (rst []Transfer, err error) {
for rows.Next() {
transfer := Transfer{
BlockNumber: &big.Int{},
Transaction: &types.Transaction{},
Receipt: &types.Receipt{},
}
err = rows.Scan(
&transfer.ID, &transfer.Type, &transfer.BlockHash, (*SQLBigInt)(transfer.BlockNumber), &transfer.Address,
&JSONBlob{transfer.Transaction}, &JSONBlob{transfer.Receipt})
if err != nil {
return nil, err
}
rst = append(rst, transfer)
}
return rst, nil
}
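A sketch of how this builder composes with database/sql, which is presumably what the wallet Database's GetTransfers does internally; transfersInRange is a hypothetical helper operating on a raw *sql.DB handle.

// Hypothetical helper: fetch transfers for one address in a block range using the query builder.
func transfersInRange(db *sql.DB, owner common.Address, start, end *big.Int) ([]Transfer, error) {
	query := newTransfersQuery().FilterStart(start).FilterEnd(end).FilterAddress(owner)
	rows, err := db.Query(query.String(), query.Args()...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return query.Scan(rows)
}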

114
services/wallet/reactor.go Normal file
View File

@ -0,0 +1,114 @@
package wallet
import (
"context"
"errors"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/params"
)
// A PoW block on the main chain is mined roughly once every 14 seconds,
// but tests use a clique chain with an immediate block signer,
// hence different polling periods are used for methods that depend on mining time.
func pollingPeriodByChain(chain *big.Int) time.Duration {
switch chain.Int64() {
case int64(params.MainNetworkID):
return 10 * time.Second
case int64(params.RopstenNetworkID):
return 2 * time.Second
default:
return 500 * time.Millisecond
}
}
var (
reorgSafetyDepth = big.NewInt(15)
erc20BatchSize = big.NewInt(100000)
)
// HeaderReader interface for reading headers using block number or hash.
type HeaderReader interface {
HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
}
// BalanceReader is an interface for reading the balance of a specified address.
type BalanceReader interface {
BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error)
}
type reactorClient interface {
HeaderReader
BalanceReader
}
// NewReactor creates instance of the Reactor.
func NewReactor(db *Database, feed *event.Feed, client *ethclient.Client, accounts []common.Address, chain *big.Int) *Reactor {
return &Reactor{
db: db,
client: client,
feed: feed,
accounts: accounts,
chain: chain,
}
}
// Reactor listens to new blocks and stores transfers into the database.
type Reactor struct {
client *ethclient.Client
db *Database
feed *event.Feed
accounts []common.Address
chain *big.Int
mu sync.Mutex
group *Group
}
// Start runs reactor loop in background.
func (r *Reactor) Start() error {
r.mu.Lock()
defer r.mu.Unlock()
if r.group != nil {
return errors.New("already running")
}
r.group = NewGroup(context.Background())
// TODO(dshulyak) to support adding accounts at runtime, implement a keyed group
// and export a private API to start downloaders for accounts.
// The private API should have access only to the reactor.
ctl := &controlCommand{
db: r.db,
chain: r.chain,
client: r.client,
accounts: r.accounts,
eth: &ETHTransferDownloader{
client: r.client,
accounts: r.accounts,
signer: types.NewEIP155Signer(r.chain),
},
erc20: NewERC20TransfersDownloader(r.client, r.accounts),
feed: r.feed,
safetyDepth: reorgSafetyDepth,
}
r.group.Add(ctl.Command())
return nil
}
// Stop stops reactor loop and waits till it exits.
func (r *Reactor) Stop() {
r.mu.Lock()
defer r.mu.Unlock()
if r.group == nil {
return
}
r.group.Stop()
r.group.Wait()
r.group = nil
}

View File

@ -0,0 +1,88 @@
package wallet
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
)
// NewService initializes service instance.
func NewService() *Service {
feed := &event.Feed{}
return &Service{
feed: feed,
signals: &SignalsTransmitter{publisher: feed},
}
}
// Service is a wallet service.
type Service struct {
feed *event.Feed
db *Database
reactor *Reactor
signals *SignalsTransmitter
}
// Start starts the signals transmitter.
func (s *Service) Start(*p2p.Server) error {
return s.signals.Start()
}
// StartReactor is separate from Start because it requires a known Ethereum address, which becomes available only after login.
func (s *Service) StartReactor(dbpath, password string, client *ethclient.Client, accounts []common.Address, chain *big.Int) error {
db, err := InitializeDB(dbpath, password)
if err != nil {
return err
}
reactor := NewReactor(db, s.feed, client, accounts, chain)
err = reactor.Start()
if err != nil {
return err
}
s.db = db
s.reactor = reactor
return nil
}
// StopReactor stops reactor and closes database.
func (s *Service) StopReactor() error {
if s.reactor == nil {
return nil
}
s.reactor.Stop()
if s.db == nil {
return nil
}
return s.db.Close()
}
// Stop stops the reactor and the signals transmitter, and closes the database.
func (s *Service) Stop() error {
log.Info("wallet will be stopped")
err := s.StopReactor()
s.signals.Stop()
log.Info("wallet stopped")
return err
}
// APIs returns list of available RPC APIs.
func (s *Service) APIs() []rpc.API {
return []rpc.API{
{
Namespace: "wallet",
Version: "0.1.0",
Service: NewAPI(s),
Public: true,
},
}
}
// Protocols returns list of p2p protocols.
func (s *Service) Protocols() []p2p.Protocol {
return nil
}

View File

@ -0,0 +1,64 @@
package wallet
import (
"errors"
"sync"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/signal"
)
type publisher interface {
Subscribe(interface{}) event.Subscription
}
// SignalsTransmitter transmits received events as wallet signals.
type SignalsTransmitter struct {
publisher
wg sync.WaitGroup
quit chan struct{}
}
// Start runs loop in background.
func (tmr *SignalsTransmitter) Start() error {
if tmr.quit != nil {
return errors.New("already running")
}
tmr.quit = make(chan struct{})
events := make(chan Event, 10)
sub := tmr.publisher.Subscribe(events)
tmr.wg.Add(1)
go func() {
defer tmr.wg.Done()
for {
select {
case <-tmr.quit:
sub.Unsubscribe()
return
case err := <-sub.Err():
// Technically event.Feed cannot send an error to the subscription's Err channel;
// this case only fires when that channel is closed.
if err != nil {
log.Error("wallet signals transmitter failed with", "error", err)
}
return
case event := <-events:
signal.SendWalletEvent(event)
}
}
}()
return nil
}
// Stop stops the loop and waits till it exits.
func (tmr *SignalsTransmitter) Stop() {
if tmr.quit == nil {
return
}
close(tmr.quit)
tmr.wg.Wait()
tmr.quit = nil
}

10
signal/events_wallet.go Normal file
View File

@ -0,0 +1,10 @@
package signal
const (
walletEvent = "wallet"
)
// SendWalletEvent sends event from services/wallet/events.
func SendWalletEvent(event interface{}) {
send(walletEvent, event)
}

View File

@ -49,7 +49,6 @@ func send(typ string, event interface{}) {
logger.Error("Marshalling signal envelope", "error", err) logger.Error("Marshalling signal envelope", "error", err)
return return
} }
// If a Go implementation of signal handler is set, let's use it. // If a Go implementation of signal handler is set, let's use it.
if mobileSignalHandler != nil { if mobileSignalHandler != nil {
mobileSignalHandler(data) mobileSignalHandler(data)

46
sqlite/sqlite.go Normal file
View File

@ -0,0 +1,46 @@
package sqlite
import (
"database/sql"
"errors"
"fmt"
_ "github.com/mutecomm/go-sqlcipher" // We require go sqlcipher that overrides default implementation
)
func openDB(path, key string) (*sql.DB, error) {
db, err := sql.Open("sqlite3", path)
if err != nil {
return nil, err
}
// Disable concurrent access as not supported by the driver
db.SetMaxOpenConns(1)
if _, err = db.Exec("PRAGMA foreign_keys=ON"); err != nil {
return nil, err
}
keyString := fmt.Sprintf("PRAGMA key = '%s'", key)
if _, err = db.Exec(keyString); err != nil {
return nil, errors.New("failed to set key pragma")
}
// Enable WAL mode: readers do not block writers and i/o operations are faster.
// https://www.sqlite.org/draft/wal.html
// It must be set after the database is encrypted.
var mode string
err = db.QueryRow("PRAGMA journal_mode=WAL").Scan(&mode)
if err != nil {
return nil, err
}
if mode != "wal" {
return nil, fmt.Errorf("unable to set journal_mode to WAL. actual mode %s", mode)
}
return db, nil
}
// OpenDB opens a database encrypted with the given key (sqlcipher).
func OpenDB(path, key string) (*sql.DB, error) {
return openDB(path, key)
}
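A minimal usage sketch with a hypothetical helper name; OpenDB applies the sqlcipher key pragma and switches the journal mode to WAL as shown above. Imports of database/sql and this sqlite package are assumed.

// Hypothetical usage: open an encrypted wallet store and verify the connection.
func openWalletStore(path, key string) (*sql.DB, error) {
	db, err := sqlite.OpenDB(path, key)
	if err != nil {
		return nil, err
	}
	if err := db.Ping(); err != nil {
		db.Close()
		return nil, err
	}
	return db, nil
}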

View File

@ -3,91 +3,20 @@ package devtests
import (
"crypto/ecdsa"
"io/ioutil"
"math/big"
"os"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/api"
"github.com/status-im/status-go/params"
statusrpc "github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/t/devtests/miner"
"github.com/stretchr/testify/suite"
)
// NewDevNode returns node with clieque engine and prefunded accounts.
func NewDevNode(faucet common.Address) (*node.Node, error) {
cfg := node.DefaultConfig
ipc, err := ioutil.TempFile("", "devnode-ipc-")
if err != nil {
return nil, err
}
cfg.IPCPath = ipc.Name()
cfg.HTTPModules = []string{"eth"}
cfg.DataDir = ""
cfg.NoUSB = true
cfg.P2P.MaxPeers = 0
cfg.P2P.ListenAddr = ":0"
cfg.P2P.NoDiscovery = true
cfg.P2P.DiscoveryV5 = false
stack, err := node.New(&cfg)
if err != nil {
return nil, err
}
ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)
// ensure that etherbase is added to an account manager
etherbase, err := crypto.GenerateKey()
if err != nil {
return nil, err
}
acc, err := ks.ImportECDSA(etherbase, "")
if err != nil {
return nil, err
}
err = ks.Unlock(acc, "")
if err != nil {
return nil, err
}
ethcfg := eth.DefaultConfig
ethcfg.NetworkId = 1337
// 0 - mine only if transaction pending
ethcfg.Genesis = core.DeveloperGenesisBlock(0, faucet)
extra := make([]byte, 32) // extraVanity
extra = append(extra, acc.Address[:]...)
extra = append(extra, make([]byte, 65)...) // extraSeal
ethcfg.Genesis.ExtraData = extra
ethcfg.MinerGasPrice = big.NewInt(1)
ethcfg.Etherbase = acc.Address
return stack, stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return eth.New(ctx, &ethcfg)
})
}
// StartWithMiner starts node with eth service and a miner.
func StartWithMiner(stack *node.Node) error {
err := stack.Start()
if err != nil {
return err
}
var ethereum *eth.Ethereum
err = stack.Service(&ethereum)
if err != nil {
return err
}
ethereum.TxPool().SetGasPrice(big.NewInt(1))
return ethereum.StartMining(0)
}
// DevNodeSuite provides convenient wrapper for starting node with clique backend for mining.
type DevNodeSuite struct {
suite.Suite
@ -109,9 +38,9 @@ func (s *DevNodeSuite) SetupTest() {
s.Require().NoError(err)
s.DevAccount = account
s.DevAccountAddress = crypto.PubkeyToAddress(account.PublicKey)
s.miner, err = NewDevNode(s.DevAccountAddress)
s.miner, err = miner.NewDevNode(s.DevAccountAddress)
s.Require().NoError(err)
s.Require().NoError(StartWithMiner(s.miner))
s.Require().NoError(miner.StartWithMiner(s.miner))
s.dir, err = ioutil.TempDir("", "devtests-")
s.Require().NoError(err)
@ -123,10 +52,10 @@ func (s *DevNodeSuite) SetupTest() {
config.WhisperConfig.Enabled = false
config.LightEthConfig.Enabled = false
config.UpstreamConfig.Enabled = true
config.WalletConfig.Enabled = true
config.UpstreamConfig.URL = s.miner.IPCEndpoint()
s.backend = api.NewStatusBackend()
s.Require().NoError(s.backend.StartNode(config))
s.Remote, err = s.miner.Attach()
s.Require().NoError(err)
s.Eth = ethclient.NewClient(s.Remote)

t/devtests/miner/node.go Normal file
View File

@ -0,0 +1,81 @@
package miner
import (
"io/ioutil"
"math/big"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/node"
)
// NewDevNode returns a node with the clique engine and a prefunded faucet account.
func NewDevNode(faucet common.Address) (*node.Node, error) {
cfg := node.DefaultConfig
ipc, err := ioutil.TempFile("", "devnode-ipc-")
if err != nil {
return nil, err
}
cfg.IPCPath = ipc.Name()
cfg.HTTPModules = []string{"eth"}
cfg.DataDir = ""
cfg.NoUSB = true
cfg.P2P.MaxPeers = 0
cfg.P2P.ListenAddr = ":0"
cfg.P2P.NoDiscovery = true
cfg.P2P.DiscoveryV5 = false
stack, err := node.New(&cfg)
if err != nil {
return nil, err
}
ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)
// ensure that the etherbase account is added to the account manager
etherbase, err := crypto.GenerateKey()
if err != nil {
return nil, err
}
acc, err := ks.ImportECDSA(etherbase, "")
if err != nil {
return nil, err
}
err = ks.Unlock(acc, "")
if err != nil {
return nil, err
}
ethcfg := eth.DefaultConfig
ethcfg.NetworkId = 1337
// clique period 0: mine a block only when transactions are pending
ethcfg.Genesis = core.DeveloperGenesisBlock(0, faucet)
extra := make([]byte, 32) // extraVanity
extra = append(extra, acc.Address[:]...)
extra = append(extra, make([]byte, 65)...) // extraSeal
ethcfg.Genesis.ExtraData = extra
ethcfg.MinerGasPrice = big.NewInt(1)
ethcfg.Etherbase = acc.Address
return stack, stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return eth.New(ctx, &ethcfg)
})
}
// StartWithMiner starts the node with the eth service and begins mining.
func StartWithMiner(stack *node.Node) error {
err := stack.Start()
if err != nil {
return err
}
var ethereum *eth.Ethereum
err = stack.Service(&ethereum)
if err != nil {
return err
}
ethereum.TxPool().SetGasPrice(big.NewInt(1))
return ethereum.StartMining(0)
}
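A standalone usage sketch of the extracted package, mirroring what DevNodeSuite does in its SetupTest; the faucet address here is an illustrative value:

package main

import (
	"log"

	"github.com/ethereum/go-ethereum/common"

	"github.com/status-im/status-go/t/devtests/miner"
)

func main() {
	// Prefund an illustrative faucet address in the developer genesis block.
	faucet := common.HexToAddress("0x0000000000000000000000000000000000000001")
	stack, err := miner.NewDevNode(faucet)
	if err != nil {
		log.Fatal(err)
	}
	if err := miner.StartWithMiner(stack); err != nil {
		log.Fatal(err)
	}
	defer stack.Stop()

	// Upstream RPC clients (e.g. a status node) can attach via the IPC endpoint.
	log.Println("dev node listening on", stack.IPCEndpoint())
}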

View File

@ -0,0 +1,80 @@
package testchain
import (
"crypto/ecdsa"
"math/big"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
)
type Backend struct {
Node *node.Node
Client *ethclient.Client
genesis *core.Genesis
Ethereum *eth.Ethereum
Faucet *ecdsa.PrivateKey
Signer types.Signer
}
func NewBackend() (*Backend, error) {
faucet, err := crypto.GenerateKey()
if err != nil {
return nil, err
}
config := params.AllEthashProtocolChanges
genesis := &core.Genesis{
Config: config,
Alloc: core.GenesisAlloc{crypto.PubkeyToAddress(faucet.PublicKey): {Balance: big.NewInt(1e18)}},
ExtraData: []byte("test genesis"),
Timestamp: 9000,
}
var ethservice *eth.Ethereum
n, err := node.New(&node.Config{})
if err != nil {
return nil, err
}
err = n.Register(func(ctx *node.ServiceContext) (node.Service, error) {
config := &eth.Config{Genesis: genesis}
config.Ethash.PowMode = ethash.ModeFake
ethservice, err = eth.New(ctx, config)
return ethservice, err
})
if err != nil {
return nil, err
}
if err := n.Start(); err != nil {
return nil, err
}
client, err := n.Attach()
if err != nil {
return nil, err
}
return &Backend{
Node: n,
Client: ethclient.NewClient(client),
Ethereum: ethservice,
Faucet: faucet,
Signer: types.NewEIP155Signer(config.ChainID),
genesis: genesis,
}, nil
}
// GenerateBlocks generates n blocks on top of the block with the given start number.
func (b *Backend) GenerateBlocks(n int, start uint64, gen func(int, *core.BlockGen)) []*types.Block {
block := b.Ethereum.BlockChain().GetBlockByNumber(start)
engine := ethash.NewFaker()
blocks, _ := core.GenerateChain(b.genesis.Config, block, engine, b.Ethereum.ChainDb(), n, gen)
return blocks
}
func (b *Backend) Stop() error {
return b.Node.Stop()
}
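A sketch of how the backend could be exercised: generate one block containing a transfer signed by the prefunded faucet and import it into the chain. The recipient address and transfer values are illustrative:

package main

import (
	"log"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/crypto"

	"github.com/status-im/status-go/t/devtests/testchain"
)

func main() {
	backend, err := testchain.NewBackend()
	if err != nil {
		log.Fatal(err)
	}
	defer backend.Stop()

	faucet := crypto.PubkeyToAddress(backend.Faucet.PublicKey)
	// Generate a single block on top of genesis with one signed transfer in it.
	blocks := backend.GenerateBlocks(1, 0, func(i int, gen *core.BlockGen) {
		tx := types.NewTransaction(gen.TxNonce(faucet), common.Address{1}, big.NewInt(1), 21000, big.NewInt(1), nil)
		signed, err := types.SignTx(tx, backend.Signer, backend.Faucet)
		if err != nil {
			panic(err)
		}
		gen.AddTx(signed)
	})
	// Import the generated blocks so they become part of the backend's canonical chain.
	if _, err := backend.Ethereum.BlockChain().InsertChain(blocks); err != nil {
		log.Fatal(err)
	}
	log.Println("head is now block", backend.Ethereum.BlockChain().CurrentBlock().NumberU64())
}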

t/devtests/tranfers_test.go Normal file
View File

@ -0,0 +1,114 @@
package devtests
import (
"context"
"fmt"
"math/big"
"testing"
"time"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/status-im/status-go/account"
"github.com/status-im/status-go/services/wallet"
"github.com/status-im/status-go/t/utils"
"github.com/stretchr/testify/suite"
)
func TestTransfersSuite(t *testing.T) {
suite.Run(t, new(TransfersSuite))
}
type TransfersSuite struct {
DevNodeSuite
Password string
Info account.Info
Address common.Address
}
func (s *TransfersSuite) SelectAccount() {
s.Require().NoError(s.backend.SelectAccount(s.Info.WalletAddress, s.Info.ChatAddress, s.Password))
_, err := s.backend.AccountManager().SelectedWalletAccount()
s.Require().NoError(err)
}
func (s *TransfersSuite) SetupTest() {
s.DevNodeSuite.SetupTest()
s.Password = "test"
info, _, err := s.backend.AccountManager().CreateAccount(s.Password)
s.Require().NoError(err)
s.Info = info
s.Address = common.HexToAddress(info.WalletAddress)
}
func (s *TransfersSuite) TearDownTest() {
s.Require().NoError(s.backend.Logout())
s.DevNodeSuite.TearDownTest()
}
func (s *TransfersSuite) getAllTranfers() (rst []wallet.Transfer, err error) {
return rst, s.Local.Call(&rst, "wallet_getTransfersByAddress", s.Address, (*hexutil.Big)(big.NewInt(0)))
}
func (s *TransfersSuite) sendTx(nonce uint64, to common.Address) {
tx := types.NewTransaction(nonce, to, big.NewInt(1e18), 1e6, big.NewInt(10), nil)
// TODO move signer to DevNodeSuite
tx, err := types.SignTx(tx, types.NewEIP155Signer(big.NewInt(1337)), s.DevAccount)
s.Require().NoError(err)
s.Require().NoError(s.Eth.SendTransaction(context.Background(), tx))
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, err = bind.WaitMined(timeout, s.Eth, tx)
cancel()
s.Require().NoError(err)
}
func (s *TransfersSuite) TestNewTransfers() {
s.SelectAccount()
s.sendTx(0, s.Address)
s.Require().NoError(utils.Eventually(func() error {
all, err := s.getAllTranfers()
if err != nil {
return err
}
if len(all) != 1 {
return fmt.Errorf("waiting for one transfer")
}
return nil
}, 20*time.Second, 1*time.Second))
go func() {
for i := 1; i < 10; i++ {
s.sendTx(uint64(i), s.Address)
}
}()
s.Require().NoError(utils.Eventually(func() error {
all, err := s.getAllTranfers()
if err != nil {
return err
}
if len(all) != 10 {
return fmt.Errorf("waiting for 10 transfers")
}
return nil
}, 30*time.Second, 1*time.Second))
}
func (s *TransfersSuite) TestHistoricalTransfers() {
for i := 0; i < 30; i++ {
s.sendTx(uint64(i), s.Address)
}
s.SelectAccount()
s.Require().NoError(utils.Eventually(func() error {
all, err := s.getAllTranfers()
if err != nil {
return err
}
if len(all) < 30 {
return fmt.Errorf("waiting for at least 30 transfers")
}
return nil
}, 30*time.Second, 1*time.Second))
}