From 82185b54b5e23b096086cd491d380bbc7b2c74ef Mon Sep 17 00:00:00 2001 From: Ivan Belyakov Date: Mon, 27 Nov 2023 11:08:17 +0100 Subject: [PATCH] feat(wallet): separate ETH and tokens search ranges to allow calling `getLogs` for multiple accounts simultaneously. For now only used for new transfers detection. Detection of `new` transfers has been changed, now they are searched from head and forward. Previously they were searched from last scanned block forward. --- services/local-notifications/core_test.go | 10 +- services/wallet/transfer/block_dao.go | 36 -- .../transfer/block_ranges_sequential_dao.go | 140 +++-- services/wallet/transfer/commands.go | 16 +- .../wallet/transfer/commands_sequential.go | 501 ++++++++++++------ .../transfer/commands_sequential_test.go | 7 +- services/wallet/transfer/concurrent.go | 2 +- services/wallet/transfer/controller_test.go | 11 +- services/wallet/transfer/database.go | 119 +---- services/wallet/transfer/database_test.go | 112 +--- services/wallet/transfer/downloader.go | 141 ++--- services/wallet/transfer/iterative.go | 6 +- .../transfer/sequential_fetch_strategy.go | 10 +- services/wallet/transfer/testutils.go | 8 +- walletdatabase/migrations/bindata.go | 62 ++- .../1701101493_add_token_blocks_range.up.sql | 9 + 16 files changed, 644 insertions(+), 546 deletions(-) create mode 100644 walletdatabase/migrations/sql/1701101493_add_token_blocks_range.up.sql diff --git a/services/local-notifications/core_test.go b/services/local-notifications/core_test.go index bcfc12c15..758b5bb99 100644 --- a/services/local-notifications/core_test.go +++ b/services/local-notifications/core_test.go @@ -117,14 +117,8 @@ func TestTransactionNotification(t *testing.T) { Address: header.Address, }, } - nonce := int64(0) - lastBlock := &transfer.Block{ - Number: big.NewInt(1), - Balance: big.NewInt(0), - Nonce: &nonce, - } - require.NoError(t, walletDb.ProcessBlocks(1777, header.Address, big.NewInt(1), lastBlock, []*transfer.DBHeader{header})) - require.NoError(t, walletDb.ProcessTransfers(1777, transfers, []*transfer.DBHeader{})) + require.NoError(t, walletDb.SaveBlocks(1777, []*transfer.DBHeader{header})) + require.NoError(t, transfer.SaveTransfersMarkBlocksLoaded(walletDb, 1777, header.Address, transfers, []*big.Int{header.Number})) feed.Send(walletevent.Event{ Type: transfer.EventRecentHistoryReady, diff --git a/services/wallet/transfer/block_dao.go b/services/wallet/transfer/block_dao.go index e52285c08..e1d373629 100644 --- a/services/wallet/transfer/block_dao.go +++ b/services/wallet/transfer/block_dao.go @@ -388,39 +388,3 @@ func insertRange(chainID uint64, creator statementCreator, account common.Addres _, err = insert.Exec(chainID, account, (*bigint.SQLBigInt)(from), (*bigint.SQLBigInt)(to)) return err } - -func upsertRange(chainID uint64, creator statementCreator, account common.Address, from *big.Int, to *Block) (err error) { - log.Debug("upsert blocks range", "account", account, "network id", chainID, "from", from, "to", to.Number, "balance", to.Balance) - update, err := creator.Prepare(`UPDATE blocks_ranges - SET blk_to = ?, balance = ?, nonce = ? - WHERE address = ? - AND network_id = ? - AND blk_to = ?`) - - if err != nil { - return err - } - - res, err := update.Exec((*bigint.SQLBigInt)(to.Number), (*bigint.SQLBigIntBytes)(to.Balance), to.Nonce, account, chainID, (*bigint.SQLBigInt)(from)) - - if err != nil { - return err - } - affected, err := res.RowsAffected() - if err != nil { - return err - } - if affected == 0 { - insert, err := creator.Prepare("INSERT INTO blocks_ranges (network_id, address, blk_from, blk_to, balance, nonce) VALUES (?, ?, ?, ?, ?, ?)") - if err != nil { - return err - } - - _, err = insert.Exec(chainID, account, (*bigint.SQLBigInt)(from), (*bigint.SQLBigInt)(to.Number), (*bigint.SQLBigIntBytes)(to.Balance), to.Nonce) - if err != nil { - return err - } - } - - return -} diff --git a/services/wallet/transfer/block_ranges_sequential_dao.go b/services/wallet/transfer/block_ranges_sequential_dao.go index 859fbcd6e..2e94bd49c 100644 --- a/services/wallet/transfer/block_ranges_sequential_dao.go +++ b/services/wallet/transfer/block_ranges_sequential_dao.go @@ -23,8 +23,17 @@ func NewBlockRange() *BlockRange { return &BlockRange{Start: &big.Int{}, FirstKnown: &big.Int{}, LastKnown: &big.Int{}} } -func (b *BlockRangeSequentialDAO) getBlockRange(chainID uint64, address common.Address) (blockRange *BlockRange, err error) { - query := `SELECT blk_start, blk_first, blk_last FROM blocks_ranges_sequential +type ethTokensBlockRanges struct { + eth *BlockRange + tokens *BlockRange +} + +func newEthTokensBlockRanges() *ethTokensBlockRanges { + return ðTokensBlockRanges{eth: NewBlockRange(), tokens: NewBlockRange()} +} + +func (b *BlockRangeSequentialDAO) getBlockRange(chainID uint64, address common.Address) (blockRange *ethTokensBlockRanges, err error) { + query := `SELECT blk_start, blk_first, blk_last, token_blk_start, token_blk_first, token_blk_last FROM blocks_ranges_sequential WHERE address = ? AND network_id = ?` @@ -34,9 +43,11 @@ func (b *BlockRangeSequentialDAO) getBlockRange(chainID uint64, address common.A } defer rows.Close() + blockRange = ðTokensBlockRanges{} if rows.Next() { - blockRange = NewBlockRange() - err = rows.Scan((*bigint.SQLBigInt)(blockRange.Start), (*bigint.SQLBigInt)(blockRange.FirstKnown), (*bigint.SQLBigInt)(blockRange.LastKnown)) + blockRange = newEthTokensBlockRanges() + err = rows.Scan((*bigint.SQLBigInt)(blockRange.eth.Start), (*bigint.SQLBigInt)(blockRange.eth.FirstKnown), (*bigint.SQLBigInt)(blockRange.eth.LastKnown), + (*bigint.SQLBigInt)(blockRange.tokens.Start), (*bigint.SQLBigInt)(blockRange.tokens.FirstKnown), (*bigint.SQLBigInt)(blockRange.tokens.LastKnown)) if err != nil { return nil, err } @@ -44,7 +55,7 @@ func (b *BlockRangeSequentialDAO) getBlockRange(chainID uint64, address common.A return blockRange, nil } - return nil, nil + return blockRange, nil } func (b *BlockRangeSequentialDAO) deleteRange(account common.Address) error { @@ -59,45 +70,44 @@ func (b *BlockRangeSequentialDAO) deleteRange(account common.Address) error { return err } -func (b *BlockRangeSequentialDAO) upsertRange(chainID uint64, account common.Address, - newBlockRange *BlockRange) (err error) { - - log.Debug("upsert blocks range", "account", account, "chainID", chainID, - "start", newBlockRange.Start, "first", newBlockRange.FirstKnown, "last", newBlockRange.LastKnown) - - blockRange, err := b.getBlockRange(chainID, account) +func (b *BlockRangeSequentialDAO) upsertRange(chainID uint64, account common.Address, newBlockRange *ethTokensBlockRanges) (err error) { + ethTokensBlockRange, err := b.getBlockRange(chainID, account) if err != nil { return err } - // Update existing range - if blockRange != nil { - // Ovewrite start block if there was not any or if new one is older, because it can be precised only - // to a greater value, because no history can be before some block that is considered - // as a start of history, but due to concurrent block range checks, a newer greater block - // can be found that matches criteria of a start block (nonce is zero, balances are equal) - if newBlockRange.Start != nil && (blockRange.Start == nil || blockRange.Start.Cmp(newBlockRange.Start) < 0) { - blockRange.Start = newBlockRange.Start - } + ethBlockRange := prepareUpdatedBlockRange(chainID, account, ethTokensBlockRange.eth, newBlockRange.eth) + tokensBlockRange := prepareUpdatedBlockRange(chainID, account, ethTokensBlockRange.tokens, newBlockRange.tokens) - // Overwrite first known block if there was not any or if new one is older - if (blockRange.FirstKnown == nil && newBlockRange.FirstKnown != nil) || - (blockRange.FirstKnown != nil && newBlockRange.FirstKnown != nil && blockRange.FirstKnown.Cmp(newBlockRange.FirstKnown) > 0) { - blockRange.FirstKnown = newBlockRange.FirstKnown - } + log.Debug("update eth and tokens blocks range", "account", account, "chainID", chainID, + "eth.start", ethBlockRange.Start, "eth.first", ethBlockRange.FirstKnown, "eth.last", ethBlockRange.LastKnown, + "tokens.start", tokensBlockRange.Start, "tokens.first", ethBlockRange.FirstKnown, "eth.last", ethBlockRange.LastKnown) - // Overwrite last known block if there was not any or if new one is newer - if (blockRange.LastKnown == nil && newBlockRange.LastKnown != nil) || - (blockRange.LastKnown != nil && newBlockRange.LastKnown != nil && blockRange.LastKnown.Cmp(newBlockRange.LastKnown) < 0) { - blockRange.LastKnown = newBlockRange.LastKnown - } - - log.Debug("update blocks range", "account", account, "chainID", chainID, - "start", blockRange.Start, "first", blockRange.FirstKnown, "last", blockRange.LastKnown) - } else { - blockRange = newBlockRange + upsert, err := b.db.Prepare(`REPLACE INTO blocks_ranges_sequential + (network_id, address, blk_start, blk_first, blk_last, token_blk_start, token_blk_first, token_blk_last) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`) + if err != nil { + return err } + _, err = upsert.Exec(chainID, account, (*bigint.SQLBigInt)(ethBlockRange.Start), (*bigint.SQLBigInt)(ethBlockRange.FirstKnown), (*bigint.SQLBigInt)(ethBlockRange.LastKnown), + (*bigint.SQLBigInt)(tokensBlockRange.Start), (*bigint.SQLBigInt)(tokensBlockRange.FirstKnown), (*bigint.SQLBigInt)(tokensBlockRange.LastKnown)) + + return err +} + +func (b *BlockRangeSequentialDAO) upsertEthRange(chainID uint64, account common.Address, + newBlockRange *BlockRange) (err error) { + + ethTokensBlockRange, err := b.getBlockRange(chainID, account) + if err != nil { + return err + } + + blockRange := prepareUpdatedBlockRange(chainID, account, ethTokensBlockRange.eth, newBlockRange) + + log.Debug("update eth blocks range", "account", account, "chainID", chainID, + "start", blockRange.Start, "first", blockRange.FirstKnown, "last", blockRange.LastKnown) + upsert, err := b.db.Prepare(`REPLACE INTO blocks_ranges_sequential (network_id, address, blk_start, blk_first, blk_last) VALUES (?, ?, ?, ?, ?)`) if err != nil { @@ -107,5 +117,61 @@ func (b *BlockRangeSequentialDAO) upsertRange(chainID uint64, account common.Add _, err = upsert.Exec(chainID, account, (*bigint.SQLBigInt)(blockRange.Start), (*bigint.SQLBigInt)(blockRange.FirstKnown), (*bigint.SQLBigInt)(blockRange.LastKnown)) - return + return err +} + +func (b *BlockRangeSequentialDAO) upsertTokenRange(chainID uint64, account common.Address, + newBlockRange *BlockRange) (err error) { + + ethTokensBlockRange, err := b.getBlockRange(chainID, account) + if err != nil { + return err + } + + blockRange := prepareUpdatedBlockRange(chainID, account, ethTokensBlockRange.tokens, newBlockRange) + + log.Debug("update tokens blocks range", "account", account, "chainID", chainID, + "start", blockRange.Start, "first", blockRange.FirstKnown, "last", blockRange.LastKnown) + + upsert, err := b.db.Prepare(`REPLACE INTO blocks_ranges_sequential + (network_id, address, token_blk_start, token_blk_first, token_blk_last) VALUES (?, ?, ?, ?, ?)`) + if err != nil { + return err + } + + _, err = upsert.Exec(chainID, account, (*bigint.SQLBigInt)(blockRange.Start), (*bigint.SQLBigInt)(blockRange.FirstKnown), + (*bigint.SQLBigInt)(blockRange.LastKnown)) + + return err +} + +func prepareUpdatedBlockRange(chainID uint64, account common.Address, blockRange, newBlockRange *BlockRange) *BlockRange { + // Update existing range + if blockRange != nil { + if newBlockRange != nil { + // Ovewrite start block if there was not any or if new one is older, because it can be precised only + // to a greater value, because no history can be before some block that is considered + // as a start of history, but due to concurrent block range checks, a newer greater block + // can be found that matches criteria of a start block (nonce is zero, balances are equal) + if newBlockRange.Start != nil && (blockRange.Start == nil || blockRange.Start.Cmp(newBlockRange.Start) < 0) { + blockRange.Start = newBlockRange.Start + } + + // Overwrite first known block if there was not any or if new one is older + if (blockRange.FirstKnown == nil && newBlockRange.FirstKnown != nil) || + (blockRange.FirstKnown != nil && newBlockRange.FirstKnown != nil && blockRange.FirstKnown.Cmp(newBlockRange.FirstKnown) > 0) { + blockRange.FirstKnown = newBlockRange.FirstKnown + } + + // Overwrite last known block if there was not any or if new one is newer + if (blockRange.LastKnown == nil && newBlockRange.LastKnown != nil) || + (blockRange.LastKnown != nil && newBlockRange.LastKnown != nil && blockRange.LastKnown.Cmp(newBlockRange.LastKnown) < 0) { + blockRange.LastKnown = newBlockRange.LastKnown + } + } + } else { + blockRange = newBlockRange + } + + return blockRange } diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index e774c9060..a376fe686 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -6,6 +6,8 @@ import ( "math/big" "time" + "golang.org/x/exp/maps" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" @@ -113,7 +115,6 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { type erc20HistoricalCommand struct { erc20 BatchDownloader - address common.Address chainClient chain.ClientInterface feed *event.Feed @@ -151,13 +152,13 @@ func getErc20BatchSize(chainID uint64) *big.Int { } func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { - log.Debug("wallet historical downloader for erc20 transfers start", "chainID", c.chainClient.NetworkID(), "address", c.address, + log.Debug("wallet historical downloader for erc20 transfers start", "chainID", c.chainClient.NetworkID(), "from", c.from, "to", c.to) start := time.Now() if c.iterator == nil { c.iterator, err = SetupIterativeDownloader( - c.chainClient, c.address, + c.chainClient, c.erc20, getErc20BatchSize(c.chainClient.NetworkID()), c.to, c.from) if err != nil { log.Error("failed to setup historical downloader for erc20") @@ -172,7 +173,7 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { } c.foundHeaders = append(c.foundHeaders, headers...) } - log.Debug("wallet historical downloader for erc20 transfers finished", "chainID", c.chainClient.NetworkID(), "address", c.address, + log.Debug("wallet historical downloader for erc20 transfers finished", "chainID", c.chainClient.NetworkID(), "from", c.from, "to", c.to, "time", time.Since(start), "headers", len(c.foundHeaders)) return nil } @@ -501,7 +502,7 @@ func (c *loadTransfersCommand) Command() async.Command { } func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, limit int, blocksByAddress map[common.Address][]*big.Int) error { - return loadTransfers(ctx, c.accounts, c.blockDAO, c.db, c.chainClient, limit, blocksByAddress, + return loadTransfers(ctx, c.blockDAO, c.db, c.chainClient, limit, blocksByAddress, c.transactionManager, c.pendingTxManager, c.tokenManager, c.feed) } @@ -510,16 +511,17 @@ func (c *loadTransfersCommand) Run(parent context.Context) (err error) { return } -func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *BlockDAO, db *Database, +func loadTransfers(ctx context.Context, blockDAO *BlockDAO, db *Database, chainClient chain.ClientInterface, blocksLimitPerAccount int, blocksByAddress map[common.Address][]*big.Int, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager, feed *event.Feed) error { - log.Debug("loadTransfers start", "accounts", accounts, "chain", chainClient.NetworkID(), "limit", blocksLimitPerAccount) + log.Debug("loadTransfers start", "chain", chainClient.NetworkID(), "limit", blocksLimitPerAccount) start := time.Now() group := async.NewGroup(ctx) + accounts := maps.Keys(blocksByAddress) for _, address := range accounts { transfers := &transfersCommand{ db: db, diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index 65f4eab5c..495d8bbeb 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -34,47 +34,187 @@ func (c *findNewBlocksCommand) Command() async.Command { } func (c *findNewBlocksCommand) Run(parent context.Context) (err error) { - log.Debug("start findNewBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber) - headNum, err := getHeadBlockNumber(parent, c.chainClient) if err != nil { - // c.error = err - return err // Might need to retry a couple of times + return err } - blockRange, err := loadBlockRangeInfo(c.chainClient.NetworkID(), c.account, c.blockRangeDAO) - if err != nil { - log.Error("findBlocksCommand loadBlockRangeInfo", "error", err) - // c.error = err - return err // Will keep spinning forever nomatter what + // In case this is the first check, skip it, history fetching will do it + if c.fromBlockNumber.Cmp(headNum) >= 0 { + return nil } - // In case no block range is in DB, skip until history blocks are fetched - if blockRange != nil { - c.fromBlockNumber = blockRange.LastKnown + c.findAndSaveEthBlocks(parent, c.fromBlockNumber, headNum) + c.findAndSaveTokenBlocks(parent, c.fromBlockNumber, headNum) - log.Debug("Launching new blocks command", "chainID", c.chainClient.NetworkID(), "account", c.account, - "from", c.fromBlockNumber, "headNum", headNum) - - // In case interval between checks is set smaller than block mining time, - // we might need to wait for the next block to be mined - if c.fromBlockNumber.Cmp(headNum) >= 0 { - return - } - - c.toBlockNumber = headNum - - _ = c.findBlocksCommand.Run(parent) - } - - log.Debug("end findNewBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber) + c.fromBlockNumber = headNum return nil } +func (c *findNewBlocksCommand) findAndSaveEthBlocks(parent context.Context, fromNum, headNum *big.Int) { + // Check ETH transfers for each account independently + for _, account := range c.accounts { + log.Debug("start findNewBlocksCommand", "account", account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", fromNum, "to", headNum) + + headers, startBlockNum, _ := c.findBlocksWithEthTransfers(parent, account, fromNum, headNum) + if len(headers) > 0 { + log.Debug("findNewBlocksCommand saving headers", "len", len(headers), "lastBlockNumber", headNum, + "balance", c.balanceCacher.Cache().GetBalance(account, c.chainClient.NetworkID(), headNum), + "nonce", c.balanceCacher.Cache().GetNonce(account, c.chainClient.NetworkID(), headNum)) + + err := c.db.SaveBlocks(c.chainClient.NetworkID(), headers) + if err != nil { + c.error = err + break + } + + c.blocksFound(headers) + } + + err := c.markEthBlockRangeChecked(account, &BlockRange{startBlockNum, fromNum, headNum}) + if err != nil { + c.error = err + break + } + + log.Debug("end findNewBlocksCommand", "account", account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", fromNum, "to", headNum) + } +} + +func (c *findNewBlocksCommand) findAndSaveTokenBlocks(parent context.Context, fromNum, headNum *big.Int) { + // Check token transfers for all accounts. + // Each account's last checked block can be different, so we can get duplicated headers, + // so we need to deduplicate them + const incomingOnly = false + erc20Headers, err := c.fastIndexErc20(parent, fromNum, headNum, incomingOnly) + if err != nil { + log.Error("findNewBlocksCommand fastIndexErc20", "err", err, "account", c.accounts, "chain", c.chainClient.NetworkID()) + c.error = err + return + } + + if len(erc20Headers) > 0 { + log.Debug("findNewBlocksCommand saving headers", "len", len(erc20Headers), "from", fromNum, "to", headNum) + + // get not loaded headers from DB for all accs and blocks + preLoadedTransactions, err := c.db.GetTransactionsToLoad(c.chainClient.NetworkID(), common.Address{}, nil) + if err != nil { + c.error = err + return + } + + tokenBlocksFiltered := filterNewPreloadedTransactions(erc20Headers, preLoadedTransactions) + + err = c.db.SaveBlocks(c.chainClient.NetworkID(), tokenBlocksFiltered) + if err != nil { + c.error = err + return + } + + c.blocksFound(tokenBlocksFiltered) + } + + err = c.markTokenBlockRangeChecked(c.accounts, fromNum, headNum) + if err != nil { + c.error = err + return + } +} + +func (c *findNewBlocksCommand) markTokenBlockRangeChecked(accounts []common.Address, from, to *big.Int) error { + log.Debug("markTokenBlockRangeChecked", "chain", c.chainClient.NetworkID(), "from", from.Uint64(), "to", to.Uint64()) + + for _, account := range accounts { + err := c.blockRangeDAO.upsertTokenRange(c.chainClient.NetworkID(), account, &BlockRange{LastKnown: to}) + if err != nil { + c.error = err + log.Error("findNewBlocksCommand upsertTokenRange", "error", err) + return err + } + } + + return nil +} + +func filterNewPreloadedTransactions(erc20Headers []*DBHeader, preLoadedTransfers []*PreloadedTransaction) []*DBHeader { + var uniqueErc20Headers []*DBHeader + for _, header := range erc20Headers { + loaded := false + for _, transfer := range preLoadedTransfers { + if header.PreloadedTransactions[0].ID == transfer.ID { + loaded = true + break + } + } + + if !loaded { + uniqueErc20Headers = append(uniqueErc20Headers, header) + } + } + + return uniqueErc20Headers +} + +func (c *findNewBlocksCommand) findBlocksWithEthTransfers(parent context.Context, account common.Address, fromOrig, toOrig *big.Int) (headers []*DBHeader, startBlockNum *big.Int, err error) { + log.Debug("start findNewBlocksCommand::findBlocksWithEthTransfers", "account", account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber) + + rangeSize := big.NewInt(int64(c.defaultNodeBlockChunkSize)) + + from, to := new(big.Int).Set(fromOrig), new(big.Int).Set(toOrig) + + // Limit the range size to DefaultNodeBlockChunkSize + if new(big.Int).Sub(to, from).Cmp(rangeSize) > 0 { + from.Sub(to, rangeSize) + } + + for { + if from.Cmp(to) == 0 { + log.Debug("findNewBlocksCommand empty range", "from", from, "to", to) + break + } + + fromBlock := &Block{Number: from} + + var newFromBlock *Block + var ethHeaders []*DBHeader + newFromBlock, ethHeaders, startBlockNum, err = c.fastIndex(parent, c.balanceCacher, fromBlock, to) + if err != nil { + log.Error("findNewBlocksCommand checkRange fastIndex", "err", err, "account", account, + "chain", c.chainClient.NetworkID()) + c.error = err + // return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers + return nil, nil, nil + } + log.Debug("findNewBlocksCommand checkRange", "chainID", c.chainClient.NetworkID(), "account", account, + "startBlock", startBlockNum, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit) + + headers = append(headers, ethHeaders...) + + if startBlockNum != nil && startBlockNum.Cmp(from) >= 0 { + log.Debug("Checked all ranges, stop execution", "startBlock", startBlockNum, "from", from, "to", to) + break + } + + nextFrom, nextTo := nextRange(c.defaultNodeBlockChunkSize, newFromBlock.Number, fromOrig) + + if nextFrom.Cmp(from) == 0 && nextTo.Cmp(to) == 0 { + log.Debug("findNewBlocksCommand empty next range", "from", from, "to", to) + break + } + + from = nextFrom + to = nextTo + } + + log.Debug("end findNewBlocksCommand::findBlocksWithEthTransfers", "account", account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit) + + return headers, startBlockNum, nil +} + // TODO NewFindBlocksCommand type findBlocksCommand struct { - account common.Address + accounts []common.Address db *Database accountsDB *accounts.Database blockRangeDAO *BlockRangeSequentialDAO @@ -108,7 +248,7 @@ type ERC20BlockRange struct { to *big.Int } -func (c *findBlocksCommand) ERC20ScanByBalance(parent context.Context, fromBlock, toBlock *big.Int, token common.Address) ([]ERC20BlockRange, error) { +func (c *findBlocksCommand) ERC20ScanByBalance(parent context.Context, account common.Address, fromBlock, toBlock *big.Int, token common.Address) ([]ERC20BlockRange, error) { var err error batchSize := getErc20BatchSize(c.chainClient.NetworkID()) ranges := [][]*big.Int{{fromBlock, toBlock}} @@ -120,7 +260,7 @@ func (c *findBlocksCommand) ERC20ScanByBalance(parent context.Context, fromBlock from, to := blockRange[0], blockRange[1] fromBalance, ok := cache[from.Int64()] if !ok { - fromBalance, err = c.tokenManager.GetTokenBalanceAt(parent, c.chainClient, c.account, token, from) + fromBalance, err = c.tokenManager.GetTokenBalanceAt(parent, c.chainClient, account, token, from) if err != nil { return nil, err } @@ -133,7 +273,7 @@ func (c *findBlocksCommand) ERC20ScanByBalance(parent context.Context, fromBlock toBalance, ok := cache[to.Int64()] if !ok { - toBalance, err = c.tokenManager.GetTokenBalanceAt(parent, c.chainClient, c.account, token, to) + toBalance, err = c.tokenManager.GetTokenBalanceAt(parent, c.chainClient, account, token, to) if err != nil { return nil, err } @@ -168,8 +308,8 @@ func (c *findBlocksCommand) ERC20ScanByBalance(parent context.Context, fromBlock return foundRanges, nil } -func (c *findBlocksCommand) checkERC20Tail(parent context.Context) ([]*DBHeader, error) { - log.Debug("checkERC20Tail", "account", c.account, "to block", c.startBlockNumber, "from", c.resFromBlock.Number) +func (c *findBlocksCommand) checkERC20Tail(parent context.Context, account common.Address) ([]*DBHeader, error) { + log.Debug("checkERC20Tail", "account", account, "to block", c.startBlockNumber, "from", c.resFromBlock.Number) tokens, err := c.tokenManager.GetTokens(c.chainClient.NetworkID()) if err != nil { return nil, err @@ -185,18 +325,18 @@ func (c *findBlocksCommand) checkERC20Tail(parent context.Context) ([]*DBHeader, clients[c.chainClient.NetworkID()] = c.chainClient atBlocks := make(map[uint64]*big.Int, 1) atBlocks[c.chainClient.NetworkID()] = from - balances, err := c.tokenManager.GetBalancesAtByChain(parent, clients, []common.Address{c.account}, addresses, atBlocks) + balances, err := c.tokenManager.GetBalancesAtByChain(parent, clients, []common.Address{account}, addresses, atBlocks) if err != nil { return nil, err } foundRanges := []ERC20BlockRange{} - for token, balance := range balances[c.chainClient.NetworkID()][c.account] { + for token, balance := range balances[c.chainClient.NetworkID()][account] { bigintBalance := big.NewInt(balance.ToInt().Int64()) if bigintBalance.Cmp(big.NewInt(0)) <= 0 { continue } - result, err := c.ERC20ScanByBalance(parent, big.NewInt(0), from, token) + result, err := c.ERC20ScanByBalance(parent, account, big.NewInt(0), from, token) if err != nil { return nil, err } @@ -229,7 +369,9 @@ func (c *findBlocksCommand) checkERC20Tail(parent context.Context) ([]*DBHeader, var mnemonicCheckEnabled = false func (c *findBlocksCommand) Run(parent context.Context) (err error) { - log.Debug("start findBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber) + log.Debug("start findBlocksCommand", "accounts", c.accounts, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber) + + account := c.accounts[0] // For now this command supports only 1 account mnemonicWasNotShown, err := c.accountsDB.GetMnemonicWasNotShown() if err != nil { c.error = err @@ -237,19 +379,18 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) { } if mnemonicCheckEnabled && mnemonicWasNotShown { - account, err := c.accountsDB.GetAccountByAddress(nodetypes.BytesToAddress(c.account.Bytes())) + account, err := c.accountsDB.GetAccountByAddress(nodetypes.BytesToAddress(account.Bytes())) if err != nil { c.error = err return err } if account.AddressWasNotShown { - log.Info("skip findBlocksCommand, mnemonic has not been shown and the address has not been shared yet", "address", c.account) + log.Info("skip findBlocksCommand, mnemonic has not been shown and the address has not been shared yet", "address", account) return nil } } rangeSize := big.NewInt(int64(c.defaultNodeBlockChunkSize)) - from, to := new(big.Int).Set(c.fromBlockNumber), new(big.Int).Set(c.toBlockNumber) // Limit the range size to DefaultNodeBlockChunkSize @@ -266,7 +407,7 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) { var headers []*DBHeader if c.reachedETHHistoryStart { if c.fromBlockNumber.Cmp(zero) == 0 && c.startBlockNumber != nil && c.startBlockNumber.Cmp(zero) == 1 { - headers, err = c.checkERC20Tail(parent) + headers, err = c.checkERC20Tail(parent, account) if err != nil { c.error = err } @@ -276,17 +417,17 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) { } if c.error != nil { - log.Error("findBlocksCommand checkRange", "error", c.error, "account", c.account, + log.Error("findBlocksCommand checkRange", "error", c.error, "account", account, "chain", c.chainClient.NetworkID(), "from", from, "to", to) break } if len(headers) > 0 { log.Debug("findBlocksCommand saving headers", "len", len(headers), "lastBlockNumber", to, - "balance", c.balanceCacher.Cache().GetBalance(c.account, c.chainClient.NetworkID(), to), - "nonce", c.balanceCacher.Cache().GetNonce(c.account, c.chainClient.NetworkID(), to)) + "balance", c.balanceCacher.Cache().GetBalance(account, c.chainClient.NetworkID(), to), + "nonce", c.balanceCacher.Cache().GetNonce(account, c.chainClient.NetworkID(), to)) - err = c.db.SaveBlocks(c.chainClient.NetworkID(), c.account, headers) + err = c.db.SaveBlocks(c.chainClient.NetworkID(), headers) if err != nil { c.error = err // return err @@ -297,11 +438,11 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) { } if c.reachedETHHistoryStart { - log.Debug("findBlocksCommand reached first ETH transfer and checked erc20 tail", "chain", c.chainClient.NetworkID(), "account", c.account) + log.Debug("findBlocksCommand reached first ETH transfer and checked erc20 tail", "chain", c.chainClient.NetworkID(), "account", account) break } - err = c.upsertBlockRange(&BlockRange{c.startBlockNumber, c.resFromBlock.Number, to}) + err = c.markEthBlockRangeChecked(account, &BlockRange{c.startBlockNumber, c.resFromBlock.Number, to}) if err != nil { break } @@ -329,7 +470,7 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) { to = nextTo } - log.Debug("end findBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit) + log.Debug("end findBlocksCommand", "account", account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit) return nil } @@ -338,11 +479,11 @@ func (c *findBlocksCommand) blocksFound(headers []*DBHeader) { c.blocksLoadedCh <- headers } -func (c *findBlocksCommand) upsertBlockRange(blockRange *BlockRange) error { +func (c *findBlocksCommand) markEthBlockRangeChecked(account common.Address, blockRange *BlockRange) error { log.Debug("upsert block range", "Start", blockRange.Start, "FirstKnown", blockRange.FirstKnown, "LastKnown", blockRange.LastKnown, - "chain", c.chainClient.NetworkID(), "account", c.account) + "chain", c.chainClient.NetworkID(), "account", account) - err := c.blockRangeDAO.upsertRange(c.chainClient.NetworkID(), c.account, blockRange) + err := c.blockRangeDAO.upsertEthRange(c.chainClient.NetworkID(), account, blockRange) if err != nil { c.error = err log.Error("findBlocksCommand upsertRange", "error", err) @@ -355,24 +496,25 @@ func (c *findBlocksCommand) upsertBlockRange(blockRange *BlockRange) error { func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to *big.Int) ( foundHeaders []*DBHeader, err error) { + account := c.accounts[0] fromBlock := &Block{Number: from} newFromBlock, ethHeaders, startBlock, err := c.fastIndex(parent, c.balanceCacher, fromBlock, to) if err != nil { - log.Error("findBlocksCommand checkRange fastIndex", "err", err, "account", c.account, + log.Error("findBlocksCommand checkRange fastIndex", "err", err, "account", account, "chain", c.chainClient.NetworkID()) c.error = err // return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers return nil, nil } - log.Debug("findBlocksCommand checkRange", "chainID", c.chainClient.NetworkID(), "account", c.account, + log.Debug("findBlocksCommand checkRange", "chainID", c.chainClient.NetworkID(), "account", account, "startBlock", startBlock, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit) // There could be incoming ERC20 transfers which don't change the balance // and nonce of ETH account, so we keep looking for them erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to, false) if err != nil { - log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err, "account", c.account, "chain", c.chainClient.NetworkID()) + log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err, "account", account, "chain", c.chainClient.NetworkID()) c.error = err // return err return nil, nil @@ -387,7 +529,7 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to c.resFromBlock = newFromBlock c.startBlockNumber = startBlock - log.Debug("end findBlocksCommand checkRange", "chainID", c.chainClient.NetworkID(), "account", c.account, + log.Debug("end findBlocksCommand checkRange", "chainID", c.chainClient.NetworkID(), "account", account, "c.startBlock", c.startBlockNumber, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "c.resFromBlock", c.resFromBlock.Number) @@ -395,7 +537,7 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to } func loadBlockRangeInfo(chainID uint64, account common.Address, blockDAO *BlockRangeSequentialDAO) ( - *BlockRange, error) { + *ethTokensBlockRanges, error) { blockRange, err := blockDAO.getBlockRange(chainID, account) if err != nil { @@ -410,11 +552,7 @@ func loadBlockRangeInfo(chainID uint64, account common.Address, blockDAO *BlockR // Returns if all blocks are loaded, which means that start block (beginning of account history) // has been found and all block headers saved to the DB func areAllHistoryBlocksLoaded(blockInfo *BlockRange) bool { - if blockInfo == nil { - return false - } - - if blockInfo.FirstKnown != nil && blockInfo.Start != nil && + if blockInfo != nil && blockInfo.FirstKnown != nil && blockInfo.Start != nil && blockInfo.Start.Cmp(blockInfo.FirstKnown) >= 0 { return true } @@ -431,7 +569,7 @@ func areAllHistoryBlocksLoadedForAddress(blockRangeDAO *BlockRangeSequentialDAO, return false, err } - return areAllHistoryBlocksLoaded(blockRange), nil + return areAllHistoryBlocksLoaded(blockRange.eth) && areAllHistoryBlocksLoaded(blockRange.tokens), nil } // run fast indexing for every accont up to canonical chain head minus safety depth. @@ -440,7 +578,8 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCacher balance.Cache fromBlock *Block, toBlockNumber *big.Int) (resultingFrom *Block, headers []*DBHeader, startBlock *big.Int, err error) { - log.Debug("fast index started", "chainID", c.chainClient.NetworkID(), "account", c.account, + account := c.accounts[0] + log.Debug("fast index started", "chainID", c.chainClient.NetworkID(), "account", account, "from", fromBlock.Number, "to", toBlockNumber) start := time.Now() @@ -449,7 +588,7 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCacher balance.Cache command := ðHistoricalCommand{ chainClient: c.chainClient, balanceCacher: bCacher, - address: c.account, + address: account, feed: c.feed, from: fromBlock, to: toBlockNumber, @@ -471,7 +610,7 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCacher balance.Cache resultingFrom = &Block{Number: command.resultingFrom} headers = command.foundHeaders startBlock = command.startBlock - log.Debug("fast indexer finished", "chainID", c.chainClient.NetworkID(), "account", c.account, "in", time.Since(start), + log.Debug("fast indexer finished", "chainID", c.chainClient.NetworkID(), "account", account, "in", time.Since(start), "startBlock", command.startBlock, "resultingFrom", resultingFrom.Number, "headers", len(headers)) return } @@ -486,10 +625,9 @@ func (c *findBlocksCommand) fastIndexErc20(ctx context.Context, fromBlockNumber group := async.NewGroup(ctx) erc20 := &erc20HistoricalCommand{ - erc20: NewERC20TransfersDownloader(c.chainClient, []common.Address{c.account}, types.LatestSignerForChainID(c.chainClient.ToBigInt()), incomingOnly), + erc20: NewERC20TransfersDownloader(c.chainClient, c.accounts, types.LatestSignerForChainID(c.chainClient.ToBigInt()), incomingOnly), chainClient: c.chainClient, feed: c.feed, - address: c.account, from: fromBlockNumber, to: toBlockNumber, foundHeaders: []*DBHeader{}, @@ -501,47 +639,47 @@ func (c *findBlocksCommand) fastIndexErc20(ctx context.Context, fromBlockNumber return nil, ctx.Err() case <-group.WaitAsync(): headers := erc20.foundHeaders - log.Debug("fast indexer Erc20 finished", "chainID", c.chainClient.NetworkID(), "account", c.account, + log.Debug("fast indexer Erc20 finished", "chainID", c.chainClient.NetworkID(), "in", time.Since(start), "headers", len(headers)) return headers, nil } } -func loadTransfersLoop(ctx context.Context, account common.Address, blockDAO *BlockDAO, db *Database, +func loadTransfersLoop(ctx context.Context, blockDAO *BlockDAO, db *Database, chainClient chain.ClientInterface, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager, feed *event.Feed, blocksLoadedCh <-chan []*DBHeader) { - log.Debug("loadTransfersLoop start", "chain", chainClient.NetworkID(), "account", account) + log.Debug("loadTransfersLoop start", "chain", chainClient.NetworkID()) for { select { case <-ctx.Done(): - log.Info("loadTransfersLoop done", "chain", chainClient.NetworkID(), "account", account, "error", ctx.Err()) + log.Info("loadTransfersLoop done", "chain", chainClient.NetworkID(), "error", ctx.Err()) return case dbHeaders := <-blocksLoadedCh: - log.Debug("loadTransfersOnDemand transfers received", "chain", chainClient.NetworkID(), "account", account, "headers", len(dbHeaders)) + log.Debug("loadTransfersOnDemand transfers received", "chain", chainClient.NetworkID(), "headers", len(dbHeaders)) - blockNums := make([]*big.Int, len(dbHeaders)) - for i, dbHeader := range dbHeaders { - blockNums[i] = dbHeader.Number + blocksByAddress := map[common.Address][]*big.Int{} + // iterate over headers and group them by address + for _, dbHeader := range dbHeaders { + blocksByAddress[dbHeader.Address] = append(blocksByAddress[dbHeader.Address], dbHeader.Number) } - blocksByAddress := map[common.Address][]*big.Int{account: blockNums} go func() { - _ = loadTransfers(ctx, []common.Address{account}, blockDAO, db, chainClient, noBlockLimit, + _ = loadTransfers(ctx, blockDAO, db, chainClient, noBlockLimit, blocksByAddress, transactionManager, pendingTxManager, tokenManager, feed) }() } } } -func newLoadBlocksAndTransfersCommand(account common.Address, db *Database, accountsDB *accounts.Database, +func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, accountsDB *accounts.Database, blockDAO *BlockDAO, blockRangesSeqDAO *BlockRangeSequentialDAO, chainClient chain.ClientInterface, feed *event.Feed, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager, balanceCacher balance.Cacher, omitHistory bool) *loadBlocksAndTransfersCommand { return &loadBlocksAndTransfersCommand{ - account: account, + accounts: accounts, db: db, blockRangeDAO: blockRangesSeqDAO, accountsDB: accountsDB, @@ -558,7 +696,7 @@ func newLoadBlocksAndTransfersCommand(account common.Address, db *Database, acco } type loadBlocksAndTransfersCommand struct { - account common.Address + accounts []common.Address db *Database accountsDB *accounts.Database blockRangeDAO *BlockRangeSequentialDAO @@ -574,11 +712,11 @@ type loadBlocksAndTransfersCommand struct { omitHistory bool // Not to be set by the caller - transfersLoaded bool // For event RecentHistoryReady to be sent only once per account during app lifetime + transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime } func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error { - log.Info("start load all transfers command", "chain", c.chainClient.NetworkID(), "account", c.account) + log.Debug("start load all transfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts) ctx := parent @@ -597,25 +735,28 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error { c.startTransfersLoop(ctx) fromNum := big.NewInt(0) - toNum, err := getHeadBlockNumber(ctx, c.chainClient) + headNum, err := getHeadBlockNumber(ctx, c.chainClient) if err != nil { return err } // This will start findBlocksCommand which will run until success when all blocks are loaded - err = c.fetchHistoryBlocks(parent, group, fromNum, toNum, c.blocksLoadedCh) - for err != nil { - group.Stop() - group.Wait() - return err + // Iterate over all accounts and load blocks for each account + for _, account := range c.accounts { + err = c.fetchHistoryBlocks(parent, group, account, fromNum, headNum, c.blocksLoadedCh) + for err != nil { + group.Stop() + group.Wait() + return err + } } - c.startFetchingNewBlocks(group, c.account, c.blocksLoadedCh) + c.startFetchingNewBlocks(group, c.accounts, headNum, c.blocksLoadedCh) select { case <-ctx.Done(): return ctx.Err() case <-group.WaitAsync(): - log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.NetworkID(), "account", c.account) + log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts) return nil } } @@ -628,33 +769,97 @@ func (c *loadBlocksAndTransfersCommand) Command() async.Command { } func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context) { - go loadTransfersLoop(ctx, c.account, c.blockDAO, c.db, c.chainClient, c.transactionManager, + go loadTransfersLoop(ctx, c.blockDAO, c.db, c.chainClient, c.transactionManager, c.pendingTxManager, c.tokenManager, c.feed, c.blocksLoadedCh) } -func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) error { - log.Debug("fetchHistoryBlocks start", "chainID", c.chainClient.NetworkID(), "account", c.account, "omit", c.omitHistory) +func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, account common.Address, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) error { + + log.Debug("fetchHistoryBlocks start", "chainID", c.chainClient.NetworkID(), "account", account, "omit", c.omitHistory) if c.omitHistory { - blockRange := &BlockRange{nil, big.NewInt(0), toNum} - err := c.blockRangeDAO.upsertRange(c.chainClient.NetworkID(), c.account, blockRange) + blockRange := ðTokensBlockRanges{eth: &BlockRange{nil, big.NewInt(0), toNum}, tokens: &BlockRange{nil, big.NewInt(0), toNum}} + err := c.blockRangeDAO.upsertRange(c.chainClient.NetworkID(), account, blockRange) + log.Error("fetchHistoryBlocks upsertRange", "error", err) return err } - blockRange, err := loadBlockRangeInfo(c.chainClient.NetworkID(), c.account, c.blockRangeDAO) + blockRange, err := loadBlockRangeInfo(c.chainClient.NetworkID(), account, c.blockRangeDAO) if err != nil { log.Error("findBlocksCommand loadBlockRangeInfo", "error", err) // c.error = err return err // Will keep spinning forever nomatter what } - allHistoryLoaded := areAllHistoryBlocksLoaded(blockRange) + ranges := [][]*big.Int{} - if !allHistoryLoaded { - to := getToHistoryBlockNumber(toNum, blockRange, allHistoryLoaded) + // There are 2 history intervals: + // 1) from 0 to FirstKnown + // 2) from LastKnown to `toNum`` (head) + // If we blockRange is nil, we need to load all blocks from `fromNum` to `toNum` + // As current implementation checks ETH first then tokens, tokens ranges maybe behind ETH ranges in + // cases when block searching was interrupted, so we use tokens ranges + if blockRange != nil { + if blockRange.tokens.LastKnown != nil && toNum.Cmp(blockRange.tokens.LastKnown) > 0 { + ranges = append(ranges, []*big.Int{blockRange.tokens.LastKnown, toNum}) + } + if blockRange.tokens.FirstKnown != nil { + if fromNum.Cmp(blockRange.tokens.FirstKnown) < 0 { + ranges = append(ranges, []*big.Int{fromNum, blockRange.tokens.FirstKnown}) + } else { + if !c.transfersLoaded[account] { + transfersLoaded, err := c.areAllTransfersLoaded(account) + if err != nil { + return err + } + + if transfersLoaded { + if c.transfersLoaded == nil { + c.transfersLoaded = make(map[common.Address]bool) + } + c.transfersLoaded[account] = true + c.notifyHistoryReady(account) + } + } + } + } + } else { + ranges = append(ranges, []*big.Int{fromNum, toNum}) + } + + for _, rangeItem := range ranges { fbc := &findBlocksCommand{ - account: c.account, + accounts: []common.Address{account}, + db: c.db, + accountsDB: c.accountsDB, + blockRangeDAO: c.blockRangeDAO, + chainClient: c.chainClient, + balanceCacher: c.balanceCacher, + feed: c.feed, + noLimit: false, + fromBlockNumber: rangeItem[0], + toBlockNumber: rangeItem[1], + transactionManager: c.transactionManager, + tokenManager: c.tokenManager, + blocksLoadedCh: blocksLoadedCh, + defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, + } + group.Add(fbc.Command()) + } + + log.Debug("fetchHistoryBlocks end", "chainID", c.chainClient.NetworkID(), "account", account) + + return nil +} + +func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(group *async.Group, addresses []common.Address, fromNum *big.Int, blocksLoadedCh chan<- []*DBHeader) { + + log.Debug("startFetchingNewBlocks", "chainID", c.chainClient.NetworkID(), "accounts", addresses, "db", c.accountsDB) + + newBlocksCmd := &findNewBlocksCommand{ + findBlocksCommand: &findBlocksCommand{ + accounts: addresses, db: c.db, accountsDB: c.accountsDB, blockRangeDAO: c.blockRangeDAO, @@ -663,46 +868,6 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, feed: c.feed, noLimit: false, fromBlockNumber: fromNum, - toBlockNumber: to, - transactionManager: c.transactionManager, - tokenManager: c.tokenManager, - blocksLoadedCh: blocksLoadedCh, - defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, - } - group.Add(fbc.Command()) - } else { - if !c.transfersLoaded { - transfersLoaded, err := c.areAllTransfersLoaded() - if err != nil { - return err - } - - if transfersLoaded { - c.transfersLoaded = true - c.notifyHistoryReady() - } - } - } - - log.Debug("fetchHistoryBlocks end", "chainID", c.chainClient.NetworkID(), "account", c.account) - - return nil -} - -func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(group *async.Group, address common.Address, blocksLoadedCh chan<- []*DBHeader) { - - log.Debug("startFetchingNewBlocks", "chainID", c.chainClient.NetworkID(), "account", address, "db", c.accountsDB) - - newBlocksCmd := &findNewBlocksCommand{ - findBlocksCommand: &findBlocksCommand{ - account: address, - db: c.db, - accountsDB: c.accountsDB, - blockRangeDAO: c.blockRangeDAO, - chainClient: c.chainClient, - balanceCacher: c.balanceCacher, - feed: c.feed, - noLimit: false, transactionManager: c.transactionManager, tokenManager: c.tokenManager, blocksLoadedCh: blocksLoadedCh, @@ -714,24 +879,31 @@ func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(group *async.Grou func (c *loadBlocksAndTransfersCommand) fetchTransfersForLoadedBlocks(group *async.Group) error { - log.Debug("fetchTransfers start", "chainID", c.chainClient.NetworkID(), "account", c.account) + log.Debug("fetchTransfers start", "chainID", c.chainClient.NetworkID(), "accounts", c.accounts) - blocks, err := c.blockDAO.GetBlocksToLoadByAddress(c.chainClient.NetworkID(), c.account, numberOfBlocksCheckedPerIteration) - if err != nil { - log.Error("loadBlocksAndTransfersCommand GetBlocksToLoadByAddress", "error", err) - return err + blocksMap := make(map[common.Address][]*big.Int) + for _, account := range c.accounts { + blocks, err := c.blockDAO.GetBlocksToLoadByAddress(c.chainClient.NetworkID(), account, numberOfBlocksCheckedPerIteration) + if err != nil { + log.Error("loadBlocksAndTransfersCommand GetBlocksToLoadByAddress", "error", err) + return err + } + + if len(blocks) == 0 { + log.Debug("fetchTransfers no blocks to load", "chainID", c.chainClient.NetworkID(), "account", account) + continue + } + + blocksMap[account] = blocks } - if len(blocks) == 0 { - log.Debug("fetchTransfers no blocks to load", "chainID", c.chainClient.NetworkID(), "account", c.account) + if len(blocksMap) == 0 { + log.Debug("fetchTransfers no blocks to load", "chainID", c.chainClient.NetworkID()) return nil } - blocksMap := make(map[common.Address][]*big.Int) - blocksMap[c.account] = blocks - txCommand := &loadTransfersCommand{ - accounts: []common.Address{c.account}, + accounts: c.accounts, db: c.db, blockDAO: c.blockDAO, chainClient: c.chainClient, @@ -747,32 +919,31 @@ func (c *loadBlocksAndTransfersCommand) fetchTransfersForLoadedBlocks(group *asy return nil } -func (c *loadBlocksAndTransfersCommand) notifyHistoryReady() { +func (c *loadBlocksAndTransfersCommand) notifyHistoryReady(account common.Address) { if c.feed != nil { c.feed.Send(walletevent.Event{ Type: EventRecentHistoryReady, - Accounts: []common.Address{c.account}, + Accounts: []common.Address{account}, ChainID: c.chainClient.NetworkID(), }) } } -func (c *loadBlocksAndTransfersCommand) areAllTransfersLoaded() (bool, error) { - allBlocksLoaded, err := areAllHistoryBlocksLoadedForAddress(c.blockRangeDAO, c.chainClient.NetworkID(), c.account) +func (c *loadBlocksAndTransfersCommand) areAllTransfersLoaded(account common.Address) (bool, error) { + allBlocksLoaded, err := areAllHistoryBlocksLoadedForAddress(c.blockRangeDAO, c.chainClient.NetworkID(), account) if err != nil { log.Error("loadBlockAndTransfersCommand allHistoryBlocksLoaded", "error", err) return false, err } if allBlocksLoaded { - firstHeader, err := c.blockDAO.GetFirstSavedBlock(c.chainClient.NetworkID(), c.account) + headers, err := c.blockDAO.GetBlocksToLoadByAddress(c.chainClient.NetworkID(), account, 1) if err != nil { log.Error("loadBlocksAndTransfersCommand GetFirstSavedBlock", "error", err) return false, err } - // If first block is Loaded, we have fetched all the transfers - if firstHeader != nil && firstHeader.Loaded { + if len(headers) == 0 { return true, nil } } @@ -787,6 +958,7 @@ func getHeadBlockNumber(parent context.Context, chainClient chain.ClientInterfac head, err := chainClient.HeaderByNumber(ctx, nil) cancel() if err != nil { + log.Error("getHeadBlockNumber", "error", err) return nil, err } @@ -808,16 +980,3 @@ func nextRange(maxRangeSize int, prevFrom, zeroBlockNumber *big.Int) (*big.Int, return from, to } - -func getToHistoryBlockNumber(headNum *big.Int, blockRange *BlockRange, allHistoryLoaded bool) *big.Int { - var toBlockNum *big.Int - if blockRange != nil { - if !allHistoryLoaded { - toBlockNum = blockRange.FirstKnown - } - } else { - toBlockNum = headNum - } - - return toBlockNum -} diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index 506c82ef3..7a7a3539d 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -946,7 +946,7 @@ func TestFindBlocksCommand(t *testing.T) { accDB, err := accounts.NewDB(appdb) require.NoError(t, err) fbc := &findBlocksCommand{ - account: common.HexToAddress("0x1234"), + accounts: []common.Address{common.HexToAddress("0x1234")}, db: wdb, blockRangeDAO: &BlockRangeSequentialDAO{wdb.client}, accountsDB: accDB, @@ -1067,13 +1067,14 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) { }, }) + address := common.HexToAddress("0x1234") chainClient := newMockChainClient() tracker := transactions.NewPendingTxTracker(db, chainClient, nil, &event.Feed{}, transactions.PendingCheckInterval) accDB, err := accounts.NewDB(wdb.client) require.NoError(t, err) cmd := &loadBlocksAndTransfersCommand{ - account: common.HexToAddress("0x1234"), + accounts: []common.Address{address}, db: wdb, blockRangeDAO: &BlockRangeSequentialDAO{wdb.client}, blockDAO: &BlockDAO{db}, @@ -1098,7 +1099,7 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) { fromNum := big.NewInt(0) toNum, err := getHeadBlockNumber(ctx, cmd.chainClient) require.NoError(t, err) - err = cmd.fetchHistoryBlocks(ctx, group, fromNum, toNum, blockChannel) + err = cmd.fetchHistoryBlocks(ctx, group, address, fromNum, toNum, blockChannel) require.NoError(t, err) select { diff --git a/services/wallet/transfer/concurrent.go b/services/wallet/transfer/concurrent.go index 1aac50a24..7183eff0b 100644 --- a/services/wallet/transfer/concurrent.go +++ b/services/wallet/transfer/concurrent.go @@ -187,7 +187,7 @@ func checkRangesWithStartBlock(parent context.Context, client balance.Reader, ca if err != nil { return err } - c.PushHeader(toDBHeader(header, *firstTransaction.BlockHash)) + c.PushHeader(toDBHeader(header, *firstTransaction.BlockHash, account)) return nil } mid := new(big.Int).Add(from, to) diff --git a/services/wallet/transfer/controller_test.go b/services/wallet/transfer/controller_test.go index 9144d5e8f..296e13505 100644 --- a/services/wallet/transfer/controller_test.go +++ b/services/wallet/transfer/controller_test.go @@ -45,7 +45,7 @@ func TestController_watchAccountsChanges(t *testing.T) { chainID := uint64(777) // Insert blocks database := NewDB(walletDB) - err = database.SaveBlocks(chainID, address, []*DBHeader{ + err = database.SaveBlocks(chainID, []*DBHeader{ { Number: big.NewInt(1), Hash: common.Hash{1}, @@ -70,7 +70,7 @@ func TestController_watchAccountsChanges(t *testing.T) { // Insert block ranges blockRangesDAO := &BlockRangeSequentialDAO{walletDB} - err = blockRangesDAO.upsertRange(chainID, address, NewBlockRange()) + err = blockRangesDAO.upsertRange(chainID, address, newEthTokensBlockRanges()) require.NoError(t, err) ranges, err := blockRangesDAO.getBlockRange(chainID, address) @@ -112,7 +112,8 @@ func TestController_watchAccountsChanges(t *testing.T) { ranges, err = blockRangesDAO.getBlockRange(chainID, address) require.NoError(t, err) - require.Nil(t, ranges) + require.Nil(t, ranges.eth) + require.Nil(t, ranges.tokens) } @@ -152,7 +153,7 @@ func TestController_cleanupAccountLeftovers(t *testing.T) { chainID := uint64(777) // Insert blocks database := NewDB(walletDB) - err = database.SaveBlocks(chainID, removedAddr, []*DBHeader{ + err = database.SaveBlocks(chainID, []*DBHeader{ { Number: big.NewInt(1), Hash: common.Hash{1}, @@ -162,7 +163,7 @@ func TestController_cleanupAccountLeftovers(t *testing.T) { }, }) require.NoError(t, err) - err = database.SaveBlocks(chainID, common.Address(existingAddr), []*DBHeader{ + err = database.SaveBlocks(chainID, []*DBHeader{ { Number: big.NewInt(2), Hash: common.Hash{2}, diff --git a/services/wallet/transfer/database.go b/services/wallet/transfer/database.go index 1060542d3..760173468 100644 --- a/services/wallet/transfer/database.go +++ b/services/wallet/transfer/database.go @@ -32,12 +32,13 @@ type DBHeader struct { Loaded bool } -func toDBHeader(header *types.Header, blockHash common.Hash) *DBHeader { +func toDBHeader(header *types.Header, blockHash common.Hash, account common.Address) *DBHeader { return &DBHeader{ Hash: blockHash, Number: header.Number, Timestamp: header.Time, Loaded: false, + Address: account, } } @@ -87,37 +88,7 @@ func (db *Database) Close() error { return db.client.Close() } -func (db *Database) ProcessBlocks(chainID uint64, account common.Address, from *big.Int, to *Block, headers []*DBHeader) (err error) { - var ( - tx *sql.Tx - ) - - tx, err = db.client.Begin() - if err != nil { - return err - } - defer func() { - if err == nil { - err = tx.Commit() - return - } - _ = tx.Rollback() - }() - - err = insertBlocksWithTransactions(chainID, tx, account, headers) - if err != nil { - return - } - - err = upsertRange(chainID, tx, account, from, to) - if err != nil { - return - } - - return -} - -func (db *Database) SaveBlocks(chainID uint64, account common.Address, headers []*DBHeader) (err error) { +func (db *Database) SaveBlocks(chainID uint64, headers []*DBHeader) (err error) { var ( tx *sql.Tx ) @@ -133,7 +104,7 @@ func (db *Database) SaveBlocks(chainID uint64, account common.Address, headers [ _ = tx.Rollback() }() - err = insertBlocksWithTransactions(chainID, tx, account, headers) + err = insertBlocksWithTransactions(chainID, tx, headers) if err != nil { return } @@ -141,42 +112,13 @@ func (db *Database) SaveBlocks(chainID uint64, account common.Address, headers [ return } -// ProcessTransfers atomically adds/removes blocks and adds new transfers. -func (db *Database) ProcessTransfers(chainID uint64, transfers []Transfer, removed []*DBHeader) (err error) { - var ( - tx *sql.Tx - ) - tx, err = db.client.Begin() - if err != nil { - return err - } - defer func() { - if err == nil { - err = tx.Commit() - return - } - _ = tx.Rollback() - }() - - err = deleteHeaders(tx, removed) +func saveTransfersMarkBlocksLoaded(creator statementCreator, chainID uint64, address common.Address, transfers []Transfer, blocks []*big.Int) (err error) { + err = updateOrInsertTransfers(chainID, creator, transfers) if err != nil { return } - err = updateOrInsertTransfers(chainID, tx, transfers) - if err != nil { - return - } - return -} - -func saveTransfersMarkBlocksLoaded(tx statementCreator, chainID uint64, address common.Address, transfers []Transfer, blocks []*big.Int) (err error) { - err = updateOrInsertTransfers(chainID, tx, transfers) - if err != nil { - return - } - - err = markBlocksAsLoaded(chainID, tx, address, blocks) + err = markBlocksAsLoaded(chainID, creator, address, blocks) if err != nil { return } @@ -258,10 +200,16 @@ func (db *Database) GetTransfersForIdentities(ctx context.Context, identities [] func (db *Database) GetTransactionsToLoad(chainID uint64, address common.Address, blockNumber *big.Int) (rst []*PreloadedTransaction, err error) { query := newTransfersQueryForPreloadedTransactions(). FilterNetwork(chainID). - FilterAddress(address). - FilterBlockNumber(blockNumber). FilterLoaded(0) + if address != (common.Address{}) { + query.FilterAddress(address) + } + + if blockNumber != nil { + query.FilterBlockNumber(blockNumber) + } + rows, err := db.client.Query(query.String(), query.Args()...) if err != nil { return @@ -275,29 +223,6 @@ type statementCreator interface { Prepare(query string) (*sql.Stmt, error) } -func deleteHeaders(creator statementCreator, headers []*DBHeader) error { - delete, err := creator.Prepare("DELETE FROM blocks WHERE blk_hash = ?") - if err != nil { - return err - } - deleteTransfers, err := creator.Prepare("DELETE FROM transfers WHERE blk_hash = ?") - if err != nil { - return err - } - for _, h := range headers { - _, err = delete.Exec(h.Hash) - if err != nil { - return err - } - - _, err = deleteTransfers.Exec(h.Hash) - if err != nil { - return err - } - } - return nil -} - // Only used by status-mobile func (db *Database) InsertBlock(chainID uint64, account common.Address, blockNumber *big.Int, blockHash common.Hash) error { var ( @@ -341,7 +266,7 @@ func insertBlockDBFields(creator statementCreator, block blockDBFields) error { return err } -func insertBlocksWithTransactions(chainID uint64, creator statementCreator, account common.Address, headers []*DBHeader) error { +func insertBlocksWithTransactions(chainID uint64, creator statementCreator, headers []*DBHeader) error { insert, err := creator.Prepare("INSERT OR IGNORE INTO blocks(network_id, address, blk_number, blk_hash, loaded) VALUES (?, ?, ?, ?, ?)") if err != nil { return err @@ -361,7 +286,7 @@ func insertBlocksWithTransactions(chainID uint64, creator statementCreator, acco } for _, header := range headers { - _, err = insert.Exec(chainID, account, (*bigint.SQLBigInt)(header.Number), header.Hash, header.Loaded) + _, err = insert.Exec(chainID, header.Address, (*bigint.SQLBigInt)(header.Number), header.Hash, header.Loaded) if err != nil { return err } @@ -371,7 +296,7 @@ func insertBlocksWithTransactions(chainID uint64, creator statementCreator, acco logIndex = new(uint) *logIndex = transaction.Log.Index } - res, err := updateTx.Exec(&JSONBlob{transaction.Log}, logIndex, chainID, account, transaction.ID) + res, err := updateTx.Exec(&JSONBlob{transaction.Log}, logIndex, chainID, header.Address, transaction.ID) if err != nil { return err } @@ -385,7 +310,8 @@ func insertBlocksWithTransactions(chainID uint64, creator statementCreator, acco tokenID := (*bigint.SQLBigIntBytes)(transaction.TokenID) txValue := sqlite.BigIntToPadded128BitsStr(transaction.Value) - _, err = insertTx.Exec(chainID, account, account, transaction.ID, (*bigint.SQLBigInt)(header.Number), header.Hash, transaction.Type, &JSONBlob{transaction.Log}, logIndex, tokenID, txValue) + // Is that correct to set sender as account address? + _, err = insertTx.Exec(chainID, header.Address, header.Address, transaction.ID, (*bigint.SQLBigInt)(header.Number), header.Hash, transaction.Type, &JSONBlob{transaction.Log}, logIndex, tokenID, txValue) if err != nil { log.Error("error saving token transfer", "err", err) return err @@ -567,7 +493,10 @@ func updateOrInsertTransfersDBFields(creator statementCreator, transfers []trans return nil } -// markBlocksAsLoaded(tx, address, chainID, blocks) +// markBlocksAsLoaded(chainID, tx, address, blockNumbers) +// In case block contains both ETH and token transfers, it will be marked as loaded on ETH transfer processing. +// This is not a problem since for token transfers we have preloaded transactions and blocks 'loaded' flag is needed +// for ETH transfers only. func markBlocksAsLoaded(chainID uint64, creator statementCreator, address common.Address, blocks []*big.Int) error { update, err := creator.Prepare("UPDATE blocks SET loaded=? WHERE address=? AND blk_number=? AND network_id=?") if err != nil { diff --git a/services/wallet/transfer/database_test.go b/services/wallet/transfer/database_test.go index bc904ffa7..247dbebda 100644 --- a/services/wallet/transfer/database_test.go +++ b/services/wallet/transfer/database_test.go @@ -23,39 +23,31 @@ func setupTestDB(t *testing.T) (*Database, *BlockDAO, func()) { } } -func TestDBProcessBlocks(t *testing.T) { - db, block, stop := setupTestDB(t) +func TestDBSaveBlocks(t *testing.T) { + db, _, stop := setupTestDB(t) defer stop() address := common.Address{1} - from := big.NewInt(0) - to := big.NewInt(10) blocks := []*DBHeader{ { - Number: big.NewInt(1), - Hash: common.Hash{1}, + Number: big.NewInt(1), + Hash: common.Hash{1}, + Address: address, }, { - Number: big.NewInt(2), - Hash: common.Hash{2}, + Number: big.NewInt(2), + Hash: common.Hash{2}, + Address: address, }} - t.Log(blocks) - nonce := int64(0) - lastBlock := &Block{ - Number: to, - Balance: big.NewInt(0), - Nonce: &nonce, - } - require.NoError(t, db.ProcessBlocks(777, common.Address{1}, from, lastBlock, blocks)) - t.Log(block.GetLastBlockByAddress(777, common.Address{1}, 40)) + require.NoError(t, db.SaveBlocks(777, blocks)) transfers := []Transfer{ { ID: common.Hash{1}, Type: w_common.EthTransfer, BlockHash: common.Hash{2}, BlockNumber: big.NewInt(1), - Address: common.Address{1}, + Address: address, Timestamp: 123, - From: common.Address{1}, + From: address, }, } tx, err := db.client.BeginTx(context.Background(), nil) @@ -65,15 +57,16 @@ func TestDBProcessBlocks(t *testing.T) { require.NoError(t, tx.Commit()) } -func TestDBProcessTransfer(t *testing.T) { +func TestDBSaveTransfers(t *testing.T) { db, _, stop := setupTestDB(t) defer stop() + address := common.Address{1} header := &DBHeader{ Number: big.NewInt(1), Hash: common.Hash{1}, - Address: common.Address{1}, + Address: address, } - tx := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil) + tx := types.NewTransaction(1, address, nil, 10, big.NewInt(10), nil) transfers := []Transfer{ { ID: common.Hash{1}, @@ -82,62 +75,12 @@ func TestDBProcessTransfer(t *testing.T) { BlockNumber: header.Number, Transaction: tx, Receipt: types.NewReceipt(nil, false, 100), - Address: common.Address{1}, + Address: address, MultiTransactionID: 0, }, } - nonce := int64(0) - lastBlock := &Block{ - Number: big.NewInt(0), - Balance: big.NewInt(0), - Nonce: &nonce, - } - require.NoError(t, db.ProcessBlocks(777, common.Address{1}, big.NewInt(1), lastBlock, []*DBHeader{header})) - require.NoError(t, db.ProcessTransfers(777, transfers, []*DBHeader{})) -} - -func TestDBReorgTransfers(t *testing.T) { - db, _, stop := setupTestDB(t) - defer stop() - rcpt := types.NewReceipt(nil, false, 100) - rcpt.Logs = []*types.Log{} - original := &DBHeader{ - Number: big.NewInt(1), - Hash: common.Hash{1}, - Address: common.Address{1}, - } - replaced := &DBHeader{ - Number: big.NewInt(1), - Hash: common.Hash{2}, - Address: common.Address{1}, - } - originalTX := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil) - replacedTX := types.NewTransaction(2, common.Address{1}, nil, 10, big.NewInt(10), nil) - nonce := int64(0) - lastBlock := &Block{ - Number: original.Number, - Balance: big.NewInt(0), - Nonce: &nonce, - } - require.NoError(t, db.ProcessBlocks(777, original.Address, original.Number, lastBlock, []*DBHeader{original})) - require.NoError(t, db.ProcessTransfers(777, []Transfer{ - {w_common.EthTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, 100, originalTX, true, 1777, common.Address{1}, rcpt, nil, nil, nil, "2100", NoMultiTransactionID}, - }, []*DBHeader{})) - nonce = int64(0) - lastBlock = &Block{ - Number: replaced.Number, - Balance: big.NewInt(0), - Nonce: &nonce, - } - require.NoError(t, db.ProcessBlocks(777, replaced.Address, replaced.Number, lastBlock, []*DBHeader{replaced})) - require.NoError(t, db.ProcessTransfers(777, []Transfer{ - {w_common.EthTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, 100, replacedTX, true, 1777, common.Address{1}, rcpt, nil, nil, nil, "2100", NoMultiTransactionID}, - }, []*DBHeader{original})) - - all, err := db.GetTransfers(777, big.NewInt(0), nil) - require.NoError(t, err) - require.Len(t, all, 1) - require.Equal(t, replacedTX.Hash(), all[0].Transaction.Hash()) + require.NoError(t, db.SaveBlocks(777, []*DBHeader{header})) + require.NoError(t, saveTransfersMarkBlocksLoaded(db.client, 777, address, transfers, []*big.Int{header.Number})) } func TestDBGetTransfersFromBlock(t *testing.T) { @@ -145,14 +88,17 @@ func TestDBGetTransfersFromBlock(t *testing.T) { defer stop() headers := []*DBHeader{} transfers := []Transfer{} + address := common.Address{1} + blockNumbers := []*big.Int{} for i := 1; i < 10; i++ { header := &DBHeader{ Number: big.NewInt(int64(i)), Hash: common.Hash{byte(i)}, - Address: common.Address{1}, + Address: address, } headers = append(headers, header) - tx := types.NewTransaction(uint64(i), common.Address{1}, nil, 10, big.NewInt(10), nil) + blockNumbers = append(blockNumbers, header.Number) + tx := types.NewTransaction(uint64(i), address, nil, 10, big.NewInt(10), nil) receipt := types.NewReceipt(nil, false, 100) receipt.Logs = []*types.Log{} transfer := Transfer{ @@ -162,18 +108,12 @@ func TestDBGetTransfersFromBlock(t *testing.T) { BlockHash: header.Hash, Transaction: tx, Receipt: receipt, - Address: common.Address{1}, + Address: address, } transfers = append(transfers, transfer) } - nonce := int64(0) - lastBlock := &Block{ - Number: headers[len(headers)-1].Number, - Balance: big.NewInt(0), - Nonce: &nonce, - } - require.NoError(t, db.ProcessBlocks(777, headers[0].Address, headers[0].Number, lastBlock, headers)) - require.NoError(t, db.ProcessTransfers(777, transfers, []*DBHeader{})) + require.NoError(t, db.SaveBlocks(777, headers)) + require.NoError(t, saveTransfersMarkBlocksLoaded(db.client, 777, address, transfers, blockNumbers)) rst, err := db.GetTransfers(777, big.NewInt(7), nil) require.NoError(t, err) require.Len(t, rst, 3) diff --git a/services/wallet/transfer/downloader.go b/services/wallet/transfer/downloader.go index a1dc9136d..32eef314c 100644 --- a/services/wallet/transfer/downloader.go +++ b/services/wallet/transfer/downloader.go @@ -6,6 +6,8 @@ import ( "math/big" "time" + "golang.org/x/exp/slices" // since 1.21, this is in the standard library + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -326,26 +328,28 @@ type ERC20TransfersDownloader struct { signer types.Signer } -func (d *ERC20TransfersDownloader) paddedAddress(address common.Address) common.Hash { - rst := common.Hash{} - copy(rst[12:], address[:]) +func topicFromAddressSlice(addresses []common.Address) []common.Hash { + rst := make([]common.Hash, len(addresses)) + for i, address := range addresses { + rst[i] = common.BytesToHash(address.Bytes()) + } return rst } -func (d *ERC20TransfersDownloader) inboundTopics(address common.Address) [][]common.Hash { - return [][]common.Hash{{d.signature}, {}, {d.paddedAddress(address)}} +func (d *ERC20TransfersDownloader) inboundTopics(addresses []common.Address) [][]common.Hash { + return [][]common.Hash{{d.signature}, {}, topicFromAddressSlice(addresses)} } -func (d *ERC20TransfersDownloader) outboundTopics(address common.Address) [][]common.Hash { - return [][]common.Hash{{d.signature}, {d.paddedAddress(address)}, {}} +func (d *ERC20TransfersDownloader) outboundTopics(addresses []common.Address) [][]common.Hash { + return [][]common.Hash{{d.signature}, topicFromAddressSlice(addresses), {}} } -func (d *ERC20TransfersDownloader) inboundERC20OutboundERC1155Topics(address common.Address) [][]common.Hash { - return [][]common.Hash{{d.signature, d.signatureErc1155Single, d.signatureErc1155Batch}, {}, {d.paddedAddress(address)}} +func (d *ERC20TransfersDownloader) inboundERC20OutboundERC1155Topics(addresses []common.Address) [][]common.Hash { + return [][]common.Hash{{d.signature, d.signatureErc1155Single, d.signatureErc1155Batch}, {}, topicFromAddressSlice(addresses)} } -func (d *ERC20TransfersDownloader) inboundTopicsERC1155(address common.Address) [][]common.Hash { - return [][]common.Hash{{d.signatureErc1155Single, d.signatureErc1155Batch}, {}, {}, {d.paddedAddress(address)}} +func (d *ERC20TransfersDownloader) inboundTopicsERC1155(addresses []common.Address) [][]common.Hash { + return [][]common.Hash{{d.signatureErc1155Single, d.signatureErc1155Batch}, {}, {}, topicFromAddressSlice(addresses)} } func (d *ETHDownloader) fetchTransactionReceipt(parent context.Context, txHash common.Hash) (*types.Receipt, error) { @@ -454,7 +458,7 @@ func (d *ETHDownloader) subTransactionsFromTransactionData(address, from common. return rst, nil } -func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs []types.Log, address common.Address) ([]*DBHeader, error) { +func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs []types.Log) ([]*DBHeader, error) { concurrent := NewConcurrentDownloader(parent, NoThreadLimit) for i := range logs { @@ -464,15 +468,20 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [ continue } + var address common.Address from, to, txIDs, tokenIDs, values, err := w_common.ParseTransferLog(l) if err != nil { - log.Error("failed to parse transfer log", "log", l, "address", address, "error", err) + log.Error("failed to parse transfer log", "log", l, "address", d.accounts, "error", err) continue } // Double check provider returned the correct log - if from != address && to != address { - log.Error("from/to address mismatch", "log", l, "address", address) + if slices.Contains(d.accounts, from) { + address = from + } else if !slices.Contains(d.accounts, to) { + address = to + } else { + log.Error("from/to address mismatch", "log", l, "addresses", d.accounts) continue } @@ -480,7 +489,7 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [ logType := w_common.EventTypeToSubtransactionType(eventType) for i, txID := range txIDs { - log.Debug("block from logs", "block", l.BlockNumber, "log", l, "logType", logType, "address", address, "txID", txID) + log.Debug("block from logs", "block", l.BlockNumber, "log", l, "logType", logType, "txID", txID) // For ERC20 there is no tokenID, so we use nil var tokenID *big.Int @@ -489,8 +498,9 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [ } header := &DBHeader{ - Number: big.NewInt(int64(l.BlockNumber)), - Hash: l.BlockHash, + Number: big.NewInt(int64(l.BlockNumber)), + Hash: l.BlockHash, + Address: address, PreloadedTransactions: []*PreloadedTransaction{{ ID: txID, Type: logType, @@ -523,67 +533,64 @@ func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, fro headers := []*DBHeader{} ctx := context.Background() var err error - for _, address := range d.accounts { - outbound := []types.Log{} - var inboundOrMixed []types.Log // inbound ERC20 or outbound ERC1155 share the same signature for our purposes - if !d.incomingOnly { - outbound, err = d.client.FilterLogs(ctx, ethereum.FilterQuery{ - FromBlock: from, - ToBlock: to, - Topics: d.outboundTopics(address), - }) - if err != nil { - return nil, err - } - inboundOrMixed, err = d.client.FilterLogs(ctx, ethereum.FilterQuery{ - FromBlock: from, - ToBlock: to, - Topics: d.inboundERC20OutboundERC1155Topics(address), - }) - if err != nil { - return nil, err - } - } else { - inboundOrMixed, err = d.client.FilterLogs(ctx, ethereum.FilterQuery{ - FromBlock: from, - ToBlock: to, - Topics: d.inboundTopics(address), - }) - if err != nil { - return nil, err - } - } - - inbound1155, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ + outbound := []types.Log{} + var inboundOrMixed []types.Log // inbound ERC20 or outbound ERC1155 share the same signature for our purposes + if !d.incomingOnly { + outbound, err = d.client.FilterLogs(ctx, ethereum.FilterQuery{ FromBlock: from, ToBlock: to, - Topics: d.inboundTopicsERC1155(address), + Topics: d.outboundTopics(d.accounts), }) if err != nil { return nil, err } - - logs := concatLogs(outbound, inboundOrMixed, inbound1155) - - if len(logs) == 0 { - log.Debug("no logs found for account") - continue - } - - rst, err := d.blocksFromLogs(parent, logs, address) + inboundOrMixed, err = d.client.FilterLogs(ctx, ethereum.FilterQuery{ + FromBlock: from, + ToBlock: to, + Topics: d.inboundERC20OutboundERC1155Topics(d.accounts), + }) if err != nil { return nil, err } - if len(rst) == 0 { - log.Warn("no headers found in logs for account", "chainID", d.client.NetworkID(), "address", address, "from", from, "to", to) - continue - } else { - headers = append(headers, rst...) - log.Debug("found erc20 transfers for account", "chainID", d.client.NetworkID(), "address", address, - "from", from, "to", to, "headers", len(headers)) + } else { + inboundOrMixed, err = d.client.FilterLogs(ctx, ethereum.FilterQuery{ + FromBlock: from, + ToBlock: to, + Topics: d.inboundTopics(d.accounts), + }) + if err != nil { + return nil, err } } + inbound1155, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ + FromBlock: from, + ToBlock: to, + Topics: d.inboundTopicsERC1155(d.accounts), + }) + if err != nil { + return nil, err + } + + logs := concatLogs(outbound, inboundOrMixed, inbound1155) + + if len(logs) == 0 { + log.Debug("no logs found for account") + return nil, nil + } + + rst, err := d.blocksFromLogs(parent, logs) + if err != nil { + return nil, err + } + if len(rst) == 0 { + log.Warn("no headers found in logs for account", "chainID", d.client.NetworkID(), "addresses", d.accounts, "from", from, "to", to) + } else { + headers = append(headers, rst...) + log.Debug("found erc20 transfers for account", "chainID", d.client.NetworkID(), "addresses", d.accounts, + "from", from, "to", to, "headers", len(headers)) + } + log.Debug("get erc20 transfers in range end", "chainID", d.client.NetworkID(), "from", from, "to", to, "headers", len(headers), "took", time.Since(start)) return headers, nil diff --git a/services/wallet/transfer/iterative.go b/services/wallet/transfer/iterative.go index 8b2c14dee..580a65a7b 100644 --- a/services/wallet/transfer/iterative.go +++ b/services/wallet/transfer/iterative.go @@ -5,20 +5,18 @@ import ( "errors" "math/big" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" ) // SetupIterativeDownloader configures IterativeDownloader with last known synced block. func SetupIterativeDownloader( - client HeaderReader, address common.Address, - downloader BatchDownloader, size *big.Int, to *big.Int, from *big.Int) (*IterativeDownloader, error) { + client HeaderReader, downloader BatchDownloader, size *big.Int, to *big.Int, from *big.Int) (*IterativeDownloader, error) { if to == nil || from == nil { return nil, errors.New("to or from cannot be nil") } - log.Debug("iterative downloader", "address", address, "from", from, "to", to, "size", size) + log.Debug("iterative downloader", "from", from, "to", to, "size", size) d := &IterativeDownloader{ client: client, batchSize: size, diff --git a/services/wallet/transfer/sequential_fetch_strategy.go b/services/wallet/transfer/sequential_fetch_strategy.go index 554097c56..1d5d895d7 100644 --- a/services/wallet/transfer/sequential_fetch_strategy.go +++ b/services/wallet/transfer/sequential_fetch_strategy.go @@ -60,9 +60,9 @@ type SequentialFetchStrategy struct { } func (s *SequentialFetchStrategy) newCommand(chainClient chain.ClientInterface, - account common.Address) async.Commander { + accounts []common.Address) async.Commander { - return newLoadBlocksAndTransfersCommand(account, s.db, s.accountsDB, s.blockDAO, s.blockRangesSeqDAO, chainClient, s.feed, + return newLoadBlocksAndTransfersCommand(accounts, s.db, s.accountsDB, s.blockDAO, s.blockRangesSeqDAO, chainClient, s.feed, s.transactionManager, s.pendingTxManager, s.tokenManager, s.balanceCacher, s.omitHistory) } @@ -83,10 +83,8 @@ func (s *SequentialFetchStrategy) start() error { } for _, chainClient := range s.chainClients { - for _, address := range s.accounts { - ctl := s.newCommand(chainClient, address) - s.group.Add(ctl.Command()) - } + ctl := s.newCommand(chainClient, s.accounts) + s.group.Add(ctl.Command()) } return nil diff --git a/services/wallet/transfer/testutils.go b/services/wallet/transfer/testutils.go index 72d73cdc6..452fbdd69 100644 --- a/services/wallet/transfer/testutils.go +++ b/services/wallet/transfer/testutils.go @@ -11,7 +11,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/status-im/status-go/services/wallet/common" - w_common "github.com/status-im/status-go/services/wallet/common" "github.com/status-im/status-go/services/wallet/testutils" "github.com/status-im/status-go/services/wallet/token" @@ -325,7 +324,7 @@ func InsertTestTransferWithOptions(tb testing.TB, db *sql.DB, address eth_common blockHash: blkHash, blockNumber: big.NewInt(tr.BlkNumber), sender: tr.From, - transferType: w_common.Type(tokenType), + transferType: common.Type(tokenType), timestamp: uint64(tr.Timestamp), multiTransactionID: tr.MultiTransactionID, baseGasFees: "0x0", @@ -375,3 +374,8 @@ func InsertTestMultiTransaction(tb testing.TB, db *sql.DB, tr *TestMultiTransact tr.MultiTransactionID = MultiTransactionIDType(rowID) return tr.MultiTransactionID } + +// For using in tests only outside the package +func SaveTransfersMarkBlocksLoaded(database *Database, chainID uint64, address eth_common.Address, transfers []Transfer, blocks []*big.Int) error { + return saveTransfersMarkBlocksLoaded(database.client, chainID, address, transfers, blocks) +} diff --git a/walletdatabase/migrations/bindata.go b/walletdatabase/migrations/bindata.go index 034befd78..a8d0cc841 100644 --- a/walletdatabase/migrations/bindata.go +++ b/walletdatabase/migrations/bindata.go @@ -12,6 +12,7 @@ // 1698257443_add_community_metadata_to_wallet_db.up.sql (323B) // 1699987075_add_timestamp_and_state_to_community_data_cache.up.sql (865B) // 1700414564_add_wallet_connect_pairings_table.up.sql (439B) +// 1701101493_add_token_blocks_range.up.sql (469B) // doc.go (74B) package migrations @@ -96,7 +97,7 @@ func _1691753758_initialUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1691753758_initial.up.sql", size: 5738, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)} + info := bindataFileInfo{name: "1691753758_initial.up.sql", size: 5738, mode: os.FileMode(0644), modTime: time.Unix(1701079799, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x6b, 0x25, 0x31, 0xc8, 0x27, 0x3, 0x6b, 0x9f, 0x15, 0x42, 0x2f, 0x85, 0xfb, 0xe3, 0x6, 0xea, 0xf7, 0x97, 0x12, 0x56, 0x3c, 0x9a, 0x5b, 0x1a, 0xca, 0xb1, 0x23, 0xfa, 0xcd, 0x57, 0x25, 0x5c}} return a, nil } @@ -116,7 +117,7 @@ func _1692701329_add_collectibles_and_collections_data_cacheUpSql() (*asset, err return nil, err } - info := bindataFileInfo{name: "1692701329_add_collectibles_and_collections_data_cache.up.sql", size: 1808, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)} + info := bindataFileInfo{name: "1692701329_add_collectibles_and_collections_data_cache.up.sql", size: 1808, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x1, 0x51, 0xf4, 0x2b, 0x92, 0xde, 0x59, 0x65, 0xd8, 0x9b, 0x57, 0xe0, 0xfd, 0x7b, 0x12, 0xb, 0x29, 0x6e, 0x9d, 0xb5, 0x90, 0xe, 0xfa, 0x12, 0x97, 0xd, 0x61, 0x60, 0x7f, 0x32, 0x1d, 0xc3}} return a, nil } @@ -136,7 +137,7 @@ func _1692701339_add_scope_to_pendingUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1692701339_add_scope_to_pending.up.sql", size: 576, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)} + info := bindataFileInfo{name: "1692701339_add_scope_to_pending.up.sql", size: 576, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x36, 0x8a, 0x5e, 0xe2, 0x63, 0x15, 0x37, 0xba, 0x55, 0x18, 0xf3, 0xcc, 0xe0, 0x5, 0x84, 0xe1, 0x5b, 0xe8, 0x1, 0x32, 0x6b, 0x9f, 0x7d, 0x9f, 0xd9, 0x23, 0x6c, 0xa9, 0xb5, 0xdc, 0xf4, 0x93}} return a, nil } @@ -156,7 +157,7 @@ func _1694540071_add_collectibles_ownership_update_timestampUpSql() (*asset, err return nil, err } - info := bindataFileInfo{name: "1694540071_add_collectibles_ownership_update_timestamp.up.sql", size: 349, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)} + info := bindataFileInfo{name: "1694540071_add_collectibles_ownership_update_timestamp.up.sql", size: 349, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x7f, 0x45, 0xc7, 0xce, 0x79, 0x63, 0xbc, 0x6f, 0x83, 0x5f, 0xe2, 0x3, 0x56, 0xcc, 0x5, 0x2f, 0x85, 0xda, 0x7e, 0xea, 0xf5, 0xd2, 0xac, 0x19, 0xd4, 0xd8, 0x5e, 0xdd, 0xed, 0xe2, 0xa9, 0x97}} return a, nil } @@ -176,7 +177,7 @@ func _1694692748_add_raw_balance_to_token_balancesUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1694692748_add_raw_balance_to_token_balances.up.sql", size: 165, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)} + info := bindataFileInfo{name: "1694692748_add_raw_balance_to_token_balances.up.sql", size: 165, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xd4, 0xe0, 0x5b, 0x42, 0xf0, 0x96, 0xa5, 0xf5, 0xed, 0xc0, 0x97, 0x88, 0xb0, 0x6d, 0xfe, 0x7d, 0x97, 0x2e, 0x17, 0xd2, 0x16, 0xbc, 0x2a, 0xf2, 0xcc, 0x67, 0x9e, 0xc5, 0x47, 0xf6, 0x69, 0x1}} return a, nil } @@ -196,7 +197,7 @@ func _1695133989_add_community_id_to_collectibles_and_collections_data_cacheUpSq return nil, err } - info := bindataFileInfo{name: "1695133989_add_community_id_to_collectibles_and_collections_data_cache.up.sql", size: 275, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)} + info := bindataFileInfo{name: "1695133989_add_community_id_to_collectibles_and_collections_data_cache.up.sql", size: 275, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xfa, 0x2, 0xa, 0x7f, 0x4b, 0xd1, 0x3, 0xd0, 0x3, 0x29, 0x84, 0x31, 0xed, 0x49, 0x4f, 0xb1, 0x2d, 0xd7, 0x80, 0x41, 0x5b, 0xfa, 0x6, 0xae, 0xb4, 0xf6, 0x6b, 0x49, 0xee, 0x57, 0x33, 0x76}} return a, nil } @@ -216,7 +217,7 @@ func _1695932536_balance_history_v2UpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1695932536_balance_history_v2.up.sql", size: 653, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)} + info := bindataFileInfo{name: "1695932536_balance_history_v2.up.sql", size: 653, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x37, 0xf4, 0x14, 0x91, 0xf6, 0x5f, 0xc4, 0x9b, 0xb7, 0x83, 0x32, 0x72, 0xbe, 0x82, 0x42, 0x39, 0xa4, 0x3b, 0xc9, 0x78, 0x3d, 0xca, 0xd4, 0xbf, 0xfc, 0x7a, 0x33, 0x1e, 0xcd, 0x9e, 0xe4, 0x85}} return a, nil } @@ -236,7 +237,7 @@ func _1696853635_input_dataUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1696853635_input_data.up.sql", size: 23140, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)} + info := bindataFileInfo{name: "1696853635_input_data.up.sql", size: 23140, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x89, 0x30, 0x33, 0x33, 0x55, 0xc5, 0x57, 0x2b, 0xaf, 0xef, 0x3d, 0x8d, 0x2a, 0xaa, 0x5c, 0x32, 0xd1, 0xf4, 0xd, 0x4a, 0xd0, 0x33, 0x4a, 0xe8, 0xf6, 0x8, 0x6b, 0x65, 0xcc, 0xba, 0xed, 0x42}} return a, nil } @@ -256,7 +257,7 @@ func _1698117918_add_community_id_to_tokensUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1698117918_add_community_id_to_tokens.up.sql", size: 61, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)} + info := bindataFileInfo{name: "1698117918_add_community_id_to_tokens.up.sql", size: 61, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xb3, 0x82, 0xdb, 0xde, 0x3, 0x3, 0xc, 0x67, 0xf3, 0x54, 0xc4, 0xad, 0xd6, 0xce, 0x56, 0xfb, 0xc1, 0x87, 0xd7, 0xda, 0xab, 0xec, 0x1, 0xe1, 0x7d, 0xb3, 0x63, 0xd6, 0xe5, 0x5d, 0x1c, 0x15}} return a, nil } @@ -276,7 +277,7 @@ func _1698257443_add_community_metadata_to_wallet_dbUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1698257443_add_community_metadata_to_wallet_db.up.sql", size: 323, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)} + info := bindataFileInfo{name: "1698257443_add_community_metadata_to_wallet_db.up.sql", size: 323, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x22, 0xd3, 0x4, 0x25, 0xfa, 0x23, 0x1, 0x48, 0x83, 0x26, 0x20, 0xf2, 0x3d, 0xbc, 0xc1, 0xa7, 0x7c, 0x27, 0x7c, 0x1d, 0x63, 0x3, 0xa, 0xd0, 0xce, 0x47, 0x86, 0xdc, 0xa1, 0x3c, 0x2, 0x1c}} return a, nil } @@ -296,7 +297,7 @@ func _1699987075_add_timestamp_and_state_to_community_data_cacheUpSql() (*asset, return nil, err } - info := bindataFileInfo{name: "1699987075_add_timestamp_and_state_to_community_data_cache.up.sql", size: 865, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)} + info := bindataFileInfo{name: "1699987075_add_timestamp_and_state_to_community_data_cache.up.sql", size: 865, mode: os.FileMode(0644), modTime: time.Unix(1701152469, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xc3, 0xee, 0x37, 0xf9, 0x7f, 0x9e, 0xfe, 0x93, 0x66, 0x2b, 0xd, 0x57, 0xf4, 0x89, 0x6c, 0x51, 0xfd, 0x14, 0xe9, 0xcd, 0xab, 0x65, 0xe7, 0xa7, 0x83, 0x7e, 0xe0, 0x5c, 0x14, 0x49, 0xf3, 0xe5}} return a, nil } @@ -316,11 +317,31 @@ func _1700414564_add_wallet_connect_pairings_tableUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1700414564_add_wallet_connect_pairings_table.up.sql", size: 439, mode: os.FileMode(0644), modTime: time.Unix(1700503490, 0)} + info := bindataFileInfo{name: "1700414564_add_wallet_connect_pairings_table.up.sql", size: 439, mode: os.FileMode(0644), modTime: time.Unix(1701326149, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xa9, 0x77, 0x5e, 0x19, 0x62, 0x3c, 0x3a, 0x81, 0x16, 0xa0, 0x95, 0x35, 0x62, 0xab, 0x5e, 0x2b, 0xea, 0x11, 0x71, 0x11, 0xd0, 0x9, 0xab, 0x9c, 0xab, 0xf2, 0xdd, 0x5f, 0x88, 0x83, 0x9a, 0x93}} return a, nil } +var __1701101493_add_token_blocks_rangeUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\xcf\x41\x0b\x82\x30\x18\xc6\xf1\xfb\x3e\xc5\xfb\x05\x06\xdd\xa5\xc3\x74\x2b\x84\x65\x51\xf3\x3c\xa6\xae\x10\xd7\x56\xbe\x33\xe8\xdb\x47\x0a\x5d\x8a\x40\xea\xf6\x5c\x9e\x1f\xfc\x29\x05\xd6\x34\x50\x07\x37\x9c\x3d\x12\x26\x95\xd8\x83\x62\xa9\x14\x50\xb9\x50\x77\xa8\x7b\xe3\x4f\x16\x35\xda\xeb\x60\x7d\x6c\x8d\x03\xc6\x39\x64\x5b\x59\x6e\x0a\x88\xa1\xb3\x5e\x57\xae\xd3\x18\x4d\x1f\x21\xcd\xd7\x79\xa1\x80\x8b\x15\x2b\xa5\x82\x45\xf2\x83\x78\x6c\x7b\xfc\xaf\xe8\xcc\x47\x90\x50\x0a\x59\xb8\xdc\xe1\x66\xdc\x60\x91\x94\x3b\xce\xd4\x17\xfb\x20\xd4\x5b\xf8\x12\x5e\x3b\x99\x09\x4c\x9d\x13\x30\xee\xb9\xc0\x98\x35\xfd\x9f\x33\x21\x8f\x00\x00\x00\xff\xff\x6f\x8c\xdc\x59\xd5\x01\x00\x00") + +func _1701101493_add_token_blocks_rangeUpSqlBytes() ([]byte, error) { + return bindataRead( + __1701101493_add_token_blocks_rangeUpSql, + "1701101493_add_token_blocks_range.up.sql", + ) +} + +func _1701101493_add_token_blocks_rangeUpSql() (*asset, error) { + bytes, err := _1701101493_add_token_blocks_rangeUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "1701101493_add_token_blocks_range.up.sql", size: 469, mode: os.FileMode(0644), modTime: time.Unix(1701435193, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe0, 0x37, 0xfb, 0x1a, 0x6c, 0x8c, 0xa8, 0x1e, 0xa2, 0xa5, 0x1f, 0x90, 0x73, 0x3e, 0x31, 0x5f, 0x48, 0x1e, 0x9a, 0x37, 0x27, 0x1c, 0xc, 0x67, 0x1, 0xcd, 0xec, 0x85, 0x4c, 0x1c, 0x26, 0x52}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -336,7 +357,7 @@ func docGo() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0644), modTime: time.Unix(1700316153, 0)} + info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0644), modTime: time.Unix(1699882401, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xde, 0x7c, 0x28, 0xcd, 0x47, 0xf2, 0xfa, 0x7c, 0x51, 0x2d, 0xd8, 0x38, 0xb, 0xb0, 0x34, 0x9d, 0x4c, 0x62, 0xa, 0x9e, 0x28, 0xc3, 0x31, 0x23, 0xd9, 0xbb, 0x89, 0x9f, 0xa0, 0x89, 0x1f, 0xe8}} return a, nil } @@ -456,6 +477,8 @@ var _bindata = map[string]func() (*asset, error){ "1700414564_add_wallet_connect_pairings_table.up.sql": _1700414564_add_wallet_connect_pairings_tableUpSql, + "1701101493_add_token_blocks_range.up.sql": _1701101493_add_token_blocks_rangeUpSql, + "doc.go": docGo, } @@ -463,11 +486,13 @@ var _bindata = map[string]func() (*asset, error){ // directory embedded in the file by go-bindata. // For example if you run go-bindata on data/... and data contains the // following hierarchy: -// data/ -// foo.txt -// img/ -// a.png -// b.png +// +// data/ +// foo.txt +// img/ +// a.png +// b.png +// // then AssetDir("data") would return []string{"foo.txt", "img"}, // AssetDir("data/img") would return []string{"a.png", "b.png"}, // AssetDir("foo.txt") and AssetDir("notexist") would return an error, and @@ -512,6 +537,7 @@ var _bintree = &bintree{nil, map[string]*bintree{ "1698257443_add_community_metadata_to_wallet_db.up.sql": &bintree{_1698257443_add_community_metadata_to_wallet_dbUpSql, map[string]*bintree{}}, "1699987075_add_timestamp_and_state_to_community_data_cache.up.sql": &bintree{_1699987075_add_timestamp_and_state_to_community_data_cacheUpSql, map[string]*bintree{}}, "1700414564_add_wallet_connect_pairings_table.up.sql": &bintree{_1700414564_add_wallet_connect_pairings_tableUpSql, map[string]*bintree{}}, + "1701101493_add_token_blocks_range.up.sql": &bintree{_1701101493_add_token_blocks_rangeUpSql, map[string]*bintree{}}, "doc.go": &bintree{docGo, map[string]*bintree{}}, }} diff --git a/walletdatabase/migrations/sql/1701101493_add_token_blocks_range.up.sql b/walletdatabase/migrations/sql/1701101493_add_token_blocks_range.up.sql new file mode 100644 index 000000000..30d15f14e --- /dev/null +++ b/walletdatabase/migrations/sql/1701101493_add_token_blocks_range.up.sql @@ -0,0 +1,9 @@ +-- Add columns +ALTER TABLE blocks_ranges_sequential ADD COLUMN token_blk_start BIGINT DEFAULT 0; +ALTER TABLE blocks_ranges_sequential ADD COLUMN token_blk_first BIGINT DEFAULT 0; +ALTER TABLE blocks_ranges_sequential ADD COLUMN token_blk_last BIGINT DEFAULT 0; + +-- Copy values +UPDATE blocks_ranges_sequential SET token_blk_start = blk_start; +UPDATE blocks_ranges_sequential SET token_blk_first = blk_first; +UPDATE blocks_ranges_sequential SET token_blk_last = blk_last;