feat(wallet): refactor transfers to allow multiple transfer fetching strategies (#3432)

On mobile devices we might not want to load the whole transfer history,
to avoid heavy network traffic or to save storage space. On desktop we
will load all transfers in the background.
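
Below is a minimal sketch of how a client of this package could pick a
strategy per platform. The newTransferControllerForPlatform helper and the
isMobile flag are hypothetical wiring; NewTransferController and the
FetchStrategyType constants are the ones introduced by this change.

package wallet

import (
	"database/sql"

	"github.com/ethereum/go-ethereum/event"
	"github.com/status-im/status-go/rpc"
	"github.com/status-im/status-go/services/wallet/transfer"
)

// newTransferControllerForPlatform is a hypothetical helper that picks a
// fetch strategy depending on the platform and wires up the controller.
func newTransferControllerForPlatform(
	db *sql.DB,
	rpcClient *rpc.Client,
	accountFeed, walletFeed *event.Feed,
	tm *transfer.TransactionManager,
	isMobile bool,
) *transfer.Controller {
	// Desktop: load the whole history in the background.
	strategy := transfer.SequentialFetchStrategyType
	if isMobile {
		// Mobile: only fetch what the UI actually asks for.
		strategy = transfer.OnDemandFetchStrategyType
	}
	return transfer.NewTransferController(db, rpcClient, accountFeed, walletFeed, tm, strategy)
}

Note that in this commit the wallet service still hard-codes
OnDemandFetchStrategyType (see the NewService hunk below); the sequential
strategy is only scaffolded here.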

- Moved the current transfer-fetching implementation to
  OnDemandFetchStrategy
- Implemented QueuedAtomicGroup to queue outgoing commands above a
  concurrency threshold
- Refactored some type and method names for better clarity:
  - The LastKnownBlock name was misused
  - Block renamed to BlockDAO to clarify what it does and to avoid
    confusion with geth's Block type
- Cleanup: removed unused parameters and type members
- Changed tests to use an in-memory DB for speedup
- Added more logging and comments

Updates #10246
IvanBelyakoff 2023-05-08 08:02:00 +02:00 committed by GitHub
parent edf8869ff2
commit cb9f2dd7d2
16 changed files with 650 additions and 331 deletions

View File

@ -108,7 +108,7 @@ func TestTransactionNotification(t *testing.T) {
},
}
nonce := int64(0)
lastBlock := &transfer.LastKnownBlock{
lastBlock := &transfer.Block{
Number: big.NewInt(1),
Balance: big.NewInt(0),
Nonce: &nonce,

View File

@ -97,11 +97,13 @@ func (api *API) GetTransfersByAddressAndChainID(ctx context.Context, chainID uin
return api.s.transferController.GetTransfersByAddress(ctx, chainID, address, hexBigToBN(toBlock), limit.ToInt().Int64(), fetchMore)
}
func (api *API) GetCachedBalances(ctx context.Context, addresses []common.Address) ([]transfer.LastKnownBlockView, error) {
// Deprecated: GetCachedBalances is deprecated. Use GetTokensBalances instead
func (api *API) GetCachedBalances(ctx context.Context, addresses []common.Address) ([]transfer.BlockView, error) {
return api.s.transferController.GetCachedBalances(ctx, api.s.rpcClient.UpstreamChainID, addresses)
}
func (api *API) GetCachedBalancesbyChainID(ctx context.Context, chainID uint64, addresses []common.Address) ([]transfer.LastKnownBlockView, error) {
// Deprecated: GetCachedBalances is deprecated. Use GetTokensBalancesForChainIDs instead
func (api *API) GetCachedBalancesbyChainID(ctx context.Context, chainID uint64, addresses []common.Address) ([]transfer.BlockView, error) {
return api.s.transferController.GetCachedBalances(ctx, chainID, addresses)
}

View File

@ -8,6 +8,10 @@ import (
type Command func(context.Context) error
type Commander interface {
Command() Command
}
// FiniteCommand terminates when error is nil.
type FiniteCommand struct {
Interval time.Duration
@ -93,13 +97,16 @@ func (g *Group) WaitAsync() <-chan struct{} {
func NewAtomicGroup(parent context.Context) *AtomicGroup {
ctx, cancel := context.WithCancel(parent)
return &AtomicGroup{ctx: ctx, cancel: cancel}
ag := &AtomicGroup{ctx: ctx, cancel: cancel}
ag.done = ag.onFinish
return ag
}
// AtomicGroup terminates as soon as the first goroutine terminates.
type AtomicGroup struct {
ctx context.Context
cancel func()
done func()
wg sync.WaitGroup
mu sync.Mutex
@ -110,7 +117,7 @@ type AtomicGroup struct {
func (d *AtomicGroup) Add(cmd Command) {
d.wg.Add(1)
go func() {
defer d.wg.Done()
defer d.done()
err := cmd(d.ctx)
d.mu.Lock()
defer d.mu.Unlock()
@ -155,3 +162,60 @@ func (d *AtomicGroup) Error() error {
func (d *AtomicGroup) Stop() {
d.cancel()
}
func (d *AtomicGroup) onFinish() {
d.wg.Done()
}
func NewQueuedAtomicGroup(parent context.Context, limit uint32) *QueuedAtomicGroup {
qag := &QueuedAtomicGroup{NewAtomicGroup(parent), limit, 0, []Command{}, sync.Mutex{}}
baseDoneFunc := qag.done // save original done function
qag.AtomicGroup.done = func() {
baseDoneFunc()
qag.onFinish()
}
return qag
}
type QueuedAtomicGroup struct {
*AtomicGroup
limit uint32
count uint32
pendingCmds []Command
mu sync.Mutex
}
func (d *QueuedAtomicGroup) Add(cmd Command) {
d.mu.Lock()
if d.limit > 0 && d.count >= d.limit {
d.pendingCmds = append(d.pendingCmds, cmd)
d.mu.Unlock()
return
}
d.mu.Unlock()
d.run(cmd)
}
func (d *QueuedAtomicGroup) run(cmd Command) {
d.mu.Lock()
d.count++
d.mu.Unlock()
d.AtomicGroup.Add(cmd)
}
func (d *QueuedAtomicGroup) onFinish() {
d.mu.Lock()
d.count--
if d.count < d.limit && len(d.pendingCmds) > 0 {
cmd := d.pendingCmds[0]
d.pendingCmds = d.pendingCmds[1:]
d.mu.Unlock()
d.run(cmd)
return
}
d.mu.Unlock()
}
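
A small, self-contained sketch of the queuing behaviour. Only
NewQueuedAtomicGroup and Add come from the code above; the counters, the
sleep and the extra WaitGroup are just instrumentation for the example.

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/status-im/status-go/services/wallet/async"
)

func main() {
	// At most 3 commands run concurrently; the rest are queued and started
	// as earlier commands finish.
	group := async.NewQueuedAtomicGroup(context.Background(), 3)

	var inFlight, peak int32
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)
		group.Add(func(ctx context.Context) error {
			defer wg.Done()
			cur := atomic.AddInt32(&inFlight, 1)
			for { // remember the highest concurrency observed
				p := atomic.LoadInt32(&peak)
				if cur <= p || atomic.CompareAndSwapInt32(&peak, p, cur) {
					break
				}
			}
			time.Sleep(50 * time.Millisecond) // simulate work
			atomic.AddInt32(&inFlight, -1)
			return nil
		})
	}

	wg.Wait()
	fmt.Println("peak concurrency:", atomic.LoadInt32(&peak)) // expected to stay <= 3
}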

View File

@ -86,7 +86,7 @@ func NewService(
tokenManager := token.NewTokenManager(db, rpcClient, rpcClient.NetworkManager)
savedAddressesManager := &SavedAddressesManager{db: db}
transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB)
transferController := transfer.NewTransferController(db, rpcClient, accountFeed, walletFeed, transactionManager)
transferController := transfer.NewTransferController(db, rpcClient, accountFeed, walletFeed, transactionManager, transfer.OnDemandFetchStrategyType)
cryptoCompare := cryptocompare.NewClient()
coingecko := coingecko.NewClient()
marketManager := market.NewManager(cryptoCompare, coingecko, walletFeed)

View File

@ -1,15 +1,12 @@
package transfer
import (
"context"
"database/sql"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/bigint"
)
@ -18,23 +15,23 @@ type BlocksRange struct {
to *big.Int
}
type LastKnownBlock struct {
type Block struct {
Number *big.Int
Balance *big.Int
Nonce *int64
}
type LastKnownBlockView struct {
type BlockView struct {
Address common.Address `json:"address"`
Number *big.Int `json:"blockNumber"`
Balance bigint.BigInt `json:"balance"`
Nonce *int64 `json:"nonce"`
}
func blocksToViews(blocks map[common.Address]*LastKnownBlock) []LastKnownBlockView {
blocksViews := []LastKnownBlockView{}
func blocksToViews(blocks map[common.Address]*Block) []BlockView {
blocksViews := []BlockView{}
for address, block := range blocks {
view := LastKnownBlockView{
view := BlockView{
Address: address,
Number: block.Number,
Balance: bigint.BigInt{Int: block.Balance},
@ -46,12 +43,12 @@ func blocksToViews(blocks map[common.Address]*LastKnownBlock) []LastKnownBlockVi
return blocksViews
}
type Block struct {
type BlockDAO struct {
db *sql.DB
}
// mergeBlocksRanges merges old block ranges if possible
func (b *Block) mergeBlocksRanges(chainIDs []uint64, accounts []common.Address) error {
func (b *BlockDAO) mergeBlocksRanges(chainIDs []uint64, accounts []common.Address) error {
for _, chainID := range chainIDs {
for _, account := range accounts {
err := b.mergeRanges(chainID, account)
@ -63,7 +60,7 @@ func (b *Block) mergeBlocksRanges(chainIDs []uint64, accounts []common.Address)
return nil
}
func (b *Block) setInitialBlocksRange(chainClient *chain.ClientWithFallback) error {
func (b *BlockDAO) setInitialBlocksRange(chainID uint64, from *big.Int, to *big.Int) error {
accountsDB, err := accounts.NewDB(b.db)
if err != nil {
return err
@ -73,23 +70,14 @@ func (b *Block) setInitialBlocksRange(chainClient *chain.ClientWithFallback) err
return err
}
from := big.NewInt(0)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
header, err := chainClient.HeaderByNumber(ctx, nil)
if err != nil {
return err
}
err = b.insertRange(chainClient.ChainID, common.Address(watchAddress), from, header.Number, big.NewInt(0), 0)
err = b.insertRange(chainID, common.Address(watchAddress), from, to, big.NewInt(0), 0)
if err != nil {
return err
}
return nil
}
func (b *Block) mergeRanges(chainID uint64, account common.Address) (err error) {
func (b *BlockDAO) mergeRanges(chainID uint64, account common.Address) (err error) {
var (
tx *sql.Tx
)
@ -137,7 +125,7 @@ func (b *Block) mergeRanges(chainID uint64, account common.Address) (err error)
return nil
}
func (b *Block) insertRange(chainID uint64, account common.Address, from, to, balance *big.Int, nonce uint64) error {
func (b *BlockDAO) insertRange(chainID uint64, account common.Address, from, to, balance *big.Int, nonce uint64) error {
log.Debug("insert blocks range", "account", account, "network id", chainID, "from", from, "to", to, "balance", balance, "nonce", nonce)
insert, err := b.db.Prepare("INSERT INTO blocks_ranges (network_id, address, blk_from, blk_to, balance, nonce) VALUES (?, ?, ?, ?, ?, ?)")
if err != nil {
@ -147,7 +135,7 @@ func (b *Block) insertRange(chainID uint64, account common.Address, from, to, ba
return err
}
func (b *Block) getOldRanges(chainID uint64, account common.Address) ([]*BlocksRange, error) {
func (b *BlockDAO) getOldRanges(chainID uint64, account common.Address) ([]*BlocksRange, error) {
query := `select blk_from, blk_to from blocks_ranges
where address = ?
and network_id = ?
@ -177,7 +165,7 @@ func (b *Block) getOldRanges(chainID uint64, account common.Address) ([]*BlocksR
}
// GetBlocksByAddress loads blocks for a given address.
func (b *Block) GetBlocksByAddress(chainID uint64, address common.Address, limit int) (rst []*big.Int, err error) {
func (b *BlockDAO) GetBlocksByAddress(chainID uint64, address common.Address, limit int) (rst []*big.Int, err error) {
query := `SELECT blk_number FROM blocks
WHERE address = ? AND network_id = ? AND loaded = 0
ORDER BY blk_number DESC
@ -198,7 +186,7 @@ func (b *Block) GetBlocksByAddress(chainID uint64, address common.Address, limit
return rst, nil
}
func (b *Block) RemoveBlockWithTransfer(chainID uint64, address common.Address, block *big.Int) error {
func (b *BlockDAO) RemoveBlockWithTransfer(chainID uint64, address common.Address, block *big.Int) error {
query := `DELETE FROM blocks
WHERE address = ?
AND blk_number = ?
@ -213,7 +201,7 @@ func (b *Block) RemoveBlockWithTransfer(chainID uint64, address common.Address,
return nil
}
func (b *Block) GetLastBlockByAddress(chainID uint64, address common.Address, limit int) (rst *big.Int, err error) {
func (b *BlockDAO) GetLastBlockByAddress(chainID uint64, address common.Address, limit int) (rst *big.Int, err error) {
query := `SELECT * FROM
(SELECT blk_number FROM blocks WHERE address = ? AND network_id = ? ORDER BY blk_number DESC LIMIT ?)
ORDER BY blk_number LIMIT 1`
@ -236,7 +224,8 @@ func (b *Block) GetLastBlockByAddress(chainID uint64, address common.Address, li
return nil, nil
}
func (b *Block) GetLastSavedBlock(chainID uint64) (rst *DBHeader, err error) {
// TODO remove as not used
func (b *BlockDAO) GetLastSavedBlock(chainID uint64) (rst *DBHeader, err error) {
query := `SELECT blk_number, blk_hash
FROM blocks
WHERE network_id = ?
@ -260,7 +249,8 @@ func (b *Block) GetLastSavedBlock(chainID uint64) (rst *DBHeader, err error) {
return nil, nil
}
func (b *Block) GetBlocks(chainID uint64) (rst []*DBHeader, err error) {
// TODO remove as not used
func (b *BlockDAO) GetBlocks(chainID uint64) (rst []*DBHeader, err error) {
query := `SELECT blk_number, blk_hash, address FROM blocks`
rows, err := b.db.Query(query, chainID)
if err != nil {
@ -282,7 +272,8 @@ func (b *Block) GetBlocks(chainID uint64) (rst []*DBHeader, err error) {
return rst, nil
}
func (b *Block) GetLastSavedBlockBefore(chainID uint64, block *big.Int) (rst *DBHeader, err error) {
// TODO remove as not used
func (b *BlockDAO) GetLastSavedBlockBefore(chainID uint64, block *big.Int) (rst *DBHeader, err error) {
query := `SELECT blk_number, blk_hash
FROM blocks
WHERE network_id = ? AND blk_number < ?
@ -306,7 +297,7 @@ func (b *Block) GetLastSavedBlockBefore(chainID uint64, block *big.Int) (rst *DB
return nil, nil
}
func (b *Block) GetFirstKnownBlock(chainID uint64, address common.Address) (rst *big.Int, err error) {
func (b *BlockDAO) GetFirstKnownBlock(chainID uint64, address common.Address) (rst *big.Int, err error) {
query := `SELECT blk_from FROM blocks_ranges
WHERE address = ?
AND network_id = ?
@ -332,7 +323,7 @@ func (b *Block) GetFirstKnownBlock(chainID uint64, address common.Address) (rst
return nil, nil
}
func (b *Block) GetLastKnownBlockByAddress(chainID uint64, address common.Address) (block *LastKnownBlock, err error) {
func (b *BlockDAO) GetLastKnownBlockByAddress(chainID uint64, address common.Address) (block *Block, err error) {
query := `SELECT blk_to, balance, nonce FROM blocks_ranges
WHERE address = ?
AND network_id = ?
@ -347,7 +338,7 @@ func (b *Block) GetLastKnownBlockByAddress(chainID uint64, address common.Addres
if rows.Next() {
var nonce sql.NullInt64
block = &LastKnownBlock{Number: &big.Int{}, Balance: &big.Int{}}
block = &Block{Number: &big.Int{}, Balance: &big.Int{}}
err = rows.Scan((*bigint.SQLBigInt)(block.Number), (*bigint.SQLBigIntBytes)(block.Balance), &nonce)
if err != nil {
return nil, err
@ -362,8 +353,8 @@ func (b *Block) GetLastKnownBlockByAddress(chainID uint64, address common.Addres
return nil, nil
}
func (b *Block) getLastKnownBalances(chainID uint64, addresses []common.Address) (map[common.Address]*LastKnownBlock, error) {
result := map[common.Address]*LastKnownBlock{}
func (b *BlockDAO) getLastKnownBlocks(chainID uint64, addresses []common.Address) (map[common.Address]*Block, error) {
result := map[common.Address]*Block{}
for _, address := range addresses {
block, error := b.GetLastKnownBlockByAddress(chainID, address)
if error != nil {
@ -378,8 +369,9 @@ func (b *Block) getLastKnownBalances(chainID uint64, addresses []common.Address)
return result, nil
}
func (b *Block) GetLastKnownBlockByAddresses(chainID uint64, addresses []common.Address) (map[common.Address]*LastKnownBlock, []common.Address, error) {
res := map[common.Address]*LastKnownBlock{}
// TODO Remove the method below: it is used in one place and duplicates getLastKnownBlocks with a slight, unneeded difference
func (b *BlockDAO) GetLastKnownBlockByAddresses(chainID uint64, addresses []common.Address) (map[common.Address]*Block, []common.Address, error) {
res := map[common.Address]*Block{}
accountsWithoutHistory := []common.Address{}
for _, address := range addresses {
block, err := b.GetLastKnownBlockByAddress(chainID, address)
@ -469,7 +461,7 @@ func insertRange(chainID uint64, creator statementCreator, account common.Addres
return err
}
func upsertRange(chainID uint64, creator statementCreator, account common.Address, from *big.Int, to *LastKnownBlock) (err error) {
func upsertRange(chainID uint64, creator statementCreator, account common.Address, from *big.Int, to *Block) (err error) {
log.Debug("upsert blocks range", "account", account, "network id", chainID, "from", from, "to", to.Number, "balance", to.Balance)
update, err := creator.Prepare(`UPDATE blocks_ranges
SET blk_to = ?, balance = ?, nonce = ?

View File

@ -1,9 +1,7 @@
package transfer
import (
"io/ioutil"
"math/big"
"os"
"testing"
"github.com/stretchr/testify/require"
@ -13,14 +11,11 @@ import (
"github.com/status-im/status-go/sqlite"
)
func setupTestTransferDB(t *testing.T) (*Block, func()) {
tmpfile, err := ioutil.TempFile("", "wallet-transfer-tests-")
func setupTestTransferDB(t *testing.T) (*BlockDAO, func()) {
db, err := appdatabase.InitializeDB(sqlite.InMemoryPath, "wallet-tests", sqlite.ReducedKDFIterationsNumber)
require.NoError(t, err)
db, err := appdatabase.InitializeDB(tmpfile.Name(), "wallet-tests", sqlite.ReducedKDFIterationsNumber)
require.NoError(t, err)
return &Block{db}, func() {
return &BlockDAO{db}, func() {
require.NoError(t, db.Close())
require.NoError(t, os.Remove(tmpfile.Name()))
}
}

View File

@ -45,7 +45,6 @@ var (
)
type ethHistoricalCommand struct {
db *Database
eth Downloader
address common.Address
chainClient *chain.ClientWithFallback
@ -55,7 +54,7 @@ type ethHistoricalCommand struct {
error error
noLimit bool
from *LastKnownBlock
from *Block
to, resultingFrom *big.Int
}
@ -67,6 +66,8 @@ func (c *ethHistoricalCommand) Command() async.Command {
}
func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
log.Info("eth historical downloader start", "address", c.address, "from", c.from.Number, "to", c.to, "noLimit", c.noLimit)
start := time.Now()
if c.from.Number != nil && c.from.Balance != nil {
c.balanceCache.addBalanceToCache(c.address, c.from.Number, c.from.Balance)
@ -86,17 +87,10 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
log.Info("eth historical downloader finished successfully", "address", c.address, "from", from, "to", c.to, "total blocks", len(headers), "time", time.Since(start))
//err = c.db.ProcessBlocks(c.address, from, c.to, headers, ethTransfer)
if err != nil {
log.Error("failed to save found blocks with transfers", "error", err)
return err
}
log.Debug("eth transfers were persisted. command is closed")
return nil
}
type erc20HistoricalCommand struct {
db *Database
erc20 BatchDownloader
address common.Address
chainClient *chain.ClientWithFallback
@ -132,10 +126,13 @@ func getErc20BatchSize(chainID uint64) *big.Int {
}
func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {
log.Info("wallet historical downloader for erc20 transfers start", "address", c.address,
"from", c.from, "to", c.to)
start := time.Now()
if c.iterator == nil {
c.iterator, err = SetupIterativeDownloader(
c.db, c.chainClient, c.address,
c.chainClient, c.address,
c.erc20, getErc20BatchSize(c.chainClient.ChainID), c.to, c.from)
if err != nil {
log.Error("failed to setup historical downloader for erc20")
@ -149,15 +146,9 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {
return err
}
c.foundHeaders = append(c.foundHeaders, headers...)
/*err = c.db.ProcessBlocks(c.address, from, to, headers, erc20Transfer)
if err != nil {
c.iterator.Revert()
log.Error("failed to save downloaded erc20 blocks with transfers", "error", err)
return err
}*/
}
log.Info("wallet historical downloader for erc20 transfers finished", "in", time.Since(start))
log.Info("wallet historical downloader for erc20 transfers finished", "address", c.address,
"from", c.from, "to", c.to, "time", time.Since(start))
return nil
}
@ -168,7 +159,7 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {
type controlCommand struct {
accounts []common.Address
db *Database
block *Block
blockDAO *BlockDAO
eth *ETHDownloader
erc20 *ERC20TransfersDownloader
chainClient *chain.ClientWithFallback
@ -178,8 +169,8 @@ type controlCommand struct {
transactionManager *TransactionManager
}
func (c *controlCommand) LoadTransfers(ctx context.Context, downloader *ETHDownloader, limit int) (map[common.Address][]Transfer, error) {
return loadTransfers(ctx, c.accounts, c.block, c.db, c.chainClient, limit, make(map[common.Address][]*big.Int), c.transactionManager)
func (c *controlCommand) LoadTransfers(ctx context.Context, limit int) (map[common.Address][]Transfer, error) {
return loadTransfers(ctx, c.accounts, c.blockDAO, c.db, c.chainClient, limit, make(map[common.Address][]*big.Int), c.transactionManager)
}
func (c *controlCommand) Run(parent context.Context) error {
@ -194,13 +185,17 @@ func (c *controlCommand) Run(parent context.Context) error {
return err
}
c.feed.Send(walletevent.Event{
Type: EventFetchingRecentHistory,
Accounts: c.accounts,
})
if c.feed != nil {
c.feed.Send(walletevent.Event{
Type: EventFetchingRecentHistory,
Accounts: c.accounts,
})
}
log.Info("current head is", "block number", head.Number)
lastKnownEthBlocks, accountsWithoutHistory, err := c.block.GetLastKnownBlockByAddresses(c.chainClient.ChainID, c.accounts)
// Get last known block for each account
lastKnownEthBlocks, accountsWithoutHistory, err := c.blockDAO.GetLastKnownBlockByAddresses(c.chainClient.ChainID, c.accounts)
if err != nil {
log.Error("failed to load last head from database", "error", err)
if c.NewError(err) {
@ -209,6 +204,7 @@ func (c *controlCommand) Run(parent context.Context) error {
return err
}
// For accounts without history, find the block where 20 < headNonce - nonce < 25 (i.e. 20-25 of the account's transactions fall after that block)
fromMap := map[common.Address]*big.Int{}
if !c.nonArchivalRPCNode {
@ -221,17 +217,18 @@ func (c *controlCommand) Run(parent context.Context) error {
}
}
// Set "fromByAddress" from the information we have
target := head.Number
fromByAddress := map[common.Address]*LastKnownBlock{}
fromByAddress := map[common.Address]*Block{}
toByAddress := map[common.Address]*big.Int{}
for _, address := range c.accounts {
from, ok := lastKnownEthBlocks[address]
if !ok {
from = &LastKnownBlock{Number: fromMap[address]}
from = &Block{Number: fromMap[address]}
}
if c.nonArchivalRPCNode {
from = &LastKnownBlock{Number: big.NewInt(0).Sub(target, big.NewInt(100))}
from = &Block{Number: big.NewInt(0).Sub(target, big.NewInt(100))}
}
fromByAddress[address] = from
@ -264,14 +261,7 @@ func (c *controlCommand) Run(parent context.Context) error {
return cmnd.error
}
downloader := &ETHDownloader{
chainClient: c.chainClient,
accounts: c.accounts,
signer: types.NewLondonSigner(c.chainClient.ToBigInt()),
db: c.db,
}
_, err = c.LoadTransfers(parent, downloader, 40)
_, err = c.LoadTransfers(parent, 40)
if err != nil {
if c.NewError(err) {
return nil
@ -279,32 +269,34 @@ func (c *controlCommand) Run(parent context.Context) error {
return err
}
events := map[common.Address]walletevent.Event{}
for _, address := range c.accounts {
event := walletevent.Event{
Type: EventNewTransfers,
Accounts: []common.Address{address},
}
for _, header := range cmnd.foundHeaders[address] {
if event.BlockNumber == nil || header.Number.Cmp(event.BlockNumber) == 1 {
event.BlockNumber = header.Number
if c.feed != nil {
events := map[common.Address]walletevent.Event{}
for _, address := range c.accounts {
event := walletevent.Event{
Type: EventNewTransfers,
Accounts: []common.Address{address},
}
for _, header := range cmnd.foundHeaders[address] {
if event.BlockNumber == nil || header.Number.Cmp(event.BlockNumber) == 1 {
event.BlockNumber = header.Number
}
}
if event.BlockNumber != nil {
events[address] = event
}
}
if event.BlockNumber != nil {
events[address] = event
for _, event := range events {
c.feed.Send(event)
}
}
for _, event := range events {
c.feed.Send(event)
c.feed.Send(walletevent.Event{
Type: EventRecentHistoryReady,
Accounts: c.accounts,
BlockNumber: target,
})
}
c.feed.Send(walletevent.Event{
Type: EventRecentHistoryReady,
Accounts: c.accounts,
BlockNumber: target,
})
log.Info("end control command")
return err
}
@ -359,7 +351,9 @@ func (c *transfersCommand) Command() async.Command {
}
func (c *transfersCommand) Run(ctx context.Context) (err error) {
allTransfers, err := getTransfersByBlocks(ctx, c.db, c.eth, c.address, []*big.Int{c.block})
startTs := time.Now()
allTransfers, err := getTransfersByBlocks(ctx, c.db, c.eth, []*big.Int{c.block})
if err != nil {
log.Info("getTransfersByBlocks error", "error", err)
return err
@ -372,7 +366,7 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) {
entry, err := c.transactionManager.GetPendingEntry(c.chainClient.ChainID, transfer.ID)
if err != nil {
if err == sql.ErrNoRows {
log.Warn("Pending transaction not found for", c.chainClient.ChainID, transfer.ID)
log.Info("Pending transaction not found for", "chainID", c.chainClient.ChainID, "transferID", transfer.ID)
} else {
return err
}
@ -390,7 +384,7 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) {
}
if len(allTransfers) > 0 {
err = c.db.SaveTransfers(c.chainClient.ChainID, c.address, allTransfers, []*big.Int{c.block})
err = c.db.SaveTransfersMarkBlocksLoaded(c.chainClient.ChainID, c.address, allTransfers, []*big.Int{c.block})
if err != nil {
log.Error("SaveTransfers error", "error", err)
return err
@ -398,14 +392,14 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) {
}
c.fetchedTransfers = allTransfers
log.Debug("transfers loaded", "address", c.address, "len", len(allTransfers))
log.Debug("transfers loaded", "address", c.address, "len", len(allTransfers), "in", time.Since(startTs))
return nil
}
type loadTransfersCommand struct {
accounts []common.Address
db *Database
block *Block
blockDAO *BlockDAO
chainClient *chain.ClientWithFallback
blocksByAddress map[common.Address][]*big.Int
foundTransfersByAddress map[common.Address][]Transfer
@ -419,18 +413,12 @@ func (c *loadTransfersCommand) Command() async.Command {
}.Run
}
func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, downloader *ETHDownloader, limit int, blocksByAddress map[common.Address][]*big.Int, transactionManager *TransactionManager) (map[common.Address][]Transfer, error) {
return loadTransfers(ctx, c.accounts, c.block, c.db, c.chainClient, limit, blocksByAddress, c.transactionManager)
func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, limit int, blocksByAddress map[common.Address][]*big.Int, transactionManager *TransactionManager) (map[common.Address][]Transfer, error) {
return loadTransfers(ctx, c.accounts, c.blockDAO, c.db, c.chainClient, limit, blocksByAddress, c.transactionManager)
}
func (c *loadTransfersCommand) Run(parent context.Context) (err error) {
downloader := &ETHDownloader{
chainClient: c.chainClient,
accounts: c.accounts,
signer: types.NewLondonSigner(c.chainClient.ToBigInt()),
db: c.db,
}
transfersByAddress, err := c.LoadTransfers(parent, downloader, 40, c.blocksByAddress, c.transactionManager)
transfersByAddress, err := c.LoadTransfers(parent, 40, c.blocksByAddress, c.transactionManager)
if err != nil {
return err
}
@ -445,7 +433,7 @@ type findAndCheckBlockRangeCommand struct {
chainClient *chain.ClientWithFallback
balanceCache *balanceCache
feed *event.Feed
fromByAddress map[common.Address]*LastKnownBlock
fromByAddress map[common.Address]*Block
toByAddress map[common.Address]*big.Int
foundHeaders map[common.Address][]*DBHeader
noLimit bool
@ -461,9 +449,11 @@ func (c *findAndCheckBlockRangeCommand) Command() async.Command {
func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error) {
log.Debug("start findAndCHeckBlockRangeCommand")
newFromByAddress, ethHeadersByAddress, err := c.fastIndex(parent, c.balanceCache, c.fromByAddress, c.toByAddress)
if err != nil {
c.error = err
// return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers. But if we return an error, we will get stuck in an infinite loop.
return nil
}
if c.noLimit {
@ -478,7 +468,6 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error)
}
foundHeaders := map[common.Address][]*DBHeader{}
maxBlockNumber := big.NewInt(0)
for _, address := range c.accounts {
ethHeaders := ethHeadersByAddress[address]
erc20Headers := erc20HeadersByAddress[address]
@ -504,15 +493,9 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error)
foundHeaders[address] = uniqHeaders
for _, header := range allHeaders {
if header.Number.Cmp(maxBlockNumber) == 1 {
maxBlockNumber = header.Number
}
}
lastBlockNumber := c.toByAddress[address]
log.Debug("saving headers", "len", len(uniqHeaders), "lastBlockNumber", lastBlockNumber, "balance", c.balanceCache.ReadCachedBalance(address, lastBlockNumber), "nonce", c.balanceCache.ReadCachedNonce(address, lastBlockNumber))
to := &LastKnownBlock{
to := &Block{
Number: lastBlockNumber,
Balance: c.balanceCache.ReadCachedBalance(address, lastBlockNumber),
Nonce: c.balanceCache.ReadCachedNonce(address, lastBlockNumber),
@ -525,19 +508,24 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error)
c.foundHeaders = foundHeaders
log.Debug("end findAndCheckBlockRangeCommand")
return
}
// run fast indexing for every account up to the canonical chain head minus the safety depth.
// every account will run it from the last synced header.
func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCache *balanceCache, fromByAddress map[common.Address]*LastKnownBlock, toByAddress map[common.Address]*big.Int) (map[common.Address]*big.Int, map[common.Address][]*DBHeader, error) {
func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCache *balanceCache,
fromByAddress map[common.Address]*Block, toByAddress map[common.Address]*big.Int) (map[common.Address]*big.Int,
map[common.Address][]*DBHeader, error) {
log.Info("fast indexer started")
start := time.Now()
group := async.NewGroup(ctx)
commands := make([]*ethHistoricalCommand, len(c.accounts))
for i, address := range c.accounts {
eth := &ethHistoricalCommand{
db: c.db,
chainClient: c.chainClient,
balanceCache: bCache,
address: address,
@ -576,13 +564,14 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCache *b
// run fast indexing for every account up to the canonical chain head minus the safety depth.
// every account will run it from the last synced header.
func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, fromByAddress map[common.Address]*big.Int, toByAddress map[common.Address]*big.Int) (map[common.Address][]*DBHeader, error) {
log.Info("fast indexer Erc20 started")
start := time.Now()
group := async.NewGroup(ctx)
commands := make([]*erc20HistoricalCommand, len(c.accounts))
for i, address := range c.accounts {
erc20 := &erc20HistoricalCommand{
db: c.db,
erc20: NewERC20TransfersDownloader(c.chainClient, []common.Address{address}, types.NewLondonSigner(c.chainClient.ToBigInt())),
chainClient: c.chainClient,
feed: c.feed,
@ -607,7 +596,12 @@ func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, from
}
}
func loadTransfers(ctx context.Context, accounts []common.Address, block *Block, db *Database, chainClient *chain.ClientWithFallback, limit int, blocksByAddress map[common.Address][]*big.Int, transactionManager *TransactionManager) (map[common.Address][]Transfer, error) {
func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *BlockDAO, db *Database,
chainClient *chain.ClientWithFallback, limit int, blocksByAddress map[common.Address][]*big.Int,
transactionManager *TransactionManager) (map[common.Address][]Transfer, error) {
log.Info("loadTransfers start", "accounts", accounts, "limit", limit)
start := time.Now()
group := async.NewGroup(ctx)
@ -616,7 +610,7 @@ func loadTransfers(ctx context.Context, accounts []common.Address, block *Block,
blocks, ok := blocksByAddress[address]
if !ok {
blocks, _ = block.GetBlocksByAddress(chainClient.ChainID, address, numberOfBlocksCheckedPerIteration)
blocks, _ = blockDAO.GetBlocksByAddress(chainClient.ChainID, address, numberOfBlocksCheckedPerIteration)
}
for _, block := range blocks {
transfers := &transfersCommand{
@ -636,6 +630,7 @@ func loadTransfers(ctx context.Context, accounts []common.Address, block *Block,
group.Add(transfers.Command())
}
}
select {
case <-ctx.Done():
return nil, ctx.Err()
@ -673,7 +668,10 @@ func getLowestFrom(chainID uint64, to *big.Int) *big.Int {
return from
}
// Finds the latest range up to initialTo where the number of transactions is between 20 and 25
func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client *chain.ClientWithFallback) (*big.Int, error) {
log.Info("findFirstRange", "account", account, "initialTo", initialTo, "client", client)
from := getLowestFrom(client.ChainID, initialTo)
to := initialTo
goal := uint64(20)
@ -682,7 +680,7 @@ func findFirstRange(c context.Context, account common.Address, initialTo *big.In
return to, nil
}
firstNonce, err := client.NonceAt(c, account, to)
firstNonce, err := client.NonceAt(c, account, to) // this is the latest nonce actually
log.Info("find range with 20 <= len(tx) <= 25", "account", account, "firstNonce", firstNonce, "from", from, "to", to)
if err != nil {
@ -703,7 +701,7 @@ func findFirstRange(c context.Context, account common.Address, initialTo *big.In
from = from.Add(from, to)
from = from.Div(from, big.NewInt(2))
} else {
// from = from - (from + to) / 2
// from = from - (to - from) / 2
// to = from
diff := big.NewInt(0).Sub(to, from)
diff.Div(diff, big.NewInt(2))
@ -729,6 +727,7 @@ func findFirstRange(c context.Context, account common.Address, initialTo *big.In
return from, nil
}
// Finds the latest ranges up to initialTo where the number of transactions is between 20 and 25
func findFirstRanges(c context.Context, accounts []common.Address, initialTo *big.Int, client *chain.ClientWithFallback) (map[common.Address]*big.Int, error) {
res := map[common.Address]*big.Int{}
@ -744,7 +743,7 @@ func findFirstRanges(c context.Context, accounts []common.Address, initialTo *bi
return res, nil
}
func getTransfersByBlocks(ctx context.Context, db *Database, downloader *ETHDownloader, address common.Address, blocks []*big.Int) ([]Transfer, error) {
func getTransfersByBlocks(ctx context.Context, db *Database, downloader *ETHDownloader, blocks []*big.Int) ([]Transfer, error) {
allTransfers := []Transfer{}
for _, block := range blocks {

View File

@ -14,15 +14,19 @@ import (
"github.com/status-im/status-go/services/wallet/async"
)
const (
NoThreadLimit uint32 = 0
)
// NewConcurrentDownloader creates ConcurrentDownloader instance.
func NewConcurrentDownloader(ctx context.Context) *ConcurrentDownloader {
runner := async.NewAtomicGroup(ctx)
func NewConcurrentDownloader(ctx context.Context, limit uint32) *ConcurrentDownloader {
runner := async.NewQueuedAtomicGroup(ctx, limit)
result := &Result{}
return &ConcurrentDownloader{runner, result}
}
type ConcurrentDownloader struct {
*async.AtomicGroup
*async.QueuedAtomicGroup
*Result
}
@ -86,11 +90,14 @@ type Downloader interface {
GetTransfersByNumber(context.Context, *big.Int) ([]Transfer, error)
}
func checkRanges(parent context.Context, client BalanceReader, cache BalanceCache, downloader Downloader, account common.Address, ranges [][]*big.Int) ([][]*big.Int, []*DBHeader, error) {
// Returns new block ranges that contain transfers and still need checking, plus headers of blocks found to contain transfers.
func checkRanges(parent context.Context, client BalanceReader, cache BalanceCache, downloader Downloader,
account common.Address, ranges [][]*big.Int) ([][]*big.Int, []*DBHeader, error) {
ctx, cancel := context.WithTimeout(parent, 30*time.Second)
defer cancel()
c := NewConcurrentDownloader(ctx)
c := NewConcurrentDownloader(ctx, NoThreadLimit)
for _, blocksRange := range ranges {
from := blocksRange[0]
@ -167,7 +174,10 @@ func checkRanges(parent context.Context, client BalanceReader, cache BalanceCach
return c.GetRanges(), c.GetHeaders(), nil
}
func findBlocksWithEthTransfers(parent context.Context, client BalanceReader, cache BalanceCache, downloader Downloader, account common.Address, low, high *big.Int, noLimit bool) (from *big.Int, headers []*DBHeader, err error) {
func findBlocksWithEthTransfers(parent context.Context, client BalanceReader, cache BalanceCache, downloader Downloader,
account common.Address, low, high *big.Int, noLimit bool) (from *big.Int, headers []*DBHeader, err error) {
log.Debug("findBlocksWithEthTranfers start", "account", account, "low", low, "high", high, "noLimit", noLimit)
ranges := [][]*big.Int{{low, high}}
minBlock := big.NewInt(low.Int64())
headers = []*DBHeader{}
@ -176,7 +186,9 @@ func findBlocksWithEthTransfers(parent context.Context, client BalanceReader, ca
log.Debug("check blocks ranges", "lvl", lvl, "ranges len", len(ranges))
lvl++
newRanges, newHeaders, err := checkRanges(parent, client, cache, downloader, account, ranges)
if err != nil {
log.Info("check ranges end", "err", err)
return nil, nil, err
}
@ -197,5 +209,6 @@ func findBlocksWithEthTransfers(parent context.Context, client BalanceReader, ca
ranges = newRanges
}
log.Debug("findBlocksWithEthTranfers end", "account", account, "minBlock", minBlock, "headers len", len(headers))
return minBlock, headers, err
}

View File

@ -16,7 +16,7 @@ import (
)
func TestConcurrentErrorInterrupts(t *testing.T) {
concurrent := NewConcurrentDownloader(context.Background())
concurrent := NewConcurrentDownloader(context.Background(), NoThreadLimit)
var interrupted bool
concurrent.Add(func(ctx context.Context) error {
select {
@ -36,7 +36,7 @@ func TestConcurrentErrorInterrupts(t *testing.T) {
}
func TestConcurrentCollectsTransfers(t *testing.T) {
concurrent := NewConcurrentDownloader(context.Background())
concurrent := NewConcurrentDownloader(context.Background(), NoThreadLimit)
concurrent.Add(func(context.Context) error {
concurrent.Push(Transfer{})
return nil
@ -126,7 +126,7 @@ func TestConcurrentEthDownloader(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
concurrent := NewConcurrentDownloader(ctx)
concurrent := NewConcurrentDownloader(ctx, 0)
_, headers, _ := findBlocksWithEthTransfers(
ctx, tc.options.balances, newBalanceCache(), tc.options.batches,
common.Address{}, zero, tc.options.last, false)

View File

@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@ -13,30 +14,32 @@ import (
"github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/walletevent"
)
type Controller struct {
db *Database
rpcClient *rpc.Client
block *Block
blockDAO *BlockDAO
reactor *Reactor
accountFeed *event.Feed
TransferFeed *event.Feed
group *async.Group
balanceCache *balanceCache
transactionManager *TransactionManager
fetchStrategyType FetchStrategyType
}
func NewTransferController(db *sql.DB, rpcClient *rpc.Client, accountFeed *event.Feed, transferFeed *event.Feed, transactionManager *TransactionManager) *Controller {
block := &Block{db}
func NewTransferController(db *sql.DB, rpcClient *rpc.Client, accountFeed *event.Feed, transferFeed *event.Feed,
transactionManager *TransactionManager, fetchStrategyType FetchStrategyType) *Controller {
blockDAO := &BlockDAO{db}
return &Controller{
db: NewDB(db),
block: block,
blockDAO: blockDAO,
rpcClient: rpcClient,
accountFeed: accountFeed,
TransferFeed: transferFeed,
transactionManager: transactionManager,
fetchStrategyType: fetchStrategyType,
}
}
@ -62,8 +65,18 @@ func (c *Controller) SetInitialBlocksRange(chainIDs []uint64) error {
return err
}
for _, chainClient := range chainClients {
err := c.block.setInitialBlocksRange(chainClient)
for chainID, chainClient := range chainClients {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
toHeader, err := chainClient.HeaderByNumber(ctx, nil)
if err != nil {
return err
}
from := big.NewInt(0)
err = c.blockDAO.setInitialBlocksRange(chainID, from, toHeader.Number)
if err != nil {
return err
}
@ -83,7 +96,7 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add
return nil
}
err := c.block.mergeBlocksRanges(chainIDs, accounts)
err := c.blockDAO.mergeBlocksRanges(chainIDs, accounts)
if err != nil {
return err
}
@ -94,32 +107,30 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add
}
if c.reactor != nil {
err := c.reactor.restart(chainClients, accounts)
err := c.reactor.restart(chainClients, accounts, c.fetchStrategyType)
if err != nil {
return err
}
}
} else {
c.reactor = NewReactor(c.db, c.blockDAO, c.TransferFeed, c.transactionManager)
c.reactor = &Reactor{
db: c.db,
feed: c.TransferFeed,
block: c.block,
transactionManager: c.transactionManager,
}
err = c.reactor.start(chainClients, accounts)
if err != nil {
return err
}
err = c.reactor.start(chainClients, accounts, c.fetchStrategyType)
if err != nil {
return err
}
c.group.Add(func(ctx context.Context) error {
return watchAccountsChanges(ctx, c.accountFeed, c.reactor, chainClients, accounts)
})
c.group.Add(func(ctx context.Context) error {
return watchAccountsChanges(ctx, c.accountFeed, c.reactor, chainClients, accounts, c.fetchStrategyType)
})
}
return nil
}
// watchAccountsChanges subscribes to a feed and watches for changes in accounts list. If there are new or removed accounts
// reactor will be restarted.
func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor *Reactor, chainClients map[uint64]*chain.ClientWithFallback, initial []common.Address) error {
func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor *Reactor,
chainClients map[uint64]*chain.ClientWithFallback, initial []common.Address, fetchStrategyType FetchStrategyType) error {
accounts := make(chan []*accounts.Account, 1) // it may block if the rate of updates will be significantly higher
sub := accountFeed.Subscribe(accounts)
defer sub.Unsubscribe()
@ -150,7 +161,8 @@ func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor
}
listenList := mapToList(listen)
log.Debug("list of accounts was changed from a previous version. reactor will be restarted", "new", listenList)
err := reactor.restart(chainClients, listenList)
err := reactor.restart(chainClients, listenList, fetchStrategyType)
if err != nil {
log.Error("failed to restart reactor with new accounts", "error", err)
}
@ -187,7 +199,7 @@ func (c *Controller) LoadTransferByHash(ctx context.Context, rpcClient *rpc.Clie
}
blocks := []*big.Int{transfer.BlockNumber}
err = c.db.SaveTransfers(rpcClient.UpstreamChainID, address, transfers, blocks)
err = c.db.SaveTransfersMarkBlocksLoaded(rpcClient.UpstreamChainID, address, transfers, blocks)
if err != nil {
return err
}
@ -195,98 +207,20 @@ func (c *Controller) LoadTransferByHash(ctx context.Context, rpcClient *rpc.Clie
return nil
}
func (c *Controller) GetTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int, limit int64, fetchMore bool) ([]View, error) {
log.Debug("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address)
func (c *Controller) GetTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
limit int64, fetchMore bool) ([]View, error) {
rst, err := c.db.GetTransfersByAddress(chainID, address, toBlock, limit)
rst, err := c.reactor.getTransfersByAddress(ctx, chainID, address, toBlock, limit, fetchMore)
if err != nil {
log.Error("[WalletAPI:: GetTransfersByAddress] can't fetch transfers", "err", err)
return nil, err
}
transfersCount := int64(len(rst))
chainClient, err := c.rpcClient.EthClient(chainID)
if err != nil {
return nil, err
}
if fetchMore && limit > transfersCount {
block, err := c.block.GetFirstKnownBlock(chainID, address)
if err != nil {
return nil, err
}
// if zero block was already checked there is nothing to find more
if block == nil || big.NewInt(0).Cmp(block) == 0 {
return castToTransferViews(rst), nil
}
from, err := findFirstRange(ctx, address, block, chainClient)
if err != nil {
if nonArchivalNodeError(err) {
c.TransferFeed.Send(walletevent.Event{
Type: EventNonArchivalNodeDetected,
})
from = big.NewInt(0).Sub(block, big.NewInt(100))
} else {
log.Error("first range error", "error", err)
return nil, err
}
}
fromByAddress := map[common.Address]*LastKnownBlock{address: {
Number: from,
}}
toByAddress := map[common.Address]*big.Int{address: block}
if c.balanceCache == nil {
c.balanceCache = newBalanceCache()
}
blocksCommand := &findAndCheckBlockRangeCommand{
accounts: []common.Address{address},
db: c.db,
chainClient: chainClient,
balanceCache: c.balanceCache,
feed: c.TransferFeed,
fromByAddress: fromByAddress,
toByAddress: toByAddress,
}
if err = blocksCommand.Command()(ctx); err != nil {
return nil, err
}
blocks, err := c.block.GetBlocksByAddress(chainID, address, numberOfBlocksCheckedPerIteration)
if err != nil {
return nil, err
}
log.Info("checking blocks again", "blocks", len(blocks))
if len(blocks) > 0 {
txCommand := &loadTransfersCommand{
accounts: []common.Address{address},
db: c.db,
block: c.block,
chainClient: chainClient,
transactionManager: c.transactionManager,
}
err = txCommand.Command()(ctx)
if err != nil {
return nil, err
}
rst, err = c.db.GetTransfersByAddress(chainID, address, toBlock, limit)
if err != nil {
return nil, err
}
}
}
return castToTransferViews(rst), nil
}
func (c *Controller) GetCachedBalances(ctx context.Context, chainID uint64, addresses []common.Address) ([]LastKnownBlockView, error) {
result, error := c.block.getLastKnownBalances(chainID, addresses)
func (c *Controller) GetCachedBalances(ctx context.Context, chainID uint64, addresses []common.Address) ([]BlockView, error) {
result, error := c.blockDAO.getLastKnownBlocks(chainID, addresses)
if error != nil {
return nil, error
}

View File

@ -83,7 +83,7 @@ func (db *Database) Close() error {
return db.client.Close()
}
func (db *Database) ProcessBlocks(chainID uint64, account common.Address, from *big.Int, to *LastKnownBlock, headers []*DBHeader) (err error) {
func (db *Database) ProcessBlocks(chainID uint64, account common.Address, from *big.Int, to *Block, headers []*DBHeader) (err error) {
var (
tx *sql.Tx
)
@ -113,6 +113,7 @@ func (db *Database) ProcessBlocks(chainID uint64, account common.Address, from *
return
}
// TODO remove as not used
func (db *Database) SaveBlocks(chainID uint64, account common.Address, headers []*DBHeader) (err error) {
var (
tx *sql.Tx
@ -166,8 +167,35 @@ func (db *Database) ProcessTransfers(chainID uint64, transfers []Transfer, remov
return
}
// SaveTransfersMarkBlocksLoaded stores transfers and marks the corresponding blocks as loaded
func (db *Database) SaveTransfersMarkBlocksLoaded(chainID uint64, address common.Address, transfers []Transfer, blocks []*big.Int) (err error) {
err = db.SaveTransfers(chainID, address, transfers)
if err != nil {
return
}
var tx *sql.Tx
tx, err = db.client.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
_ = tx.Rollback()
}()
err = markBlocksAsLoaded(chainID, tx, address, blocks)
if err != nil {
return
}
return
}
// SaveTransfers stores transfers without marking the blocks as loaded
func (db *Database) SaveTransfers(chainID uint64, address common.Address, transfers []Transfer, blocks []*big.Int) (err error) {
func (db *Database) SaveTransfers(chainID uint64, address common.Address, transfers []Transfer) (err error) {
var tx *sql.Tx
tx, err = db.client.Begin()
if err != nil {
@ -186,11 +214,6 @@ func (db *Database) SaveTransfers(chainID uint64, address common.Address, transf
return
}
err = markBlocksAsLoaded(chainID, tx, address, blocks)
if err != nil {
return
}
return
}
@ -279,8 +302,8 @@ func (db *Database) GetTransactionsLog(chainID uint64, address common.Address, t
return nil, err
}
// SaveHeaders stores a list of headers atomically.
func (db *Database) SaveHeaders(chainID uint64, headers []*types.Header, address common.Address) (err error) {
// saveHeaders stores a list of headers atomically.
func (db *Database) saveHeaders(chainID uint64, headers []*types.Header, address common.Address) (err error) {
var (
tx *sql.Tx
insert *sql.Stmt
@ -310,8 +333,8 @@ func (db *Database) SaveHeaders(chainID uint64, headers []*types.Header, address
return
}
// GetHeaderByNumber selects header using block number.
func (db *Database) GetHeaderByNumber(chainID uint64, number *big.Int) (header *DBHeader, err error) {
// getHeaderByNumber selects header using block number.
func (db *Database) getHeaderByNumber(chainID uint64, number *big.Int) (header *DBHeader, err error) {
header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
err = db.client.QueryRow("SELECT blk_hash, blk_number FROM blocks WHERE blk_number = ? AND network_id = ?", (*bigint.SQLBigInt)(number), chainID).Scan(&header.Hash, (*bigint.SQLBigInt)(header.Number))
if err == nil {

View File

@ -1,9 +1,7 @@
package transfer
import (
"io/ioutil"
"math/big"
"os"
"testing"
"github.com/stretchr/testify/require"
@ -15,14 +13,11 @@ import (
"github.com/status-im/status-go/sqlite"
)
func setupTestDB(t *testing.T) (*Database, *Block, func()) {
tmpfile, err := ioutil.TempFile("", "wallet-tests-")
func setupTestDB(t *testing.T) (*Database, *BlockDAO, func()) {
db, err := appdatabase.InitializeDB(sqlite.InMemoryPath, "wallet-tests", sqlite.ReducedKDFIterationsNumber)
require.NoError(t, err)
db, err := appdatabase.InitializeDB(tmpfile.Name(), "wallet-tests", sqlite.ReducedKDFIterationsNumber)
require.NoError(t, err)
return NewDB(db), &Block{db}, func() {
return NewDB(db), &BlockDAO{db}, func() {
require.NoError(t, db.Close())
require.NoError(t, os.Remove(tmpfile.Name()))
}
}
@ -34,8 +29,8 @@ func TestDBGetHeaderByNumber(t *testing.T) {
Difficulty: big.NewInt(1),
Time: 1,
}
require.NoError(t, db.SaveHeaders(777, []*types.Header{header}, common.Address{1}))
rst, err := db.GetHeaderByNumber(777, header.Number)
require.NoError(t, db.saveHeaders(777, []*types.Header{header}, common.Address{1}))
rst, err := db.getHeaderByNumber(777, header.Number)
require.NoError(t, err)
require.Equal(t, header.Hash(), rst.Hash)
}
@ -43,7 +38,7 @@ func TestDBGetHeaderByNumber(t *testing.T) {
func TestDBGetHeaderByNumberNoRows(t *testing.T) {
db, _, stop := setupTestDB(t)
defer stop()
rst, err := db.GetHeaderByNumber(777, big.NewInt(1))
rst, err := db.getHeaderByNumber(777, big.NewInt(1))
require.NoError(t, err)
require.Nil(t, rst)
}
@ -65,7 +60,7 @@ func TestDBProcessBlocks(t *testing.T) {
}}
t.Log(blocks)
nonce := int64(0)
lastBlock := &LastKnownBlock{
lastBlock := &Block{
Number: to,
Balance: big.NewInt(0),
Nonce: &nonce,
@ -83,7 +78,7 @@ func TestDBProcessBlocks(t *testing.T) {
From: common.Address{1},
},
}
require.NoError(t, db.SaveTransfers(777, address, transfers, []*big.Int{big.NewInt(1), big.NewInt(2)}))
require.NoError(t, db.SaveTransfersMarkBlocksLoaded(777, address, transfers, []*big.Int{big.NewInt(1), big.NewInt(2)}))
}
func TestDBProcessTransfer(t *testing.T) {
@ -108,7 +103,7 @@ func TestDBProcessTransfer(t *testing.T) {
},
}
nonce := int64(0)
lastBlock := &LastKnownBlock{
lastBlock := &Block{
Number: big.NewInt(0),
Balance: big.NewInt(0),
Nonce: &nonce,
@ -135,7 +130,7 @@ func TestDBReorgTransfers(t *testing.T) {
originalTX := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil)
replacedTX := types.NewTransaction(2, common.Address{1}, nil, 10, big.NewInt(10), nil)
nonce := int64(0)
lastBlock := &LastKnownBlock{
lastBlock := &Block{
Number: original.Number,
Balance: big.NewInt(0),
Nonce: &nonce,
@ -145,7 +140,7 @@ func TestDBReorgTransfers(t *testing.T) {
{ethTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, 100, originalTX, true, 1777, common.Address{1}, rcpt, nil, "2100", NoMultiTransactionID},
}, []*DBHeader{}))
nonce = int64(0)
lastBlock = &LastKnownBlock{
lastBlock = &Block{
Number: replaced.Number,
Balance: big.NewInt(0),
Nonce: &nonce,
@ -188,7 +183,7 @@ func TestDBGetTransfersFromBlock(t *testing.T) {
transfers = append(transfers, transfer)
}
nonce := int64(0)
lastBlock := &LastKnownBlock{
lastBlock := &Block{
Number: headers[len(headers)-1].Number,
Balance: big.NewInt(0),
Nonce: &nonce,

View File

@ -75,15 +75,14 @@ var errLogsDownloaderStuck = errors.New("logs downloader stuck")
// If so it downloads transaction that transfer ethereum from that block.
func (d *ETHDownloader) GetTransfers(ctx context.Context, header *DBHeader) (rst []Transfer, err error) {
// TODO(dshulyak) consider caching balance and reset it on reorg
changed := d.accounts
if len(changed) == 0 {
if len(d.accounts) == 0 {
return nil, nil
}
blk, err := d.chainClient.BlockByHash(ctx, header.Hash)
if err != nil {
return nil, err
}
rst, err = d.getTransfersInBlock(ctx, blk, changed)
rst, err = d.getTransfersInBlock(ctx, blk, d.accounts)
if err != nil {
return nil, err
}
@ -148,6 +147,8 @@ func getTransferByHash(ctx context.Context, client *chain.ClientWithFallback, si
}
func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Block, accounts []common.Address) (rst []Transfer, err error) {
startTs := time.Now()
for _, address := range accounts {
preloadedTransfers, err := d.db.GetPreloadedTransactions(d.chainClient.ChainID, address, blk.Hash())
if err != nil {
@ -206,7 +207,7 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc
}
}
}
log.Debug("getTransfersInBlock found", "block", blk.Number(), "len", len(rst))
log.Debug("getTransfersInBlock found", "block", blk.Number(), "len", len(rst), "time", time.Since(startTs))
// TODO(dshulyak) test that balance difference was covered by transactions
return rst, nil
}
@ -344,7 +345,7 @@ func (d *ERC20TransfersDownloader) transferFromLog(parent context.Context, ethlo
}
func (d *ERC20TransfersDownloader) transfersFromLogs(parent context.Context, logs []types.Log, address common.Address) ([]Transfer, error) {
concurrent := NewConcurrentDownloader(parent)
concurrent := NewConcurrentDownloader(parent, NoThreadLimit)
for i := range logs {
l := logs[i]
if l.Removed {
@ -368,7 +369,7 @@ func (d *ERC20TransfersDownloader) transfersFromLogs(parent context.Context, log
}
func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs []types.Log, address common.Address) ([]*DBHeader, error) {
concurrent := NewConcurrentDownloader(parent)
concurrent := NewConcurrentDownloader(parent, NoThreadLimit)
for i := range logs {
l := logs[i]

View File

@ -11,7 +11,7 @@ import (
// SetupIterativeDownloader configures IterativeDownloader with last known synced block.
func SetupIterativeDownloader(
db *Database, client HeaderReader, address common.Address,
client HeaderReader, address common.Address,
downloader BatchDownloader, size *big.Int, to *big.Int, from *big.Int) (*IterativeDownloader, error) {
if to == nil || from == nil {

View File

@ -3,18 +3,34 @@ package transfer
import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/walletevent"
)
const (
ReactorNotStarted string = "reactor not started"
NonArchivalNodeBlockChunkSize = 100
)
var errAlreadyRunning = errors.New("already running")
type FetchStrategyType int32
const (
OnDemandFetchStrategyType FetchStrategyType = iota
SequentialFetchStrategyType
)
// HeaderReader interface for reading headers using block number or hash.
type HeaderReader interface {
HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
@ -28,67 +44,263 @@ type BalanceReader interface {
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
}
// Reactor listens to new blocks and stores transfers into the database.
type Reactor struct {
type HistoryFetcher interface {
start() error
stop()
kind() FetchStrategyType
getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
limit int64, fetchMore bool) ([]Transfer, error)
}
type OnDemandFetchStrategy struct {
db *Database
block *Block
blockDAO *BlockDAO
feed *event.Feed
mu sync.Mutex
group *async.Group
balanceCache *balanceCache
transactionManager *TransactionManager
chainClients map[uint64]*chain.ClientWithFallback
accounts []common.Address
}
func (r *Reactor) newControlCommand(chainClient *chain.ClientWithFallback, accounts []common.Address) *controlCommand {
func (s *OnDemandFetchStrategy) newControlCommand(chainClient *chain.ClientWithFallback, accounts []common.Address) *controlCommand {
signer := types.NewLondonSigner(chainClient.ToBigInt())
ctl := &controlCommand{
db: r.db,
db: s.db,
chainClient: chainClient,
accounts: accounts,
block: r.block,
blockDAO: s.blockDAO,
eth: &ETHDownloader{
chainClient: chainClient,
accounts: accounts,
signer: signer,
db: r.db,
db: s.db,
},
erc20: NewERC20TransfersDownloader(chainClient, accounts, signer),
feed: r.feed,
feed: s.feed,
errorsCount: 0,
transactionManager: r.transactionManager,
transactionManager: s.transactionManager,
}
return ctl
}
// Start runs reactor loop in background.
func (r *Reactor) start(chainClients map[uint64]*chain.ClientWithFallback, accounts []common.Address) error {
r.mu.Lock()
defer r.mu.Unlock()
func (s *OnDemandFetchStrategy) start() error {
s.mu.Lock()
defer s.mu.Unlock()
if r.group != nil {
if s.group != nil {
return errAlreadyRunning
}
r.group = async.NewGroup(context.Background())
for _, chainClient := range chainClients {
ctl := r.newControlCommand(chainClient, accounts)
r.group.Add(ctl.Command())
s.group = async.NewGroup(context.Background())
for _, chainClient := range s.chainClients {
ctl := s.newControlCommand(chainClient, s.accounts)
s.group.Add(ctl.Command())
}
return nil
}
// Stop stops reactor loop and waits till it exits.
func (r *Reactor) stop() {
r.mu.Lock()
defer r.mu.Unlock()
if r.group == nil {
func (s *OnDemandFetchStrategy) stop() {
s.mu.Lock()
defer s.mu.Unlock()
if s.group == nil {
return
}
r.group.Stop()
r.group.Wait()
r.group = nil
s.group.Stop()
s.group.Wait()
s.group = nil
}
func (r *Reactor) restart(chainClients map[uint64]*chain.ClientWithFallback, accounts []common.Address) error {
r.stop()
return r.start(chainClients, accounts)
func (s *OnDemandFetchStrategy) kind() FetchStrategyType {
return OnDemandFetchStrategyType
}
func (s *OnDemandFetchStrategy) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
limit int64, fetchMore bool) ([]Transfer, error) {
log.Info("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address, "fetchMore", fetchMore,
"chainID", chainID, "toBlock", toBlock, "limit", limit)
rst, err := s.db.GetTransfersByAddress(chainID, address, toBlock, limit)
if err != nil {
log.Error("[WalletAPI:: GetTransfersByAddress] can't fetch transfers", "err", err)
return nil, err
}
transfersCount := int64(len(rst))
chainClient, err := getChainClientByID(s.chainClients, chainID)
if err != nil {
return nil, err
}
if fetchMore && limit > transfersCount {
block, err := s.blockDAO.GetFirstKnownBlock(chainID, address)
if err != nil {
return nil, err
}
// if zero block was already checked there is nothing to find more
if block == nil || big.NewInt(0).Cmp(block) == 0 {
log.Info("[WalletAPI:: GetTransfersByAddress] ZERO block is found for", "address", address, "chaindID", chainID)
return rst, nil
}
from, err := findFirstRange(ctx, address, block, chainClient)
if err != nil {
if nonArchivalNodeError(err) {
if s.feed != nil {
s.feed.Send(walletevent.Event{
Type: EventNonArchivalNodeDetected,
})
}
if block.Cmp(big.NewInt(NonArchivalNodeBlockChunkSize)) >= 0 {
from = big.NewInt(0).Sub(block, big.NewInt(NonArchivalNodeBlockChunkSize))
} else {
from = big.NewInt(0)
}
} else {
log.Error("first range error", "error", err)
return nil, err
}
}
fromByAddress := map[common.Address]*Block{address: {
Number: from,
}}
toByAddress := map[common.Address]*big.Int{address: block}
if s.balanceCache == nil {
s.balanceCache = newBalanceCache()
}
blocksCommand := &findAndCheckBlockRangeCommand{
accounts: []common.Address{address},
db: s.db,
chainClient: chainClient,
balanceCache: s.balanceCache,
feed: s.feed,
fromByAddress: fromByAddress,
toByAddress: toByAddress,
}
if err = blocksCommand.Command()(ctx); err != nil {
return nil, err
}
blocks, err := s.blockDAO.GetBlocksByAddress(chainID, address, numberOfBlocksCheckedPerIteration)
if err != nil {
return nil, err
}
log.Info("checking blocks again", "blocks", len(blocks))
if len(blocks) > 0 {
txCommand := &loadTransfersCommand{
accounts: []common.Address{address},
db: s.db,
blockDAO: s.blockDAO,
chainClient: chainClient,
transactionManager: s.transactionManager,
}
err = txCommand.Command()(ctx)
if err != nil {
return nil, err
}
rst, err = s.db.GetTransfersByAddress(chainID, address, toBlock, limit)
if err != nil {
return nil, err
}
}
}
return rst, nil
}
// Reactor listens to new blocks and stores transfers into the database.
type Reactor struct {
db *Database
blockDAO *BlockDAO
feed *event.Feed
transactionManager *TransactionManager
strategy HistoryFetcher
}
func NewReactor(db *Database, blockDAO *BlockDAO, feed *event.Feed, tm *TransactionManager) *Reactor {
return &Reactor{
db: db,
blockDAO: blockDAO,
feed: feed,
transactionManager: tm,
}
}
// Start runs reactor loop in background.
func (r *Reactor) start(chainClients map[uint64]*chain.ClientWithFallback, accounts []common.Address,
fetchStrategyType FetchStrategyType) error {
r.strategy = r.createFetchStrategy(chainClients, accounts, fetchStrategyType)
return r.strategy.start()
}
// Stop stops reactor loop and waits till it exits.
func (r *Reactor) stop() {
if r.strategy != nil {
r.strategy.stop()
}
}
func (r *Reactor) restart(chainClients map[uint64]*chain.ClientWithFallback, accounts []common.Address,
fetchStrategyType FetchStrategyType) error {
r.stop()
return r.start(chainClients, accounts, fetchStrategyType)
}
func (r *Reactor) createFetchStrategy(chainClients map[uint64]*chain.ClientWithFallback,
accounts []common.Address, fetchType FetchStrategyType) HistoryFetcher {
if fetchType == SequentialFetchStrategyType {
return &SequentialFetchStrategy{
db: r.db,
feed: r.feed,
blockDAO: r.blockDAO,
transactionManager: r.transactionManager,
chainClients: chainClients,
accounts: accounts,
}
}
return &OnDemandFetchStrategy{
db: r.db,
feed: r.feed,
blockDAO: r.blockDAO,
transactionManager: r.transactionManager,
chainClients: chainClients,
accounts: accounts,
}
}
func (r *Reactor) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
limit int64, fetchMore bool) ([]Transfer, error) {
if r.strategy != nil {
return r.strategy.getTransfersByAddress(ctx, chainID, address, toBlock, limit, fetchMore)
}
return nil, errors.New(ReactorNotStarted)
}
func getChainClientByID(clients map[uint64]*chain.ClientWithFallback, id uint64) (*chain.ClientWithFallback, error) {
for _, client := range clients {
if client.ChainID == id {
return client, nil
}
}
return nil, fmt.Errorf("chain client not found with id=%d", id)
}
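
To illustrate the extension point this introduces, here is a hypothetical,
do-nothing strategy satisfying HistoryFetcher. It is not part of the change;
a real strategy would also need its own FetchStrategyType constant and a
case in Reactor.createFetchStrategy.

package transfer

import (
	"context"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
)

// noopFetchStrategy is a hypothetical strategy used only to show the
// HistoryFetcher contract that OnDemandFetchStrategy and
// SequentialFetchStrategy implement.
type noopFetchStrategy struct{}

func (s *noopFetchStrategy) start() error { return nil }

func (s *noopFetchStrategy) stop() {}

func (s *noopFetchStrategy) kind() FetchStrategyType {
	return OnDemandFetchStrategyType
}

func (s *noopFetchStrategy) getTransfersByAddress(ctx context.Context, chainID uint64,
	address common.Address, toBlock *big.Int, limit int64, fetchMore bool) ([]Transfer, error) {
	// A real strategy would read cached transfers from the DB here and,
	// depending on fetchMore, trigger additional fetching.
	return []Transfer{}, nil
}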

View File

@ -0,0 +1,89 @@
package transfer
import (
"context"
"math/big"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/async"
)
type SequentialFetchStrategy struct {
db *Database
blockDAO *BlockDAO
feed *event.Feed
mu sync.Mutex
group *async.Group
transactionManager *TransactionManager
chainClients map[uint64]*chain.ClientWithFallback
accounts []common.Address
}
func (s *SequentialFetchStrategy) newCommand(chainClient *chain.ClientWithFallback,
// accounts []common.Address) *loadAllTransfersCommand {
accounts []common.Address) async.Commander {
signer := types.NewLondonSigner(chainClient.ToBigInt())
// ctl := &loadAllTransfersCommand{
ctl := &controlCommand{ // TODO Will be replaced by loadAllTransfersCommand in an upcoming commit
db: s.db,
chainClient: chainClient,
accounts: accounts,
blockDAO: s.blockDAO,
eth: &ETHDownloader{
chainClient: chainClient,
accounts: accounts,
signer: signer,
db: s.db,
},
erc20: NewERC20TransfersDownloader(chainClient, accounts, signer),
feed: s.feed,
errorsCount: 0,
transactionManager: s.transactionManager,
}
return ctl
}
func (s *SequentialFetchStrategy) start() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.group != nil {
return errAlreadyRunning
}
s.group = async.NewGroup(context.Background())
for _, chainClient := range s.chainClients {
ctl := s.newCommand(chainClient, s.accounts)
s.group.Add(ctl.Command())
}
return nil
}
// Stop stops reactor loop and waits till it exits.
func (s *SequentialFetchStrategy) stop() {
s.mu.Lock()
defer s.mu.Unlock()
if s.group == nil {
return
}
s.group.Stop()
s.group.Wait()
s.group = nil
}
func (s *SequentialFetchStrategy) kind() FetchStrategyType {
return SequentialFetchStrategyType
}
func (s *SequentialFetchStrategy) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
limit int64, fetchMore bool) ([]Transfer, error) {
// TODO: implement - load from database
return []Transfer{}, nil
}