feat(wallet): separate ETH and tokens search ranges to allow calling

`getLogs` for multiple accounts simultaneously. For now only used for
new transfers detection. Detection of `new` transfers has been changed,
now they are searched from head and forward. Previously they were
searched from last scanned block forward.
This commit is contained in:
Ivan Belyakov 2023-11-27 11:08:17 +01:00 committed by IvanBelyakoff
parent 04c533b8d5
commit 82185b54b5
16 changed files with 644 additions and 546 deletions

View File

@ -117,14 +117,8 @@ func TestTransactionNotification(t *testing.T) {
Address: header.Address,
},
}
nonce := int64(0)
lastBlock := &transfer.Block{
Number: big.NewInt(1),
Balance: big.NewInt(0),
Nonce: &nonce,
}
require.NoError(t, walletDb.ProcessBlocks(1777, header.Address, big.NewInt(1), lastBlock, []*transfer.DBHeader{header}))
require.NoError(t, walletDb.ProcessTransfers(1777, transfers, []*transfer.DBHeader{}))
require.NoError(t, walletDb.SaveBlocks(1777, []*transfer.DBHeader{header}))
require.NoError(t, transfer.SaveTransfersMarkBlocksLoaded(walletDb, 1777, header.Address, transfers, []*big.Int{header.Number}))
feed.Send(walletevent.Event{
Type: transfer.EventRecentHistoryReady,

View File

@ -388,39 +388,3 @@ func insertRange(chainID uint64, creator statementCreator, account common.Addres
_, err = insert.Exec(chainID, account, (*bigint.SQLBigInt)(from), (*bigint.SQLBigInt)(to))
return err
}
func upsertRange(chainID uint64, creator statementCreator, account common.Address, from *big.Int, to *Block) (err error) {
log.Debug("upsert blocks range", "account", account, "network id", chainID, "from", from, "to", to.Number, "balance", to.Balance)
update, err := creator.Prepare(`UPDATE blocks_ranges
SET blk_to = ?, balance = ?, nonce = ?
WHERE address = ?
AND network_id = ?
AND blk_to = ?`)
if err != nil {
return err
}
res, err := update.Exec((*bigint.SQLBigInt)(to.Number), (*bigint.SQLBigIntBytes)(to.Balance), to.Nonce, account, chainID, (*bigint.SQLBigInt)(from))
if err != nil {
return err
}
affected, err := res.RowsAffected()
if err != nil {
return err
}
if affected == 0 {
insert, err := creator.Prepare("INSERT INTO blocks_ranges (network_id, address, blk_from, blk_to, balance, nonce) VALUES (?, ?, ?, ?, ?, ?)")
if err != nil {
return err
}
_, err = insert.Exec(chainID, account, (*bigint.SQLBigInt)(from), (*bigint.SQLBigInt)(to.Number), (*bigint.SQLBigIntBytes)(to.Balance), to.Nonce)
if err != nil {
return err
}
}
return
}

View File

@ -23,8 +23,17 @@ func NewBlockRange() *BlockRange {
return &BlockRange{Start: &big.Int{}, FirstKnown: &big.Int{}, LastKnown: &big.Int{}}
}
func (b *BlockRangeSequentialDAO) getBlockRange(chainID uint64, address common.Address) (blockRange *BlockRange, err error) {
query := `SELECT blk_start, blk_first, blk_last FROM blocks_ranges_sequential
type ethTokensBlockRanges struct {
eth *BlockRange
tokens *BlockRange
}
func newEthTokensBlockRanges() *ethTokensBlockRanges {
return &ethTokensBlockRanges{eth: NewBlockRange(), tokens: NewBlockRange()}
}
func (b *BlockRangeSequentialDAO) getBlockRange(chainID uint64, address common.Address) (blockRange *ethTokensBlockRanges, err error) {
query := `SELECT blk_start, blk_first, blk_last, token_blk_start, token_blk_first, token_blk_last FROM blocks_ranges_sequential
WHERE address = ?
AND network_id = ?`
@ -34,9 +43,11 @@ func (b *BlockRangeSequentialDAO) getBlockRange(chainID uint64, address common.A
}
defer rows.Close()
blockRange = &ethTokensBlockRanges{}
if rows.Next() {
blockRange = NewBlockRange()
err = rows.Scan((*bigint.SQLBigInt)(blockRange.Start), (*bigint.SQLBigInt)(blockRange.FirstKnown), (*bigint.SQLBigInt)(blockRange.LastKnown))
blockRange = newEthTokensBlockRanges()
err = rows.Scan((*bigint.SQLBigInt)(blockRange.eth.Start), (*bigint.SQLBigInt)(blockRange.eth.FirstKnown), (*bigint.SQLBigInt)(blockRange.eth.LastKnown),
(*bigint.SQLBigInt)(blockRange.tokens.Start), (*bigint.SQLBigInt)(blockRange.tokens.FirstKnown), (*bigint.SQLBigInt)(blockRange.tokens.LastKnown))
if err != nil {
return nil, err
}
@ -44,7 +55,7 @@ func (b *BlockRangeSequentialDAO) getBlockRange(chainID uint64, address common.A
return blockRange, nil
}
return nil, nil
return blockRange, nil
}
func (b *BlockRangeSequentialDAO) deleteRange(account common.Address) error {
@ -59,19 +70,85 @@ func (b *BlockRangeSequentialDAO) deleteRange(account common.Address) error {
return err
}
func (b *BlockRangeSequentialDAO) upsertRange(chainID uint64, account common.Address,
newBlockRange *BlockRange) (err error) {
log.Debug("upsert blocks range", "account", account, "chainID", chainID,
"start", newBlockRange.Start, "first", newBlockRange.FirstKnown, "last", newBlockRange.LastKnown)
blockRange, err := b.getBlockRange(chainID, account)
func (b *BlockRangeSequentialDAO) upsertRange(chainID uint64, account common.Address, newBlockRange *ethTokensBlockRanges) (err error) {
ethTokensBlockRange, err := b.getBlockRange(chainID, account)
if err != nil {
return err
}
ethBlockRange := prepareUpdatedBlockRange(chainID, account, ethTokensBlockRange.eth, newBlockRange.eth)
tokensBlockRange := prepareUpdatedBlockRange(chainID, account, ethTokensBlockRange.tokens, newBlockRange.tokens)
log.Debug("update eth and tokens blocks range", "account", account, "chainID", chainID,
"eth.start", ethBlockRange.Start, "eth.first", ethBlockRange.FirstKnown, "eth.last", ethBlockRange.LastKnown,
"tokens.start", tokensBlockRange.Start, "tokens.first", ethBlockRange.FirstKnown, "eth.last", ethBlockRange.LastKnown)
upsert, err := b.db.Prepare(`REPLACE INTO blocks_ranges_sequential
(network_id, address, blk_start, blk_first, blk_last, token_blk_start, token_blk_first, token_blk_last) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return err
}
_, err = upsert.Exec(chainID, account, (*bigint.SQLBigInt)(ethBlockRange.Start), (*bigint.SQLBigInt)(ethBlockRange.FirstKnown), (*bigint.SQLBigInt)(ethBlockRange.LastKnown),
(*bigint.SQLBigInt)(tokensBlockRange.Start), (*bigint.SQLBigInt)(tokensBlockRange.FirstKnown), (*bigint.SQLBigInt)(tokensBlockRange.LastKnown))
return err
}
func (b *BlockRangeSequentialDAO) upsertEthRange(chainID uint64, account common.Address,
newBlockRange *BlockRange) (err error) {
ethTokensBlockRange, err := b.getBlockRange(chainID, account)
if err != nil {
return err
}
blockRange := prepareUpdatedBlockRange(chainID, account, ethTokensBlockRange.eth, newBlockRange)
log.Debug("update eth blocks range", "account", account, "chainID", chainID,
"start", blockRange.Start, "first", blockRange.FirstKnown, "last", blockRange.LastKnown)
upsert, err := b.db.Prepare(`REPLACE INTO blocks_ranges_sequential
(network_id, address, blk_start, blk_first, blk_last) VALUES (?, ?, ?, ?, ?)`)
if err != nil {
return err
}
_, err = upsert.Exec(chainID, account, (*bigint.SQLBigInt)(blockRange.Start), (*bigint.SQLBigInt)(blockRange.FirstKnown),
(*bigint.SQLBigInt)(blockRange.LastKnown))
return err
}
func (b *BlockRangeSequentialDAO) upsertTokenRange(chainID uint64, account common.Address,
newBlockRange *BlockRange) (err error) {
ethTokensBlockRange, err := b.getBlockRange(chainID, account)
if err != nil {
return err
}
blockRange := prepareUpdatedBlockRange(chainID, account, ethTokensBlockRange.tokens, newBlockRange)
log.Debug("update tokens blocks range", "account", account, "chainID", chainID,
"start", blockRange.Start, "first", blockRange.FirstKnown, "last", blockRange.LastKnown)
upsert, err := b.db.Prepare(`REPLACE INTO blocks_ranges_sequential
(network_id, address, token_blk_start, token_blk_first, token_blk_last) VALUES (?, ?, ?, ?, ?)`)
if err != nil {
return err
}
_, err = upsert.Exec(chainID, account, (*bigint.SQLBigInt)(blockRange.Start), (*bigint.SQLBigInt)(blockRange.FirstKnown),
(*bigint.SQLBigInt)(blockRange.LastKnown))
return err
}
func prepareUpdatedBlockRange(chainID uint64, account common.Address, blockRange, newBlockRange *BlockRange) *BlockRange {
// Update existing range
if blockRange != nil {
if newBlockRange != nil {
// Ovewrite start block if there was not any or if new one is older, because it can be precised only
// to a greater value, because no history can be before some block that is considered
// as a start of history, but due to concurrent block range checks, a newer greater block
@ -91,21 +168,10 @@ func (b *BlockRangeSequentialDAO) upsertRange(chainID uint64, account common.Add
(blockRange.LastKnown != nil && newBlockRange.LastKnown != nil && blockRange.LastKnown.Cmp(newBlockRange.LastKnown) < 0) {
blockRange.LastKnown = newBlockRange.LastKnown
}
log.Debug("update blocks range", "account", account, "chainID", chainID,
"start", blockRange.Start, "first", blockRange.FirstKnown, "last", blockRange.LastKnown)
}
} else {
blockRange = newBlockRange
}
upsert, err := b.db.Prepare(`REPLACE INTO blocks_ranges_sequential
(network_id, address, blk_start, blk_first, blk_last) VALUES (?, ?, ?, ?, ?)`)
if err != nil {
return err
}
_, err = upsert.Exec(chainID, account, (*bigint.SQLBigInt)(blockRange.Start), (*bigint.SQLBigInt)(blockRange.FirstKnown),
(*bigint.SQLBigInt)(blockRange.LastKnown))
return
return blockRange
}

View File

@ -6,6 +6,8 @@ import (
"math/big"
"time"
"golang.org/x/exp/maps"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
@ -113,7 +115,6 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
type erc20HistoricalCommand struct {
erc20 BatchDownloader
address common.Address
chainClient chain.ClientInterface
feed *event.Feed
@ -151,13 +152,13 @@ func getErc20BatchSize(chainID uint64) *big.Int {
}
func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {
log.Debug("wallet historical downloader for erc20 transfers start", "chainID", c.chainClient.NetworkID(), "address", c.address,
log.Debug("wallet historical downloader for erc20 transfers start", "chainID", c.chainClient.NetworkID(),
"from", c.from, "to", c.to)
start := time.Now()
if c.iterator == nil {
c.iterator, err = SetupIterativeDownloader(
c.chainClient, c.address,
c.chainClient,
c.erc20, getErc20BatchSize(c.chainClient.NetworkID()), c.to, c.from)
if err != nil {
log.Error("failed to setup historical downloader for erc20")
@ -172,7 +173,7 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {
}
c.foundHeaders = append(c.foundHeaders, headers...)
}
log.Debug("wallet historical downloader for erc20 transfers finished", "chainID", c.chainClient.NetworkID(), "address", c.address,
log.Debug("wallet historical downloader for erc20 transfers finished", "chainID", c.chainClient.NetworkID(),
"from", c.from, "to", c.to, "time", time.Since(start), "headers", len(c.foundHeaders))
return nil
}
@ -501,7 +502,7 @@ func (c *loadTransfersCommand) Command() async.Command {
}
func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, limit int, blocksByAddress map[common.Address][]*big.Int) error {
return loadTransfers(ctx, c.accounts, c.blockDAO, c.db, c.chainClient, limit, blocksByAddress,
return loadTransfers(ctx, c.blockDAO, c.db, c.chainClient, limit, blocksByAddress,
c.transactionManager, c.pendingTxManager, c.tokenManager, c.feed)
}
@ -510,16 +511,17 @@ func (c *loadTransfersCommand) Run(parent context.Context) (err error) {
return
}
func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *BlockDAO, db *Database,
func loadTransfers(ctx context.Context, blockDAO *BlockDAO, db *Database,
chainClient chain.ClientInterface, blocksLimitPerAccount int, blocksByAddress map[common.Address][]*big.Int,
transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker,
tokenManager *token.Manager, feed *event.Feed) error {
log.Debug("loadTransfers start", "accounts", accounts, "chain", chainClient.NetworkID(), "limit", blocksLimitPerAccount)
log.Debug("loadTransfers start", "chain", chainClient.NetworkID(), "limit", blocksLimitPerAccount)
start := time.Now()
group := async.NewGroup(ctx)
accounts := maps.Keys(blocksByAddress)
for _, address := range accounts {
transfers := &transfersCommand{
db: db,

View File

@ -34,47 +34,187 @@ func (c *findNewBlocksCommand) Command() async.Command {
}
func (c *findNewBlocksCommand) Run(parent context.Context) (err error) {
log.Debug("start findNewBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber)
headNum, err := getHeadBlockNumber(parent, c.chainClient)
if err != nil {
// c.error = err
return err // Might need to retry a couple of times
return err
}
blockRange, err := loadBlockRangeInfo(c.chainClient.NetworkID(), c.account, c.blockRangeDAO)
if err != nil {
log.Error("findBlocksCommand loadBlockRangeInfo", "error", err)
// c.error = err
return err // Will keep spinning forever nomatter what
}
// In case no block range is in DB, skip until history blocks are fetched
if blockRange != nil {
c.fromBlockNumber = blockRange.LastKnown
log.Debug("Launching new blocks command", "chainID", c.chainClient.NetworkID(), "account", c.account,
"from", c.fromBlockNumber, "headNum", headNum)
// In case interval between checks is set smaller than block mining time,
// we might need to wait for the next block to be mined
// In case this is the first check, skip it, history fetching will do it
if c.fromBlockNumber.Cmp(headNum) >= 0 {
return
return nil
}
c.toBlockNumber = headNum
c.findAndSaveEthBlocks(parent, c.fromBlockNumber, headNum)
c.findAndSaveTokenBlocks(parent, c.fromBlockNumber, headNum)
_ = c.findBlocksCommand.Run(parent)
}
log.Debug("end findNewBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber)
c.fromBlockNumber = headNum
return nil
}
func (c *findNewBlocksCommand) findAndSaveEthBlocks(parent context.Context, fromNum, headNum *big.Int) {
// Check ETH transfers for each account independently
for _, account := range c.accounts {
log.Debug("start findNewBlocksCommand", "account", account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", fromNum, "to", headNum)
headers, startBlockNum, _ := c.findBlocksWithEthTransfers(parent, account, fromNum, headNum)
if len(headers) > 0 {
log.Debug("findNewBlocksCommand saving headers", "len", len(headers), "lastBlockNumber", headNum,
"balance", c.balanceCacher.Cache().GetBalance(account, c.chainClient.NetworkID(), headNum),
"nonce", c.balanceCacher.Cache().GetNonce(account, c.chainClient.NetworkID(), headNum))
err := c.db.SaveBlocks(c.chainClient.NetworkID(), headers)
if err != nil {
c.error = err
break
}
c.blocksFound(headers)
}
err := c.markEthBlockRangeChecked(account, &BlockRange{startBlockNum, fromNum, headNum})
if err != nil {
c.error = err
break
}
log.Debug("end findNewBlocksCommand", "account", account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", fromNum, "to", headNum)
}
}
func (c *findNewBlocksCommand) findAndSaveTokenBlocks(parent context.Context, fromNum, headNum *big.Int) {
// Check token transfers for all accounts.
// Each account's last checked block can be different, so we can get duplicated headers,
// so we need to deduplicate them
const incomingOnly = false
erc20Headers, err := c.fastIndexErc20(parent, fromNum, headNum, incomingOnly)
if err != nil {
log.Error("findNewBlocksCommand fastIndexErc20", "err", err, "account", c.accounts, "chain", c.chainClient.NetworkID())
c.error = err
return
}
if len(erc20Headers) > 0 {
log.Debug("findNewBlocksCommand saving headers", "len", len(erc20Headers), "from", fromNum, "to", headNum)
// get not loaded headers from DB for all accs and blocks
preLoadedTransactions, err := c.db.GetTransactionsToLoad(c.chainClient.NetworkID(), common.Address{}, nil)
if err != nil {
c.error = err
return
}
tokenBlocksFiltered := filterNewPreloadedTransactions(erc20Headers, preLoadedTransactions)
err = c.db.SaveBlocks(c.chainClient.NetworkID(), tokenBlocksFiltered)
if err != nil {
c.error = err
return
}
c.blocksFound(tokenBlocksFiltered)
}
err = c.markTokenBlockRangeChecked(c.accounts, fromNum, headNum)
if err != nil {
c.error = err
return
}
}
func (c *findNewBlocksCommand) markTokenBlockRangeChecked(accounts []common.Address, from, to *big.Int) error {
log.Debug("markTokenBlockRangeChecked", "chain", c.chainClient.NetworkID(), "from", from.Uint64(), "to", to.Uint64())
for _, account := range accounts {
err := c.blockRangeDAO.upsertTokenRange(c.chainClient.NetworkID(), account, &BlockRange{LastKnown: to})
if err != nil {
c.error = err
log.Error("findNewBlocksCommand upsertTokenRange", "error", err)
return err
}
}
return nil
}
func filterNewPreloadedTransactions(erc20Headers []*DBHeader, preLoadedTransfers []*PreloadedTransaction) []*DBHeader {
var uniqueErc20Headers []*DBHeader
for _, header := range erc20Headers {
loaded := false
for _, transfer := range preLoadedTransfers {
if header.PreloadedTransactions[0].ID == transfer.ID {
loaded = true
break
}
}
if !loaded {
uniqueErc20Headers = append(uniqueErc20Headers, header)
}
}
return uniqueErc20Headers
}
func (c *findNewBlocksCommand) findBlocksWithEthTransfers(parent context.Context, account common.Address, fromOrig, toOrig *big.Int) (headers []*DBHeader, startBlockNum *big.Int, err error) {
log.Debug("start findNewBlocksCommand::findBlocksWithEthTransfers", "account", account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber)
rangeSize := big.NewInt(int64(c.defaultNodeBlockChunkSize))
from, to := new(big.Int).Set(fromOrig), new(big.Int).Set(toOrig)
// Limit the range size to DefaultNodeBlockChunkSize
if new(big.Int).Sub(to, from).Cmp(rangeSize) > 0 {
from.Sub(to, rangeSize)
}
for {
if from.Cmp(to) == 0 {
log.Debug("findNewBlocksCommand empty range", "from", from, "to", to)
break
}
fromBlock := &Block{Number: from}
var newFromBlock *Block
var ethHeaders []*DBHeader
newFromBlock, ethHeaders, startBlockNum, err = c.fastIndex(parent, c.balanceCacher, fromBlock, to)
if err != nil {
log.Error("findNewBlocksCommand checkRange fastIndex", "err", err, "account", account,
"chain", c.chainClient.NetworkID())
c.error = err
// return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers
return nil, nil, nil
}
log.Debug("findNewBlocksCommand checkRange", "chainID", c.chainClient.NetworkID(), "account", account,
"startBlock", startBlockNum, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit)
headers = append(headers, ethHeaders...)
if startBlockNum != nil && startBlockNum.Cmp(from) >= 0 {
log.Debug("Checked all ranges, stop execution", "startBlock", startBlockNum, "from", from, "to", to)
break
}
nextFrom, nextTo := nextRange(c.defaultNodeBlockChunkSize, newFromBlock.Number, fromOrig)
if nextFrom.Cmp(from) == 0 && nextTo.Cmp(to) == 0 {
log.Debug("findNewBlocksCommand empty next range", "from", from, "to", to)
break
}
from = nextFrom
to = nextTo
}
log.Debug("end findNewBlocksCommand::findBlocksWithEthTransfers", "account", account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit)
return headers, startBlockNum, nil
}
// TODO NewFindBlocksCommand
type findBlocksCommand struct {
account common.Address
accounts []common.Address
db *Database
accountsDB *accounts.Database
blockRangeDAO *BlockRangeSequentialDAO
@ -108,7 +248,7 @@ type ERC20BlockRange struct {
to *big.Int
}
func (c *findBlocksCommand) ERC20ScanByBalance(parent context.Context, fromBlock, toBlock *big.Int, token common.Address) ([]ERC20BlockRange, error) {
func (c *findBlocksCommand) ERC20ScanByBalance(parent context.Context, account common.Address, fromBlock, toBlock *big.Int, token common.Address) ([]ERC20BlockRange, error) {
var err error
batchSize := getErc20BatchSize(c.chainClient.NetworkID())
ranges := [][]*big.Int{{fromBlock, toBlock}}
@ -120,7 +260,7 @@ func (c *findBlocksCommand) ERC20ScanByBalance(parent context.Context, fromBlock
from, to := blockRange[0], blockRange[1]
fromBalance, ok := cache[from.Int64()]
if !ok {
fromBalance, err = c.tokenManager.GetTokenBalanceAt(parent, c.chainClient, c.account, token, from)
fromBalance, err = c.tokenManager.GetTokenBalanceAt(parent, c.chainClient, account, token, from)
if err != nil {
return nil, err
}
@ -133,7 +273,7 @@ func (c *findBlocksCommand) ERC20ScanByBalance(parent context.Context, fromBlock
toBalance, ok := cache[to.Int64()]
if !ok {
toBalance, err = c.tokenManager.GetTokenBalanceAt(parent, c.chainClient, c.account, token, to)
toBalance, err = c.tokenManager.GetTokenBalanceAt(parent, c.chainClient, account, token, to)
if err != nil {
return nil, err
}
@ -168,8 +308,8 @@ func (c *findBlocksCommand) ERC20ScanByBalance(parent context.Context, fromBlock
return foundRanges, nil
}
func (c *findBlocksCommand) checkERC20Tail(parent context.Context) ([]*DBHeader, error) {
log.Debug("checkERC20Tail", "account", c.account, "to block", c.startBlockNumber, "from", c.resFromBlock.Number)
func (c *findBlocksCommand) checkERC20Tail(parent context.Context, account common.Address) ([]*DBHeader, error) {
log.Debug("checkERC20Tail", "account", account, "to block", c.startBlockNumber, "from", c.resFromBlock.Number)
tokens, err := c.tokenManager.GetTokens(c.chainClient.NetworkID())
if err != nil {
return nil, err
@ -185,18 +325,18 @@ func (c *findBlocksCommand) checkERC20Tail(parent context.Context) ([]*DBHeader,
clients[c.chainClient.NetworkID()] = c.chainClient
atBlocks := make(map[uint64]*big.Int, 1)
atBlocks[c.chainClient.NetworkID()] = from
balances, err := c.tokenManager.GetBalancesAtByChain(parent, clients, []common.Address{c.account}, addresses, atBlocks)
balances, err := c.tokenManager.GetBalancesAtByChain(parent, clients, []common.Address{account}, addresses, atBlocks)
if err != nil {
return nil, err
}
foundRanges := []ERC20BlockRange{}
for token, balance := range balances[c.chainClient.NetworkID()][c.account] {
for token, balance := range balances[c.chainClient.NetworkID()][account] {
bigintBalance := big.NewInt(balance.ToInt().Int64())
if bigintBalance.Cmp(big.NewInt(0)) <= 0 {
continue
}
result, err := c.ERC20ScanByBalance(parent, big.NewInt(0), from, token)
result, err := c.ERC20ScanByBalance(parent, account, big.NewInt(0), from, token)
if err != nil {
return nil, err
}
@ -229,7 +369,9 @@ func (c *findBlocksCommand) checkERC20Tail(parent context.Context) ([]*DBHeader,
var mnemonicCheckEnabled = false
func (c *findBlocksCommand) Run(parent context.Context) (err error) {
log.Debug("start findBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber)
log.Debug("start findBlocksCommand", "accounts", c.accounts, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber)
account := c.accounts[0] // For now this command supports only 1 account
mnemonicWasNotShown, err := c.accountsDB.GetMnemonicWasNotShown()
if err != nil {
c.error = err
@ -237,19 +379,18 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
}
if mnemonicCheckEnabled && mnemonicWasNotShown {
account, err := c.accountsDB.GetAccountByAddress(nodetypes.BytesToAddress(c.account.Bytes()))
account, err := c.accountsDB.GetAccountByAddress(nodetypes.BytesToAddress(account.Bytes()))
if err != nil {
c.error = err
return err
}
if account.AddressWasNotShown {
log.Info("skip findBlocksCommand, mnemonic has not been shown and the address has not been shared yet", "address", c.account)
log.Info("skip findBlocksCommand, mnemonic has not been shown and the address has not been shared yet", "address", account)
return nil
}
}
rangeSize := big.NewInt(int64(c.defaultNodeBlockChunkSize))
from, to := new(big.Int).Set(c.fromBlockNumber), new(big.Int).Set(c.toBlockNumber)
// Limit the range size to DefaultNodeBlockChunkSize
@ -266,7 +407,7 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
var headers []*DBHeader
if c.reachedETHHistoryStart {
if c.fromBlockNumber.Cmp(zero) == 0 && c.startBlockNumber != nil && c.startBlockNumber.Cmp(zero) == 1 {
headers, err = c.checkERC20Tail(parent)
headers, err = c.checkERC20Tail(parent, account)
if err != nil {
c.error = err
}
@ -276,17 +417,17 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
}
if c.error != nil {
log.Error("findBlocksCommand checkRange", "error", c.error, "account", c.account,
log.Error("findBlocksCommand checkRange", "error", c.error, "account", account,
"chain", c.chainClient.NetworkID(), "from", from, "to", to)
break
}
if len(headers) > 0 {
log.Debug("findBlocksCommand saving headers", "len", len(headers), "lastBlockNumber", to,
"balance", c.balanceCacher.Cache().GetBalance(c.account, c.chainClient.NetworkID(), to),
"nonce", c.balanceCacher.Cache().GetNonce(c.account, c.chainClient.NetworkID(), to))
"balance", c.balanceCacher.Cache().GetBalance(account, c.chainClient.NetworkID(), to),
"nonce", c.balanceCacher.Cache().GetNonce(account, c.chainClient.NetworkID(), to))
err = c.db.SaveBlocks(c.chainClient.NetworkID(), c.account, headers)
err = c.db.SaveBlocks(c.chainClient.NetworkID(), headers)
if err != nil {
c.error = err
// return err
@ -297,11 +438,11 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
}
if c.reachedETHHistoryStart {
log.Debug("findBlocksCommand reached first ETH transfer and checked erc20 tail", "chain", c.chainClient.NetworkID(), "account", c.account)
log.Debug("findBlocksCommand reached first ETH transfer and checked erc20 tail", "chain", c.chainClient.NetworkID(), "account", account)
break
}
err = c.upsertBlockRange(&BlockRange{c.startBlockNumber, c.resFromBlock.Number, to})
err = c.markEthBlockRangeChecked(account, &BlockRange{c.startBlockNumber, c.resFromBlock.Number, to})
if err != nil {
break
}
@ -329,7 +470,7 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
to = nextTo
}
log.Debug("end findBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit)
log.Debug("end findBlocksCommand", "account", account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit)
return nil
}
@ -338,11 +479,11 @@ func (c *findBlocksCommand) blocksFound(headers []*DBHeader) {
c.blocksLoadedCh <- headers
}
func (c *findBlocksCommand) upsertBlockRange(blockRange *BlockRange) error {
func (c *findBlocksCommand) markEthBlockRangeChecked(account common.Address, blockRange *BlockRange) error {
log.Debug("upsert block range", "Start", blockRange.Start, "FirstKnown", blockRange.FirstKnown, "LastKnown", blockRange.LastKnown,
"chain", c.chainClient.NetworkID(), "account", c.account)
"chain", c.chainClient.NetworkID(), "account", account)
err := c.blockRangeDAO.upsertRange(c.chainClient.NetworkID(), c.account, blockRange)
err := c.blockRangeDAO.upsertEthRange(c.chainClient.NetworkID(), account, blockRange)
if err != nil {
c.error = err
log.Error("findBlocksCommand upsertRange", "error", err)
@ -355,24 +496,25 @@ func (c *findBlocksCommand) upsertBlockRange(blockRange *BlockRange) error {
func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to *big.Int) (
foundHeaders []*DBHeader, err error) {
account := c.accounts[0]
fromBlock := &Block{Number: from}
newFromBlock, ethHeaders, startBlock, err := c.fastIndex(parent, c.balanceCacher, fromBlock, to)
if err != nil {
log.Error("findBlocksCommand checkRange fastIndex", "err", err, "account", c.account,
log.Error("findBlocksCommand checkRange fastIndex", "err", err, "account", account,
"chain", c.chainClient.NetworkID())
c.error = err
// return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers
return nil, nil
}
log.Debug("findBlocksCommand checkRange", "chainID", c.chainClient.NetworkID(), "account", c.account,
log.Debug("findBlocksCommand checkRange", "chainID", c.chainClient.NetworkID(), "account", account,
"startBlock", startBlock, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit)
// There could be incoming ERC20 transfers which don't change the balance
// and nonce of ETH account, so we keep looking for them
erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to, false)
if err != nil {
log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err, "account", c.account, "chain", c.chainClient.NetworkID())
log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err, "account", account, "chain", c.chainClient.NetworkID())
c.error = err
// return err
return nil, nil
@ -387,7 +529,7 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to
c.resFromBlock = newFromBlock
c.startBlockNumber = startBlock
log.Debug("end findBlocksCommand checkRange", "chainID", c.chainClient.NetworkID(), "account", c.account,
log.Debug("end findBlocksCommand checkRange", "chainID", c.chainClient.NetworkID(), "account", account,
"c.startBlock", c.startBlockNumber, "newFromBlock", newFromBlock.Number,
"toBlockNumber", to, "c.resFromBlock", c.resFromBlock.Number)
@ -395,7 +537,7 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to
}
func loadBlockRangeInfo(chainID uint64, account common.Address, blockDAO *BlockRangeSequentialDAO) (
*BlockRange, error) {
*ethTokensBlockRanges, error) {
blockRange, err := blockDAO.getBlockRange(chainID, account)
if err != nil {
@ -410,11 +552,7 @@ func loadBlockRangeInfo(chainID uint64, account common.Address, blockDAO *BlockR
// Returns if all blocks are loaded, which means that start block (beginning of account history)
// has been found and all block headers saved to the DB
func areAllHistoryBlocksLoaded(blockInfo *BlockRange) bool {
if blockInfo == nil {
return false
}
if blockInfo.FirstKnown != nil && blockInfo.Start != nil &&
if blockInfo != nil && blockInfo.FirstKnown != nil && blockInfo.Start != nil &&
blockInfo.Start.Cmp(blockInfo.FirstKnown) >= 0 {
return true
}
@ -431,7 +569,7 @@ func areAllHistoryBlocksLoadedForAddress(blockRangeDAO *BlockRangeSequentialDAO,
return false, err
}
return areAllHistoryBlocksLoaded(blockRange), nil
return areAllHistoryBlocksLoaded(blockRange.eth) && areAllHistoryBlocksLoaded(blockRange.tokens), nil
}
// run fast indexing for every accont up to canonical chain head minus safety depth.
@ -440,7 +578,8 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCacher balance.Cache
fromBlock *Block, toBlockNumber *big.Int) (resultingFrom *Block, headers []*DBHeader,
startBlock *big.Int, err error) {
log.Debug("fast index started", "chainID", c.chainClient.NetworkID(), "account", c.account,
account := c.accounts[0]
log.Debug("fast index started", "chainID", c.chainClient.NetworkID(), "account", account,
"from", fromBlock.Number, "to", toBlockNumber)
start := time.Now()
@ -449,7 +588,7 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCacher balance.Cache
command := &ethHistoricalCommand{
chainClient: c.chainClient,
balanceCacher: bCacher,
address: c.account,
address: account,
feed: c.feed,
from: fromBlock,
to: toBlockNumber,
@ -471,7 +610,7 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCacher balance.Cache
resultingFrom = &Block{Number: command.resultingFrom}
headers = command.foundHeaders
startBlock = command.startBlock
log.Debug("fast indexer finished", "chainID", c.chainClient.NetworkID(), "account", c.account, "in", time.Since(start),
log.Debug("fast indexer finished", "chainID", c.chainClient.NetworkID(), "account", account, "in", time.Since(start),
"startBlock", command.startBlock, "resultingFrom", resultingFrom.Number, "headers", len(headers))
return
}
@ -486,10 +625,9 @@ func (c *findBlocksCommand) fastIndexErc20(ctx context.Context, fromBlockNumber
group := async.NewGroup(ctx)
erc20 := &erc20HistoricalCommand{
erc20: NewERC20TransfersDownloader(c.chainClient, []common.Address{c.account}, types.LatestSignerForChainID(c.chainClient.ToBigInt()), incomingOnly),
erc20: NewERC20TransfersDownloader(c.chainClient, c.accounts, types.LatestSignerForChainID(c.chainClient.ToBigInt()), incomingOnly),
chainClient: c.chainClient,
feed: c.feed,
address: c.account,
from: fromBlockNumber,
to: toBlockNumber,
foundHeaders: []*DBHeader{},
@ -501,47 +639,47 @@ func (c *findBlocksCommand) fastIndexErc20(ctx context.Context, fromBlockNumber
return nil, ctx.Err()
case <-group.WaitAsync():
headers := erc20.foundHeaders
log.Debug("fast indexer Erc20 finished", "chainID", c.chainClient.NetworkID(), "account", c.account,
log.Debug("fast indexer Erc20 finished", "chainID", c.chainClient.NetworkID(),
"in", time.Since(start), "headers", len(headers))
return headers, nil
}
}
func loadTransfersLoop(ctx context.Context, account common.Address, blockDAO *BlockDAO, db *Database,
func loadTransfersLoop(ctx context.Context, blockDAO *BlockDAO, db *Database,
chainClient chain.ClientInterface, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker,
tokenManager *token.Manager, feed *event.Feed, blocksLoadedCh <-chan []*DBHeader) {
log.Debug("loadTransfersLoop start", "chain", chainClient.NetworkID(), "account", account)
log.Debug("loadTransfersLoop start", "chain", chainClient.NetworkID())
for {
select {
case <-ctx.Done():
log.Info("loadTransfersLoop done", "chain", chainClient.NetworkID(), "account", account, "error", ctx.Err())
log.Info("loadTransfersLoop done", "chain", chainClient.NetworkID(), "error", ctx.Err())
return
case dbHeaders := <-blocksLoadedCh:
log.Debug("loadTransfersOnDemand transfers received", "chain", chainClient.NetworkID(), "account", account, "headers", len(dbHeaders))
log.Debug("loadTransfersOnDemand transfers received", "chain", chainClient.NetworkID(), "headers", len(dbHeaders))
blockNums := make([]*big.Int, len(dbHeaders))
for i, dbHeader := range dbHeaders {
blockNums[i] = dbHeader.Number
blocksByAddress := map[common.Address][]*big.Int{}
// iterate over headers and group them by address
for _, dbHeader := range dbHeaders {
blocksByAddress[dbHeader.Address] = append(blocksByAddress[dbHeader.Address], dbHeader.Number)
}
blocksByAddress := map[common.Address][]*big.Int{account: blockNums}
go func() {
_ = loadTransfers(ctx, []common.Address{account}, blockDAO, db, chainClient, noBlockLimit,
_ = loadTransfers(ctx, blockDAO, db, chainClient, noBlockLimit,
blocksByAddress, transactionManager, pendingTxManager, tokenManager, feed)
}()
}
}
}
func newLoadBlocksAndTransfersCommand(account common.Address, db *Database, accountsDB *accounts.Database,
func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, accountsDB *accounts.Database,
blockDAO *BlockDAO, blockRangesSeqDAO *BlockRangeSequentialDAO, chainClient chain.ClientInterface, feed *event.Feed,
transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker,
tokenManager *token.Manager, balanceCacher balance.Cacher, omitHistory bool) *loadBlocksAndTransfersCommand {
return &loadBlocksAndTransfersCommand{
account: account,
accounts: accounts,
db: db,
blockRangeDAO: blockRangesSeqDAO,
accountsDB: accountsDB,
@ -558,7 +696,7 @@ func newLoadBlocksAndTransfersCommand(account common.Address, db *Database, acco
}
type loadBlocksAndTransfersCommand struct {
account common.Address
accounts []common.Address
db *Database
accountsDB *accounts.Database
blockRangeDAO *BlockRangeSequentialDAO
@ -574,11 +712,11 @@ type loadBlocksAndTransfersCommand struct {
omitHistory bool
// Not to be set by the caller
transfersLoaded bool // For event RecentHistoryReady to be sent only once per account during app lifetime
transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime
}
func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
log.Info("start load all transfers command", "chain", c.chainClient.NetworkID(), "account", c.account)
log.Debug("start load all transfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts)
ctx := parent
@ -597,25 +735,28 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
c.startTransfersLoop(ctx)
fromNum := big.NewInt(0)
toNum, err := getHeadBlockNumber(ctx, c.chainClient)
headNum, err := getHeadBlockNumber(ctx, c.chainClient)
if err != nil {
return err
}
// This will start findBlocksCommand which will run until success when all blocks are loaded
err = c.fetchHistoryBlocks(parent, group, fromNum, toNum, c.blocksLoadedCh)
// Iterate over all accounts and load blocks for each account
for _, account := range c.accounts {
err = c.fetchHistoryBlocks(parent, group, account, fromNum, headNum, c.blocksLoadedCh)
for err != nil {
group.Stop()
group.Wait()
return err
}
}
c.startFetchingNewBlocks(group, c.account, c.blocksLoadedCh)
c.startFetchingNewBlocks(group, c.accounts, headNum, c.blocksLoadedCh)
select {
case <-ctx.Done():
return ctx.Err()
case <-group.WaitAsync():
log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.NetworkID(), "account", c.account)
log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts)
return nil
}
}
@ -628,33 +769,97 @@ func (c *loadBlocksAndTransfersCommand) Command() async.Command {
}
func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context) {
go loadTransfersLoop(ctx, c.account, c.blockDAO, c.db, c.chainClient, c.transactionManager,
go loadTransfersLoop(ctx, c.blockDAO, c.db, c.chainClient, c.transactionManager,
c.pendingTxManager, c.tokenManager, c.feed, c.blocksLoadedCh)
}
func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) error {
log.Debug("fetchHistoryBlocks start", "chainID", c.chainClient.NetworkID(), "account", c.account, "omit", c.omitHistory)
func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, account common.Address, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) error {
log.Debug("fetchHistoryBlocks start", "chainID", c.chainClient.NetworkID(), "account", account, "omit", c.omitHistory)
if c.omitHistory {
blockRange := &BlockRange{nil, big.NewInt(0), toNum}
err := c.blockRangeDAO.upsertRange(c.chainClient.NetworkID(), c.account, blockRange)
blockRange := &ethTokensBlockRanges{eth: &BlockRange{nil, big.NewInt(0), toNum}, tokens: &BlockRange{nil, big.NewInt(0), toNum}}
err := c.blockRangeDAO.upsertRange(c.chainClient.NetworkID(), account, blockRange)
log.Error("fetchHistoryBlocks upsertRange", "error", err)
return err
}
blockRange, err := loadBlockRangeInfo(c.chainClient.NetworkID(), c.account, c.blockRangeDAO)
blockRange, err := loadBlockRangeInfo(c.chainClient.NetworkID(), account, c.blockRangeDAO)
if err != nil {
log.Error("findBlocksCommand loadBlockRangeInfo", "error", err)
// c.error = err
return err // Will keep spinning forever nomatter what
}
allHistoryLoaded := areAllHistoryBlocksLoaded(blockRange)
ranges := [][]*big.Int{}
if !allHistoryLoaded {
to := getToHistoryBlockNumber(toNum, blockRange, allHistoryLoaded)
// There are 2 history intervals:
// 1) from 0 to FirstKnown
// 2) from LastKnown to `toNum`` (head)
// If we blockRange is nil, we need to load all blocks from `fromNum` to `toNum`
// As current implementation checks ETH first then tokens, tokens ranges maybe behind ETH ranges in
// cases when block searching was interrupted, so we use tokens ranges
if blockRange != nil {
if blockRange.tokens.LastKnown != nil && toNum.Cmp(blockRange.tokens.LastKnown) > 0 {
ranges = append(ranges, []*big.Int{blockRange.tokens.LastKnown, toNum})
}
if blockRange.tokens.FirstKnown != nil {
if fromNum.Cmp(blockRange.tokens.FirstKnown) < 0 {
ranges = append(ranges, []*big.Int{fromNum, blockRange.tokens.FirstKnown})
} else {
if !c.transfersLoaded[account] {
transfersLoaded, err := c.areAllTransfersLoaded(account)
if err != nil {
return err
}
if transfersLoaded {
if c.transfersLoaded == nil {
c.transfersLoaded = make(map[common.Address]bool)
}
c.transfersLoaded[account] = true
c.notifyHistoryReady(account)
}
}
}
}
} else {
ranges = append(ranges, []*big.Int{fromNum, toNum})
}
for _, rangeItem := range ranges {
fbc := &findBlocksCommand{
account: c.account,
accounts: []common.Address{account},
db: c.db,
accountsDB: c.accountsDB,
blockRangeDAO: c.blockRangeDAO,
chainClient: c.chainClient,
balanceCacher: c.balanceCacher,
feed: c.feed,
noLimit: false,
fromBlockNumber: rangeItem[0],
toBlockNumber: rangeItem[1],
transactionManager: c.transactionManager,
tokenManager: c.tokenManager,
blocksLoadedCh: blocksLoadedCh,
defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize,
}
group.Add(fbc.Command())
}
log.Debug("fetchHistoryBlocks end", "chainID", c.chainClient.NetworkID(), "account", account)
return nil
}
func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(group *async.Group, addresses []common.Address, fromNum *big.Int, blocksLoadedCh chan<- []*DBHeader) {
log.Debug("startFetchingNewBlocks", "chainID", c.chainClient.NetworkID(), "accounts", addresses, "db", c.accountsDB)
newBlocksCmd := &findNewBlocksCommand{
findBlocksCommand: &findBlocksCommand{
accounts: addresses,
db: c.db,
accountsDB: c.accountsDB,
blockRangeDAO: c.blockRangeDAO,
@ -663,46 +868,6 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context,
feed: c.feed,
noLimit: false,
fromBlockNumber: fromNum,
toBlockNumber: to,
transactionManager: c.transactionManager,
tokenManager: c.tokenManager,
blocksLoadedCh: blocksLoadedCh,
defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize,
}
group.Add(fbc.Command())
} else {
if !c.transfersLoaded {
transfersLoaded, err := c.areAllTransfersLoaded()
if err != nil {
return err
}
if transfersLoaded {
c.transfersLoaded = true
c.notifyHistoryReady()
}
}
}
log.Debug("fetchHistoryBlocks end", "chainID", c.chainClient.NetworkID(), "account", c.account)
return nil
}
func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(group *async.Group, address common.Address, blocksLoadedCh chan<- []*DBHeader) {
log.Debug("startFetchingNewBlocks", "chainID", c.chainClient.NetworkID(), "account", address, "db", c.accountsDB)
newBlocksCmd := &findNewBlocksCommand{
findBlocksCommand: &findBlocksCommand{
account: address,
db: c.db,
accountsDB: c.accountsDB,
blockRangeDAO: c.blockRangeDAO,
chainClient: c.chainClient,
balanceCacher: c.balanceCacher,
feed: c.feed,
noLimit: false,
transactionManager: c.transactionManager,
tokenManager: c.tokenManager,
blocksLoadedCh: blocksLoadedCh,
@ -714,24 +879,31 @@ func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(group *async.Grou
func (c *loadBlocksAndTransfersCommand) fetchTransfersForLoadedBlocks(group *async.Group) error {
log.Debug("fetchTransfers start", "chainID", c.chainClient.NetworkID(), "account", c.account)
log.Debug("fetchTransfers start", "chainID", c.chainClient.NetworkID(), "accounts", c.accounts)
blocks, err := c.blockDAO.GetBlocksToLoadByAddress(c.chainClient.NetworkID(), c.account, numberOfBlocksCheckedPerIteration)
blocksMap := make(map[common.Address][]*big.Int)
for _, account := range c.accounts {
blocks, err := c.blockDAO.GetBlocksToLoadByAddress(c.chainClient.NetworkID(), account, numberOfBlocksCheckedPerIteration)
if err != nil {
log.Error("loadBlocksAndTransfersCommand GetBlocksToLoadByAddress", "error", err)
return err
}
if len(blocks) == 0 {
log.Debug("fetchTransfers no blocks to load", "chainID", c.chainClient.NetworkID(), "account", c.account)
log.Debug("fetchTransfers no blocks to load", "chainID", c.chainClient.NetworkID(), "account", account)
continue
}
blocksMap[account] = blocks
}
if len(blocksMap) == 0 {
log.Debug("fetchTransfers no blocks to load", "chainID", c.chainClient.NetworkID())
return nil
}
blocksMap := make(map[common.Address][]*big.Int)
blocksMap[c.account] = blocks
txCommand := &loadTransfersCommand{
accounts: []common.Address{c.account},
accounts: c.accounts,
db: c.db,
blockDAO: c.blockDAO,
chainClient: c.chainClient,
@ -747,32 +919,31 @@ func (c *loadBlocksAndTransfersCommand) fetchTransfersForLoadedBlocks(group *asy
return nil
}
func (c *loadBlocksAndTransfersCommand) notifyHistoryReady() {
func (c *loadBlocksAndTransfersCommand) notifyHistoryReady(account common.Address) {
if c.feed != nil {
c.feed.Send(walletevent.Event{
Type: EventRecentHistoryReady,
Accounts: []common.Address{c.account},
Accounts: []common.Address{account},
ChainID: c.chainClient.NetworkID(),
})
}
}
func (c *loadBlocksAndTransfersCommand) areAllTransfersLoaded() (bool, error) {
allBlocksLoaded, err := areAllHistoryBlocksLoadedForAddress(c.blockRangeDAO, c.chainClient.NetworkID(), c.account)
func (c *loadBlocksAndTransfersCommand) areAllTransfersLoaded(account common.Address) (bool, error) {
allBlocksLoaded, err := areAllHistoryBlocksLoadedForAddress(c.blockRangeDAO, c.chainClient.NetworkID(), account)
if err != nil {
log.Error("loadBlockAndTransfersCommand allHistoryBlocksLoaded", "error", err)
return false, err
}
if allBlocksLoaded {
firstHeader, err := c.blockDAO.GetFirstSavedBlock(c.chainClient.NetworkID(), c.account)
headers, err := c.blockDAO.GetBlocksToLoadByAddress(c.chainClient.NetworkID(), account, 1)
if err != nil {
log.Error("loadBlocksAndTransfersCommand GetFirstSavedBlock", "error", err)
return false, err
}
// If first block is Loaded, we have fetched all the transfers
if firstHeader != nil && firstHeader.Loaded {
if len(headers) == 0 {
return true, nil
}
}
@ -787,6 +958,7 @@ func getHeadBlockNumber(parent context.Context, chainClient chain.ClientInterfac
head, err := chainClient.HeaderByNumber(ctx, nil)
cancel()
if err != nil {
log.Error("getHeadBlockNumber", "error", err)
return nil, err
}
@ -808,16 +980,3 @@ func nextRange(maxRangeSize int, prevFrom, zeroBlockNumber *big.Int) (*big.Int,
return from, to
}
func getToHistoryBlockNumber(headNum *big.Int, blockRange *BlockRange, allHistoryLoaded bool) *big.Int {
var toBlockNum *big.Int
if blockRange != nil {
if !allHistoryLoaded {
toBlockNum = blockRange.FirstKnown
}
} else {
toBlockNum = headNum
}
return toBlockNum
}

View File

@ -946,7 +946,7 @@ func TestFindBlocksCommand(t *testing.T) {
accDB, err := accounts.NewDB(appdb)
require.NoError(t, err)
fbc := &findBlocksCommand{
account: common.HexToAddress("0x1234"),
accounts: []common.Address{common.HexToAddress("0x1234")},
db: wdb,
blockRangeDAO: &BlockRangeSequentialDAO{wdb.client},
accountsDB: accDB,
@ -1067,13 +1067,14 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) {
},
})
address := common.HexToAddress("0x1234")
chainClient := newMockChainClient()
tracker := transactions.NewPendingTxTracker(db, chainClient, nil, &event.Feed{}, transactions.PendingCheckInterval)
accDB, err := accounts.NewDB(wdb.client)
require.NoError(t, err)
cmd := &loadBlocksAndTransfersCommand{
account: common.HexToAddress("0x1234"),
accounts: []common.Address{address},
db: wdb,
blockRangeDAO: &BlockRangeSequentialDAO{wdb.client},
blockDAO: &BlockDAO{db},
@ -1098,7 +1099,7 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) {
fromNum := big.NewInt(0)
toNum, err := getHeadBlockNumber(ctx, cmd.chainClient)
require.NoError(t, err)
err = cmd.fetchHistoryBlocks(ctx, group, fromNum, toNum, blockChannel)
err = cmd.fetchHistoryBlocks(ctx, group, address, fromNum, toNum, blockChannel)
require.NoError(t, err)
select {

View File

@ -187,7 +187,7 @@ func checkRangesWithStartBlock(parent context.Context, client balance.Reader, ca
if err != nil {
return err
}
c.PushHeader(toDBHeader(header, *firstTransaction.BlockHash))
c.PushHeader(toDBHeader(header, *firstTransaction.BlockHash, account))
return nil
}
mid := new(big.Int).Add(from, to)

View File

@ -45,7 +45,7 @@ func TestController_watchAccountsChanges(t *testing.T) {
chainID := uint64(777)
// Insert blocks
database := NewDB(walletDB)
err = database.SaveBlocks(chainID, address, []*DBHeader{
err = database.SaveBlocks(chainID, []*DBHeader{
{
Number: big.NewInt(1),
Hash: common.Hash{1},
@ -70,7 +70,7 @@ func TestController_watchAccountsChanges(t *testing.T) {
// Insert block ranges
blockRangesDAO := &BlockRangeSequentialDAO{walletDB}
err = blockRangesDAO.upsertRange(chainID, address, NewBlockRange())
err = blockRangesDAO.upsertRange(chainID, address, newEthTokensBlockRanges())
require.NoError(t, err)
ranges, err := blockRangesDAO.getBlockRange(chainID, address)
@ -112,7 +112,8 @@ func TestController_watchAccountsChanges(t *testing.T) {
ranges, err = blockRangesDAO.getBlockRange(chainID, address)
require.NoError(t, err)
require.Nil(t, ranges)
require.Nil(t, ranges.eth)
require.Nil(t, ranges.tokens)
}
@ -152,7 +153,7 @@ func TestController_cleanupAccountLeftovers(t *testing.T) {
chainID := uint64(777)
// Insert blocks
database := NewDB(walletDB)
err = database.SaveBlocks(chainID, removedAddr, []*DBHeader{
err = database.SaveBlocks(chainID, []*DBHeader{
{
Number: big.NewInt(1),
Hash: common.Hash{1},
@ -162,7 +163,7 @@ func TestController_cleanupAccountLeftovers(t *testing.T) {
},
})
require.NoError(t, err)
err = database.SaveBlocks(chainID, common.Address(existingAddr), []*DBHeader{
err = database.SaveBlocks(chainID, []*DBHeader{
{
Number: big.NewInt(2),
Hash: common.Hash{2},

View File

@ -32,12 +32,13 @@ type DBHeader struct {
Loaded bool
}
func toDBHeader(header *types.Header, blockHash common.Hash) *DBHeader {
func toDBHeader(header *types.Header, blockHash common.Hash, account common.Address) *DBHeader {
return &DBHeader{
Hash: blockHash,
Number: header.Number,
Timestamp: header.Time,
Loaded: false,
Address: account,
}
}
@ -87,37 +88,7 @@ func (db *Database) Close() error {
return db.client.Close()
}
func (db *Database) ProcessBlocks(chainID uint64, account common.Address, from *big.Int, to *Block, headers []*DBHeader) (err error) {
var (
tx *sql.Tx
)
tx, err = db.client.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
_ = tx.Rollback()
}()
err = insertBlocksWithTransactions(chainID, tx, account, headers)
if err != nil {
return
}
err = upsertRange(chainID, tx, account, from, to)
if err != nil {
return
}
return
}
func (db *Database) SaveBlocks(chainID uint64, account common.Address, headers []*DBHeader) (err error) {
func (db *Database) SaveBlocks(chainID uint64, headers []*DBHeader) (err error) {
var (
tx *sql.Tx
)
@ -133,7 +104,7 @@ func (db *Database) SaveBlocks(chainID uint64, account common.Address, headers [
_ = tx.Rollback()
}()
err = insertBlocksWithTransactions(chainID, tx, account, headers)
err = insertBlocksWithTransactions(chainID, tx, headers)
if err != nil {
return
}
@ -141,42 +112,13 @@ func (db *Database) SaveBlocks(chainID uint64, account common.Address, headers [
return
}
// ProcessTransfers atomically adds/removes blocks and adds new transfers.
func (db *Database) ProcessTransfers(chainID uint64, transfers []Transfer, removed []*DBHeader) (err error) {
var (
tx *sql.Tx
)
tx, err = db.client.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
_ = tx.Rollback()
}()
err = deleteHeaders(tx, removed)
func saveTransfersMarkBlocksLoaded(creator statementCreator, chainID uint64, address common.Address, transfers []Transfer, blocks []*big.Int) (err error) {
err = updateOrInsertTransfers(chainID, creator, transfers)
if err != nil {
return
}
err = updateOrInsertTransfers(chainID, tx, transfers)
if err != nil {
return
}
return
}
func saveTransfersMarkBlocksLoaded(tx statementCreator, chainID uint64, address common.Address, transfers []Transfer, blocks []*big.Int) (err error) {
err = updateOrInsertTransfers(chainID, tx, transfers)
if err != nil {
return
}
err = markBlocksAsLoaded(chainID, tx, address, blocks)
err = markBlocksAsLoaded(chainID, creator, address, blocks)
if err != nil {
return
}
@ -258,10 +200,16 @@ func (db *Database) GetTransfersForIdentities(ctx context.Context, identities []
func (db *Database) GetTransactionsToLoad(chainID uint64, address common.Address, blockNumber *big.Int) (rst []*PreloadedTransaction, err error) {
query := newTransfersQueryForPreloadedTransactions().
FilterNetwork(chainID).
FilterAddress(address).
FilterBlockNumber(blockNumber).
FilterLoaded(0)
if address != (common.Address{}) {
query.FilterAddress(address)
}
if blockNumber != nil {
query.FilterBlockNumber(blockNumber)
}
rows, err := db.client.Query(query.String(), query.Args()...)
if err != nil {
return
@ -275,29 +223,6 @@ type statementCreator interface {
Prepare(query string) (*sql.Stmt, error)
}
func deleteHeaders(creator statementCreator, headers []*DBHeader) error {
delete, err := creator.Prepare("DELETE FROM blocks WHERE blk_hash = ?")
if err != nil {
return err
}
deleteTransfers, err := creator.Prepare("DELETE FROM transfers WHERE blk_hash = ?")
if err != nil {
return err
}
for _, h := range headers {
_, err = delete.Exec(h.Hash)
if err != nil {
return err
}
_, err = deleteTransfers.Exec(h.Hash)
if err != nil {
return err
}
}
return nil
}
// Only used by status-mobile
func (db *Database) InsertBlock(chainID uint64, account common.Address, blockNumber *big.Int, blockHash common.Hash) error {
var (
@ -341,7 +266,7 @@ func insertBlockDBFields(creator statementCreator, block blockDBFields) error {
return err
}
func insertBlocksWithTransactions(chainID uint64, creator statementCreator, account common.Address, headers []*DBHeader) error {
func insertBlocksWithTransactions(chainID uint64, creator statementCreator, headers []*DBHeader) error {
insert, err := creator.Prepare("INSERT OR IGNORE INTO blocks(network_id, address, blk_number, blk_hash, loaded) VALUES (?, ?, ?, ?, ?)")
if err != nil {
return err
@ -361,7 +286,7 @@ func insertBlocksWithTransactions(chainID uint64, creator statementCreator, acco
}
for _, header := range headers {
_, err = insert.Exec(chainID, account, (*bigint.SQLBigInt)(header.Number), header.Hash, header.Loaded)
_, err = insert.Exec(chainID, header.Address, (*bigint.SQLBigInt)(header.Number), header.Hash, header.Loaded)
if err != nil {
return err
}
@ -371,7 +296,7 @@ func insertBlocksWithTransactions(chainID uint64, creator statementCreator, acco
logIndex = new(uint)
*logIndex = transaction.Log.Index
}
res, err := updateTx.Exec(&JSONBlob{transaction.Log}, logIndex, chainID, account, transaction.ID)
res, err := updateTx.Exec(&JSONBlob{transaction.Log}, logIndex, chainID, header.Address, transaction.ID)
if err != nil {
return err
}
@ -385,7 +310,8 @@ func insertBlocksWithTransactions(chainID uint64, creator statementCreator, acco
tokenID := (*bigint.SQLBigIntBytes)(transaction.TokenID)
txValue := sqlite.BigIntToPadded128BitsStr(transaction.Value)
_, err = insertTx.Exec(chainID, account, account, transaction.ID, (*bigint.SQLBigInt)(header.Number), header.Hash, transaction.Type, &JSONBlob{transaction.Log}, logIndex, tokenID, txValue)
// Is that correct to set sender as account address?
_, err = insertTx.Exec(chainID, header.Address, header.Address, transaction.ID, (*bigint.SQLBigInt)(header.Number), header.Hash, transaction.Type, &JSONBlob{transaction.Log}, logIndex, tokenID, txValue)
if err != nil {
log.Error("error saving token transfer", "err", err)
return err
@ -567,7 +493,10 @@ func updateOrInsertTransfersDBFields(creator statementCreator, transfers []trans
return nil
}
// markBlocksAsLoaded(tx, address, chainID, blocks)
// markBlocksAsLoaded(chainID, tx, address, blockNumbers)
// In case block contains both ETH and token transfers, it will be marked as loaded on ETH transfer processing.
// This is not a problem since for token transfers we have preloaded transactions and blocks 'loaded' flag is needed
// for ETH transfers only.
func markBlocksAsLoaded(chainID uint64, creator statementCreator, address common.Address, blocks []*big.Int) error {
update, err := creator.Prepare("UPDATE blocks SET loaded=? WHERE address=? AND blk_number=? AND network_id=?")
if err != nil {

View File

@ -23,39 +23,31 @@ func setupTestDB(t *testing.T) (*Database, *BlockDAO, func()) {
}
}
func TestDBProcessBlocks(t *testing.T) {
db, block, stop := setupTestDB(t)
func TestDBSaveBlocks(t *testing.T) {
db, _, stop := setupTestDB(t)
defer stop()
address := common.Address{1}
from := big.NewInt(0)
to := big.NewInt(10)
blocks := []*DBHeader{
{
Number: big.NewInt(1),
Hash: common.Hash{1},
Address: address,
},
{
Number: big.NewInt(2),
Hash: common.Hash{2},
Address: address,
}}
t.Log(blocks)
nonce := int64(0)
lastBlock := &Block{
Number: to,
Balance: big.NewInt(0),
Nonce: &nonce,
}
require.NoError(t, db.ProcessBlocks(777, common.Address{1}, from, lastBlock, blocks))
t.Log(block.GetLastBlockByAddress(777, common.Address{1}, 40))
require.NoError(t, db.SaveBlocks(777, blocks))
transfers := []Transfer{
{
ID: common.Hash{1},
Type: w_common.EthTransfer,
BlockHash: common.Hash{2},
BlockNumber: big.NewInt(1),
Address: common.Address{1},
Address: address,
Timestamp: 123,
From: common.Address{1},
From: address,
},
}
tx, err := db.client.BeginTx(context.Background(), nil)
@ -65,15 +57,16 @@ func TestDBProcessBlocks(t *testing.T) {
require.NoError(t, tx.Commit())
}
func TestDBProcessTransfer(t *testing.T) {
func TestDBSaveTransfers(t *testing.T) {
db, _, stop := setupTestDB(t)
defer stop()
address := common.Address{1}
header := &DBHeader{
Number: big.NewInt(1),
Hash: common.Hash{1},
Address: common.Address{1},
Address: address,
}
tx := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil)
tx := types.NewTransaction(1, address, nil, 10, big.NewInt(10), nil)
transfers := []Transfer{
{
ID: common.Hash{1},
@ -82,62 +75,12 @@ func TestDBProcessTransfer(t *testing.T) {
BlockNumber: header.Number,
Transaction: tx,
Receipt: types.NewReceipt(nil, false, 100),
Address: common.Address{1},
Address: address,
MultiTransactionID: 0,
},
}
nonce := int64(0)
lastBlock := &Block{
Number: big.NewInt(0),
Balance: big.NewInt(0),
Nonce: &nonce,
}
require.NoError(t, db.ProcessBlocks(777, common.Address{1}, big.NewInt(1), lastBlock, []*DBHeader{header}))
require.NoError(t, db.ProcessTransfers(777, transfers, []*DBHeader{}))
}
func TestDBReorgTransfers(t *testing.T) {
db, _, stop := setupTestDB(t)
defer stop()
rcpt := types.NewReceipt(nil, false, 100)
rcpt.Logs = []*types.Log{}
original := &DBHeader{
Number: big.NewInt(1),
Hash: common.Hash{1},
Address: common.Address{1},
}
replaced := &DBHeader{
Number: big.NewInt(1),
Hash: common.Hash{2},
Address: common.Address{1},
}
originalTX := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil)
replacedTX := types.NewTransaction(2, common.Address{1}, nil, 10, big.NewInt(10), nil)
nonce := int64(0)
lastBlock := &Block{
Number: original.Number,
Balance: big.NewInt(0),
Nonce: &nonce,
}
require.NoError(t, db.ProcessBlocks(777, original.Address, original.Number, lastBlock, []*DBHeader{original}))
require.NoError(t, db.ProcessTransfers(777, []Transfer{
{w_common.EthTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, 100, originalTX, true, 1777, common.Address{1}, rcpt, nil, nil, nil, "2100", NoMultiTransactionID},
}, []*DBHeader{}))
nonce = int64(0)
lastBlock = &Block{
Number: replaced.Number,
Balance: big.NewInt(0),
Nonce: &nonce,
}
require.NoError(t, db.ProcessBlocks(777, replaced.Address, replaced.Number, lastBlock, []*DBHeader{replaced}))
require.NoError(t, db.ProcessTransfers(777, []Transfer{
{w_common.EthTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, 100, replacedTX, true, 1777, common.Address{1}, rcpt, nil, nil, nil, "2100", NoMultiTransactionID},
}, []*DBHeader{original}))
all, err := db.GetTransfers(777, big.NewInt(0), nil)
require.NoError(t, err)
require.Len(t, all, 1)
require.Equal(t, replacedTX.Hash(), all[0].Transaction.Hash())
require.NoError(t, db.SaveBlocks(777, []*DBHeader{header}))
require.NoError(t, saveTransfersMarkBlocksLoaded(db.client, 777, address, transfers, []*big.Int{header.Number}))
}
func TestDBGetTransfersFromBlock(t *testing.T) {
@ -145,14 +88,17 @@ func TestDBGetTransfersFromBlock(t *testing.T) {
defer stop()
headers := []*DBHeader{}
transfers := []Transfer{}
address := common.Address{1}
blockNumbers := []*big.Int{}
for i := 1; i < 10; i++ {
header := &DBHeader{
Number: big.NewInt(int64(i)),
Hash: common.Hash{byte(i)},
Address: common.Address{1},
Address: address,
}
headers = append(headers, header)
tx := types.NewTransaction(uint64(i), common.Address{1}, nil, 10, big.NewInt(10), nil)
blockNumbers = append(blockNumbers, header.Number)
tx := types.NewTransaction(uint64(i), address, nil, 10, big.NewInt(10), nil)
receipt := types.NewReceipt(nil, false, 100)
receipt.Logs = []*types.Log{}
transfer := Transfer{
@ -162,18 +108,12 @@ func TestDBGetTransfersFromBlock(t *testing.T) {
BlockHash: header.Hash,
Transaction: tx,
Receipt: receipt,
Address: common.Address{1},
Address: address,
}
transfers = append(transfers, transfer)
}
nonce := int64(0)
lastBlock := &Block{
Number: headers[len(headers)-1].Number,
Balance: big.NewInt(0),
Nonce: &nonce,
}
require.NoError(t, db.ProcessBlocks(777, headers[0].Address, headers[0].Number, lastBlock, headers))
require.NoError(t, db.ProcessTransfers(777, transfers, []*DBHeader{}))
require.NoError(t, db.SaveBlocks(777, headers))
require.NoError(t, saveTransfersMarkBlocksLoaded(db.client, 777, address, transfers, blockNumbers))
rst, err := db.GetTransfers(777, big.NewInt(7), nil)
require.NoError(t, err)
require.Len(t, rst, 3)

View File

@ -6,6 +6,8 @@ import (
"math/big"
"time"
"golang.org/x/exp/slices" // since 1.21, this is in the standard library
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
@ -326,26 +328,28 @@ type ERC20TransfersDownloader struct {
signer types.Signer
}
func (d *ERC20TransfersDownloader) paddedAddress(address common.Address) common.Hash {
rst := common.Hash{}
copy(rst[12:], address[:])
func topicFromAddressSlice(addresses []common.Address) []common.Hash {
rst := make([]common.Hash, len(addresses))
for i, address := range addresses {
rst[i] = common.BytesToHash(address.Bytes())
}
return rst
}
func (d *ERC20TransfersDownloader) inboundTopics(address common.Address) [][]common.Hash {
return [][]common.Hash{{d.signature}, {}, {d.paddedAddress(address)}}
func (d *ERC20TransfersDownloader) inboundTopics(addresses []common.Address) [][]common.Hash {
return [][]common.Hash{{d.signature}, {}, topicFromAddressSlice(addresses)}
}
func (d *ERC20TransfersDownloader) outboundTopics(address common.Address) [][]common.Hash {
return [][]common.Hash{{d.signature}, {d.paddedAddress(address)}, {}}
func (d *ERC20TransfersDownloader) outboundTopics(addresses []common.Address) [][]common.Hash {
return [][]common.Hash{{d.signature}, topicFromAddressSlice(addresses), {}}
}
func (d *ERC20TransfersDownloader) inboundERC20OutboundERC1155Topics(address common.Address) [][]common.Hash {
return [][]common.Hash{{d.signature, d.signatureErc1155Single, d.signatureErc1155Batch}, {}, {d.paddedAddress(address)}}
func (d *ERC20TransfersDownloader) inboundERC20OutboundERC1155Topics(addresses []common.Address) [][]common.Hash {
return [][]common.Hash{{d.signature, d.signatureErc1155Single, d.signatureErc1155Batch}, {}, topicFromAddressSlice(addresses)}
}
func (d *ERC20TransfersDownloader) inboundTopicsERC1155(address common.Address) [][]common.Hash {
return [][]common.Hash{{d.signatureErc1155Single, d.signatureErc1155Batch}, {}, {}, {d.paddedAddress(address)}}
func (d *ERC20TransfersDownloader) inboundTopicsERC1155(addresses []common.Address) [][]common.Hash {
return [][]common.Hash{{d.signatureErc1155Single, d.signatureErc1155Batch}, {}, {}, topicFromAddressSlice(addresses)}
}
func (d *ETHDownloader) fetchTransactionReceipt(parent context.Context, txHash common.Hash) (*types.Receipt, error) {
@ -454,7 +458,7 @@ func (d *ETHDownloader) subTransactionsFromTransactionData(address, from common.
return rst, nil
}
func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs []types.Log, address common.Address) ([]*DBHeader, error) {
func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs []types.Log) ([]*DBHeader, error) {
concurrent := NewConcurrentDownloader(parent, NoThreadLimit)
for i := range logs {
@ -464,15 +468,20 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [
continue
}
var address common.Address
from, to, txIDs, tokenIDs, values, err := w_common.ParseTransferLog(l)
if err != nil {
log.Error("failed to parse transfer log", "log", l, "address", address, "error", err)
log.Error("failed to parse transfer log", "log", l, "address", d.accounts, "error", err)
continue
}
// Double check provider returned the correct log
if from != address && to != address {
log.Error("from/to address mismatch", "log", l, "address", address)
if slices.Contains(d.accounts, from) {
address = from
} else if !slices.Contains(d.accounts, to) {
address = to
} else {
log.Error("from/to address mismatch", "log", l, "addresses", d.accounts)
continue
}
@ -480,7 +489,7 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [
logType := w_common.EventTypeToSubtransactionType(eventType)
for i, txID := range txIDs {
log.Debug("block from logs", "block", l.BlockNumber, "log", l, "logType", logType, "address", address, "txID", txID)
log.Debug("block from logs", "block", l.BlockNumber, "log", l, "logType", logType, "txID", txID)
// For ERC20 there is no tokenID, so we use nil
var tokenID *big.Int
@ -491,6 +500,7 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [
header := &DBHeader{
Number: big.NewInt(int64(l.BlockNumber)),
Hash: l.BlockHash,
Address: address,
PreloadedTransactions: []*PreloadedTransaction{{
ID: txID,
Type: logType,
@ -523,14 +533,13 @@ func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, fro
headers := []*DBHeader{}
ctx := context.Background()
var err error
for _, address := range d.accounts {
outbound := []types.Log{}
var inboundOrMixed []types.Log // inbound ERC20 or outbound ERC1155 share the same signature for our purposes
if !d.incomingOnly {
outbound, err = d.client.FilterLogs(ctx, ethereum.FilterQuery{
FromBlock: from,
ToBlock: to,
Topics: d.outboundTopics(address),
Topics: d.outboundTopics(d.accounts),
})
if err != nil {
return nil, err
@ -538,7 +547,7 @@ func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, fro
inboundOrMixed, err = d.client.FilterLogs(ctx, ethereum.FilterQuery{
FromBlock: from,
ToBlock: to,
Topics: d.inboundERC20OutboundERC1155Topics(address),
Topics: d.inboundERC20OutboundERC1155Topics(d.accounts),
})
if err != nil {
return nil, err
@ -547,7 +556,7 @@ func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, fro
inboundOrMixed, err = d.client.FilterLogs(ctx, ethereum.FilterQuery{
FromBlock: from,
ToBlock: to,
Topics: d.inboundTopics(address),
Topics: d.inboundTopics(d.accounts),
})
if err != nil {
return nil, err
@ -557,7 +566,7 @@ func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, fro
inbound1155, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{
FromBlock: from,
ToBlock: to,
Topics: d.inboundTopicsERC1155(address),
Topics: d.inboundTopicsERC1155(d.accounts),
})
if err != nil {
return nil, err
@ -567,22 +576,20 @@ func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, fro
if len(logs) == 0 {
log.Debug("no logs found for account")
continue
return nil, nil
}
rst, err := d.blocksFromLogs(parent, logs, address)
rst, err := d.blocksFromLogs(parent, logs)
if err != nil {
return nil, err
}
if len(rst) == 0 {
log.Warn("no headers found in logs for account", "chainID", d.client.NetworkID(), "address", address, "from", from, "to", to)
continue
log.Warn("no headers found in logs for account", "chainID", d.client.NetworkID(), "addresses", d.accounts, "from", from, "to", to)
} else {
headers = append(headers, rst...)
log.Debug("found erc20 transfers for account", "chainID", d.client.NetworkID(), "address", address,
log.Debug("found erc20 transfers for account", "chainID", d.client.NetworkID(), "addresses", d.accounts,
"from", from, "to", to, "headers", len(headers))
}
}
log.Debug("get erc20 transfers in range end", "chainID", d.client.NetworkID(),
"from", from, "to", to, "headers", len(headers), "took", time.Since(start))

View File

@ -5,20 +5,18 @@ import (
"errors"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
// SetupIterativeDownloader configures IterativeDownloader with last known synced block.
func SetupIterativeDownloader(
client HeaderReader, address common.Address,
downloader BatchDownloader, size *big.Int, to *big.Int, from *big.Int) (*IterativeDownloader, error) {
client HeaderReader, downloader BatchDownloader, size *big.Int, to *big.Int, from *big.Int) (*IterativeDownloader, error) {
if to == nil || from == nil {
return nil, errors.New("to or from cannot be nil")
}
log.Debug("iterative downloader", "address", address, "from", from, "to", to, "size", size)
log.Debug("iterative downloader", "from", from, "to", to, "size", size)
d := &IterativeDownloader{
client: client,
batchSize: size,

View File

@ -60,9 +60,9 @@ type SequentialFetchStrategy struct {
}
func (s *SequentialFetchStrategy) newCommand(chainClient chain.ClientInterface,
account common.Address) async.Commander {
accounts []common.Address) async.Commander {
return newLoadBlocksAndTransfersCommand(account, s.db, s.accountsDB, s.blockDAO, s.blockRangesSeqDAO, chainClient, s.feed,
return newLoadBlocksAndTransfersCommand(accounts, s.db, s.accountsDB, s.blockDAO, s.blockRangesSeqDAO, chainClient, s.feed,
s.transactionManager, s.pendingTxManager, s.tokenManager, s.balanceCacher, s.omitHistory)
}
@ -83,11 +83,9 @@ func (s *SequentialFetchStrategy) start() error {
}
for _, chainClient := range s.chainClients {
for _, address := range s.accounts {
ctl := s.newCommand(chainClient, address)
ctl := s.newCommand(chainClient, s.accounts)
s.group.Add(ctl.Command())
}
}
return nil
}

View File

@ -11,7 +11,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/status-im/status-go/services/wallet/common"
w_common "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/testutils"
"github.com/status-im/status-go/services/wallet/token"
@ -325,7 +324,7 @@ func InsertTestTransferWithOptions(tb testing.TB, db *sql.DB, address eth_common
blockHash: blkHash,
blockNumber: big.NewInt(tr.BlkNumber),
sender: tr.From,
transferType: w_common.Type(tokenType),
transferType: common.Type(tokenType),
timestamp: uint64(tr.Timestamp),
multiTransactionID: tr.MultiTransactionID,
baseGasFees: "0x0",
@ -375,3 +374,8 @@ func InsertTestMultiTransaction(tb testing.TB, db *sql.DB, tr *TestMultiTransact
tr.MultiTransactionID = MultiTransactionIDType(rowID)
return tr.MultiTransactionID
}
// For using in tests only outside the package
func SaveTransfersMarkBlocksLoaded(database *Database, chainID uint64, address eth_common.Address, transfers []Transfer, blocks []*big.Int) error {
return saveTransfersMarkBlocksLoaded(database.client, chainID, address, transfers, blocks)
}

View File

@ -12,6 +12,7 @@
// 1698257443_add_community_metadata_to_wallet_db.up.sql (323B)
// 1699987075_add_timestamp_and_state_to_community_data_cache.up.sql (865B)
// 1700414564_add_wallet_connect_pairings_table.up.sql (439B)
// 1701101493_add_token_blocks_range.up.sql (469B)
// doc.go (74B)
package migrations
@ -96,7 +97,7 @@ func _1691753758_initialUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1691753758_initial.up.sql", size: 5738, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)}
info := bindataFileInfo{name: "1691753758_initial.up.sql", size: 5738, mode: os.FileMode(0644), modTime: time.Unix(1701079799, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x6b, 0x25, 0x31, 0xc8, 0x27, 0x3, 0x6b, 0x9f, 0x15, 0x42, 0x2f, 0x85, 0xfb, 0xe3, 0x6, 0xea, 0xf7, 0x97, 0x12, 0x56, 0x3c, 0x9a, 0x5b, 0x1a, 0xca, 0xb1, 0x23, 0xfa, 0xcd, 0x57, 0x25, 0x5c}}
return a, nil
}
@ -116,7 +117,7 @@ func _1692701329_add_collectibles_and_collections_data_cacheUpSql() (*asset, err
return nil, err
}
info := bindataFileInfo{name: "1692701329_add_collectibles_and_collections_data_cache.up.sql", size: 1808, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)}
info := bindataFileInfo{name: "1692701329_add_collectibles_and_collections_data_cache.up.sql", size: 1808, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x1, 0x51, 0xf4, 0x2b, 0x92, 0xde, 0x59, 0x65, 0xd8, 0x9b, 0x57, 0xe0, 0xfd, 0x7b, 0x12, 0xb, 0x29, 0x6e, 0x9d, 0xb5, 0x90, 0xe, 0xfa, 0x12, 0x97, 0xd, 0x61, 0x60, 0x7f, 0x32, 0x1d, 0xc3}}
return a, nil
}
@ -136,7 +137,7 @@ func _1692701339_add_scope_to_pendingUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1692701339_add_scope_to_pending.up.sql", size: 576, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)}
info := bindataFileInfo{name: "1692701339_add_scope_to_pending.up.sql", size: 576, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x36, 0x8a, 0x5e, 0xe2, 0x63, 0x15, 0x37, 0xba, 0x55, 0x18, 0xf3, 0xcc, 0xe0, 0x5, 0x84, 0xe1, 0x5b, 0xe8, 0x1, 0x32, 0x6b, 0x9f, 0x7d, 0x9f, 0xd9, 0x23, 0x6c, 0xa9, 0xb5, 0xdc, 0xf4, 0x93}}
return a, nil
}
@ -156,7 +157,7 @@ func _1694540071_add_collectibles_ownership_update_timestampUpSql() (*asset, err
return nil, err
}
info := bindataFileInfo{name: "1694540071_add_collectibles_ownership_update_timestamp.up.sql", size: 349, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)}
info := bindataFileInfo{name: "1694540071_add_collectibles_ownership_update_timestamp.up.sql", size: 349, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x7f, 0x45, 0xc7, 0xce, 0x79, 0x63, 0xbc, 0x6f, 0x83, 0x5f, 0xe2, 0x3, 0x56, 0xcc, 0x5, 0x2f, 0x85, 0xda, 0x7e, 0xea, 0xf5, 0xd2, 0xac, 0x19, 0xd4, 0xd8, 0x5e, 0xdd, 0xed, 0xe2, 0xa9, 0x97}}
return a, nil
}
@ -176,7 +177,7 @@ func _1694692748_add_raw_balance_to_token_balancesUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1694692748_add_raw_balance_to_token_balances.up.sql", size: 165, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)}
info := bindataFileInfo{name: "1694692748_add_raw_balance_to_token_balances.up.sql", size: 165, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xd4, 0xe0, 0x5b, 0x42, 0xf0, 0x96, 0xa5, 0xf5, 0xed, 0xc0, 0x97, 0x88, 0xb0, 0x6d, 0xfe, 0x7d, 0x97, 0x2e, 0x17, 0xd2, 0x16, 0xbc, 0x2a, 0xf2, 0xcc, 0x67, 0x9e, 0xc5, 0x47, 0xf6, 0x69, 0x1}}
return a, nil
}
@ -196,7 +197,7 @@ func _1695133989_add_community_id_to_collectibles_and_collections_data_cacheUpSq
return nil, err
}
info := bindataFileInfo{name: "1695133989_add_community_id_to_collectibles_and_collections_data_cache.up.sql", size: 275, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)}
info := bindataFileInfo{name: "1695133989_add_community_id_to_collectibles_and_collections_data_cache.up.sql", size: 275, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xfa, 0x2, 0xa, 0x7f, 0x4b, 0xd1, 0x3, 0xd0, 0x3, 0x29, 0x84, 0x31, 0xed, 0x49, 0x4f, 0xb1, 0x2d, 0xd7, 0x80, 0x41, 0x5b, 0xfa, 0x6, 0xae, 0xb4, 0xf6, 0x6b, 0x49, 0xee, 0x57, 0x33, 0x76}}
return a, nil
}
@ -216,7 +217,7 @@ func _1695932536_balance_history_v2UpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1695932536_balance_history_v2.up.sql", size: 653, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)}
info := bindataFileInfo{name: "1695932536_balance_history_v2.up.sql", size: 653, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x37, 0xf4, 0x14, 0x91, 0xf6, 0x5f, 0xc4, 0x9b, 0xb7, 0x83, 0x32, 0x72, 0xbe, 0x82, 0x42, 0x39, 0xa4, 0x3b, 0xc9, 0x78, 0x3d, 0xca, 0xd4, 0xbf, 0xfc, 0x7a, 0x33, 0x1e, 0xcd, 0x9e, 0xe4, 0x85}}
return a, nil
}
@ -236,7 +237,7 @@ func _1696853635_input_dataUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1696853635_input_data.up.sql", size: 23140, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)}
info := bindataFileInfo{name: "1696853635_input_data.up.sql", size: 23140, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x89, 0x30, 0x33, 0x33, 0x55, 0xc5, 0x57, 0x2b, 0xaf, 0xef, 0x3d, 0x8d, 0x2a, 0xaa, 0x5c, 0x32, 0xd1, 0xf4, 0xd, 0x4a, 0xd0, 0x33, 0x4a, 0xe8, 0xf6, 0x8, 0x6b, 0x65, 0xcc, 0xba, 0xed, 0x42}}
return a, nil
}
@ -256,7 +257,7 @@ func _1698117918_add_community_id_to_tokensUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1698117918_add_community_id_to_tokens.up.sql", size: 61, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)}
info := bindataFileInfo{name: "1698117918_add_community_id_to_tokens.up.sql", size: 61, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xb3, 0x82, 0xdb, 0xde, 0x3, 0x3, 0xc, 0x67, 0xf3, 0x54, 0xc4, 0xad, 0xd6, 0xce, 0x56, 0xfb, 0xc1, 0x87, 0xd7, 0xda, 0xab, 0xec, 0x1, 0xe1, 0x7d, 0xb3, 0x63, 0xd6, 0xe5, 0x5d, 0x1c, 0x15}}
return a, nil
}
@ -276,7 +277,7 @@ func _1698257443_add_community_metadata_to_wallet_dbUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1698257443_add_community_metadata_to_wallet_db.up.sql", size: 323, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)}
info := bindataFileInfo{name: "1698257443_add_community_metadata_to_wallet_db.up.sql", size: 323, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x22, 0xd3, 0x4, 0x25, 0xfa, 0x23, 0x1, 0x48, 0x83, 0x26, 0x20, 0xf2, 0x3d, 0xbc, 0xc1, 0xa7, 0x7c, 0x27, 0x7c, 0x1d, 0x63, 0x3, 0xa, 0xd0, 0xce, 0x47, 0x86, 0xdc, 0xa1, 0x3c, 0x2, 0x1c}}
return a, nil
}
@ -296,7 +297,7 @@ func _1699987075_add_timestamp_and_state_to_community_data_cacheUpSql() (*asset,
return nil, err
}
info := bindataFileInfo{name: "1699987075_add_timestamp_and_state_to_community_data_cache.up.sql", size: 865, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)}
info := bindataFileInfo{name: "1699987075_add_timestamp_and_state_to_community_data_cache.up.sql", size: 865, mode: os.FileMode(0644), modTime: time.Unix(1701152469, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xc3, 0xee, 0x37, 0xf9, 0x7f, 0x9e, 0xfe, 0x93, 0x66, 0x2b, 0xd, 0x57, 0xf4, 0x89, 0x6c, 0x51, 0xfd, 0x14, 0xe9, 0xcd, 0xab, 0x65, 0xe7, 0xa7, 0x83, 0x7e, 0xe0, 0x5c, 0x14, 0x49, 0xf3, 0xe5}}
return a, nil
}
@ -316,11 +317,31 @@ func _1700414564_add_wallet_connect_pairings_tableUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1700414564_add_wallet_connect_pairings_table.up.sql", size: 439, mode: os.FileMode(0644), modTime: time.Unix(1700503490, 0)}
info := bindataFileInfo{name: "1700414564_add_wallet_connect_pairings_table.up.sql", size: 439, mode: os.FileMode(0644), modTime: time.Unix(1701326149, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xa9, 0x77, 0x5e, 0x19, 0x62, 0x3c, 0x3a, 0x81, 0x16, 0xa0, 0x95, 0x35, 0x62, 0xab, 0x5e, 0x2b, 0xea, 0x11, 0x71, 0x11, 0xd0, 0x9, 0xab, 0x9c, 0xab, 0xf2, 0xdd, 0x5f, 0x88, 0x83, 0x9a, 0x93}}
return a, nil
}
var __1701101493_add_token_blocks_rangeUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\xcf\x41\x0b\x82\x30\x18\xc6\xf1\xfb\x3e\xc5\xfb\x05\x06\xdd\xa5\xc3\x74\x2b\x84\x65\x51\xf3\x3c\xa6\xae\x10\xd7\x56\xbe\x33\xe8\xdb\x47\x0a\x5d\x8a\x40\xea\xf6\x5c\x9e\x1f\xfc\x29\x05\xd6\x34\x50\x07\x37\x9c\x3d\x12\x26\x95\xd8\x83\x62\xa9\x14\x50\xb9\x50\x77\xa8\x7b\xe3\x4f\x16\x35\xda\xeb\x60\x7d\x6c\x8d\x03\xc6\x39\x64\x5b\x59\x6e\x0a\x88\xa1\xb3\x5e\x57\xae\xd3\x18\x4d\x1f\x21\xcd\xd7\x79\xa1\x80\x8b\x15\x2b\xa5\x82\x45\xf2\x83\x78\x6c\x7b\xfc\xaf\xe8\xcc\x47\x90\x50\x0a\x59\xb8\xdc\xe1\x66\xdc\x60\x91\x94\x3b\xce\xd4\x17\xfb\x20\xd4\x5b\xf8\x12\x5e\x3b\x99\x09\x4c\x9d\x13\x30\xee\xb9\xc0\x98\x35\xfd\x9f\x33\x21\x8f\x00\x00\x00\xff\xff\x6f\x8c\xdc\x59\xd5\x01\x00\x00")
func _1701101493_add_token_blocks_rangeUpSqlBytes() ([]byte, error) {
return bindataRead(
__1701101493_add_token_blocks_rangeUpSql,
"1701101493_add_token_blocks_range.up.sql",
)
}
func _1701101493_add_token_blocks_rangeUpSql() (*asset, error) {
bytes, err := _1701101493_add_token_blocks_rangeUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1701101493_add_token_blocks_range.up.sql", size: 469, mode: os.FileMode(0644), modTime: time.Unix(1701435193, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe0, 0x37, 0xfb, 0x1a, 0x6c, 0x8c, 0xa8, 0x1e, 0xa2, 0xa5, 0x1f, 0x90, 0x73, 0x3e, 0x31, 0x5f, 0x48, 0x1e, 0x9a, 0x37, 0x27, 0x1c, 0xc, 0x67, 0x1, 0xcd, 0xec, 0x85, 0x4c, 0x1c, 0x26, 0x52}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00")
func docGoBytes() ([]byte, error) {
@ -336,7 +357,7 @@ func docGo() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)}
info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xde, 0x7c, 0x28, 0xcd, 0x47, 0xf2, 0xfa, 0x7c, 0x51, 0x2d, 0xd8, 0x38, 0xb, 0xb0, 0x34, 0x9d, 0x4c, 0x62, 0xa, 0x9e, 0x28, 0xc3, 0x31, 0x23, 0xd9, 0xbb, 0x89, 0x9f, 0xa0, 0x89, 0x1f, 0xe8}}
return a, nil
}
@ -456,6 +477,8 @@ var _bindata = map[string]func() (*asset, error){
"1700414564_add_wallet_connect_pairings_table.up.sql": _1700414564_add_wallet_connect_pairings_tableUpSql,
"1701101493_add_token_blocks_range.up.sql": _1701101493_add_token_blocks_rangeUpSql,
"doc.go": docGo,
}
@ -463,11 +486,13 @@ var _bindata = map[string]func() (*asset, error){
// directory embedded in the file by go-bindata.
// For example if you run go-bindata on data/... and data contains the
// following hierarchy:
//
// data/
// foo.txt
// img/
// a.png
// b.png
//
// then AssetDir("data") would return []string{"foo.txt", "img"},
// AssetDir("data/img") would return []string{"a.png", "b.png"},
// AssetDir("foo.txt") and AssetDir("notexist") would return an error, and
@ -512,6 +537,7 @@ var _bintree = &bintree{nil, map[string]*bintree{
"1698257443_add_community_metadata_to_wallet_db.up.sql": &bintree{_1698257443_add_community_metadata_to_wallet_dbUpSql, map[string]*bintree{}},
"1699987075_add_timestamp_and_state_to_community_data_cache.up.sql": &bintree{_1699987075_add_timestamp_and_state_to_community_data_cacheUpSql, map[string]*bintree{}},
"1700414564_add_wallet_connect_pairings_table.up.sql": &bintree{_1700414564_add_wallet_connect_pairings_tableUpSql, map[string]*bintree{}},
"1701101493_add_token_blocks_range.up.sql": &bintree{_1701101493_add_token_blocks_rangeUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
}}

View File

@ -0,0 +1,9 @@
-- Add columns
ALTER TABLE blocks_ranges_sequential ADD COLUMN token_blk_start BIGINT DEFAULT 0;
ALTER TABLE blocks_ranges_sequential ADD COLUMN token_blk_first BIGINT DEFAULT 0;
ALTER TABLE blocks_ranges_sequential ADD COLUMN token_blk_last BIGINT DEFAULT 0;
-- Copy values
UPDATE blocks_ranges_sequential SET token_blk_start = blk_start;
UPDATE blocks_ranges_sequential SET token_blk_first = blk_first;
UPDATE blocks_ranges_sequential SET token_blk_last = blk_last;