This commit is contained in:
Khushboo Mehta 2022-11-14 15:45:49 +01:00
parent 2341dedfba
commit 233431db8e
3 changed files with 41 additions and 37 deletions

View File

@ -68,7 +68,7 @@ const (
type PendingTransaction struct { type PendingTransaction struct {
Hash common.Hash `json:"hash"` Hash common.Hash `json:"hash"`
Timestamp uint64 `json:"timestamp"` Timestamp uint64 `json:"timestamp"`
Value bigint.BigInt `json:"value"` Value *hexutil.Big `json:"value"`
From common.Address `json:"from"` From common.Address `json:"from"`
To common.Address `json:"to"` To common.Address `json:"to"`
Data string `json:"data"` Data string `json:"data"`
@ -106,13 +106,13 @@ func (tm *TransactionManager) getAllPendings(chainIDs []uint64) ([]*PendingTrans
var transactions []*PendingTransaction var transactions []*PendingTransaction
for rows.Next() { for rows.Next() {
transaction := &PendingTransaction{ transaction := &PendingTransaction{
Value: bigint.BigInt{Int: new(big.Int)}, Value: new(hexutil.Big),
GasPrice: bigint.BigInt{Int: new(big.Int)}, GasPrice: bigint.BigInt{Int: new(big.Int)},
GasLimit: bigint.BigInt{Int: new(big.Int)}, GasLimit: bigint.BigInt{Int: new(big.Int)},
} }
err := rows.Scan(&transaction.Hash, err := rows.Scan(&transaction.Hash,
&transaction.Timestamp, &transaction.Timestamp,
(*bigint.SQLBigIntBytes)(transaction.Value.Int), &transaction.Value,
&transaction.From, &transaction.From,
&transaction.To, &transaction.To,
&transaction.Data, &transaction.Data,
@ -159,13 +159,13 @@ func (tm *TransactionManager) getPendingByAddress(chainIDs []uint64, address com
var transactions []*PendingTransaction var transactions []*PendingTransaction
for rows.Next() { for rows.Next() {
transaction := &PendingTransaction{ transaction := &PendingTransaction{
Value: bigint.BigInt{Int: new(big.Int)}, Value: new(hexutil.Big),
GasPrice: bigint.BigInt{Int: new(big.Int)}, GasPrice: bigint.BigInt{Int: new(big.Int)},
GasLimit: bigint.BigInt{Int: new(big.Int)}, GasLimit: bigint.BigInt{Int: new(big.Int)},
} }
err := rows.Scan(&transaction.Hash, err := rows.Scan(&transaction.Hash,
&transaction.Timestamp, &transaction.Timestamp,
(*bigint.SQLBigIntBytes)(transaction.Value.Int), &transaction.Value,
&transaction.From, &transaction.From,
&transaction.To, &transaction.To,
&transaction.Data, &transaction.Data,
@ -187,6 +187,7 @@ func (tm *TransactionManager) getPendingByAddress(chainIDs []uint64, address com
} }
func (tm *TransactionManager) addPending(transaction PendingTransaction) error { func (tm *TransactionManager) addPending(transaction PendingTransaction) error {
fmt.Println("addPending >>>>>>>>>>>>>>>")
insert, err := tm.db.Prepare(`INSERT OR REPLACE INTO pending_transactions insert, err := tm.db.Prepare(`INSERT OR REPLACE INTO pending_transactions
(network_id, hash, timestamp, value, from_address, to_address, (network_id, hash, timestamp, value, from_address, to_address,
data, symbol, gas_price, gas_limit, type, additional_data) data, symbol, gas_price, gas_limit, type, additional_data)
@ -199,7 +200,7 @@ func (tm *TransactionManager) addPending(transaction PendingTransaction) error {
transaction.ChainID, transaction.ChainID,
transaction.Hash, transaction.Hash,
transaction.Timestamp, transaction.Timestamp,
(*bigint.SQLBigIntBytes)(transaction.Value.Int), transaction.Value,
transaction.From, transaction.From,
transaction.To, transaction.To,
transaction.Data, transaction.Data,
@ -242,7 +243,8 @@ func (tm *TransactionManager) createMultiTransaction(ctx context.Context, multiT
if err != nil { if err != nil {
return nil, err return nil, err
} }
result, err := insert.Exec( fmt.Println("multiTransaction.FromAmount.String() = ", multiTransaction.FromAmount.String())
result, err := insert.Exec(
multiTransaction.FromAddress, multiTransaction.FromAddress,
multiTransaction.FromAsset, multiTransaction.FromAsset,
multiTransaction.FromAmount.String(), multiTransaction.FromAmount.String(),
@ -262,6 +264,7 @@ func (tm *TransactionManager) createMultiTransaction(ctx context.Context, multiT
hashes := make(map[uint64][]types.Hash) hashes := make(map[uint64][]types.Hash)
for _, tx := range data { for _, tx := range data {
fmt.Println("tx.Value() = ", tx.Value(), " >>> ", tx)
hash, err := bridges[tx.BridgeName].Send(tx, selectedAccount) hash, err := bridges[tx.BridgeName].Send(tx, selectedAccount)
if err != nil { if err != nil {
return nil, err return nil, err
@ -269,13 +272,14 @@ func (tm *TransactionManager) createMultiTransaction(ctx context.Context, multiT
err = tm.addPending(PendingTransaction{ err = tm.addPending(PendingTransaction{
Hash: common.Hash(hash), Hash: common.Hash(hash),
Timestamp: uint64(time.Now().Unix()), Timestamp: uint64(time.Now().Unix()),
Value: bigint.BigInt{tx.Value()}, Value: multiTransaction.FromAmount,
From: common.Address(tx.From()), From: common.Address(tx.From()),
To: common.Address(tx.To()), To: common.Address(tx.To()),
Data: tx.Data().String(), Data: tx.Data().String(),
Type: WalletTransfer, Type: WalletTransfer,
ChainID: tx.ChainID, ChainID: tx.ChainID,
MultiTransactionID: multiTransactionID, MultiTransactionID: multiTransactionID,
Symbol: multiTransaction.FromAsset,
}) })
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -5,11 +5,11 @@ import (
"math/big" "math/big"
"strings" "strings"
"time" "time"
"fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/chain" "github.com/status-im/status-go/services/wallet/chain"
) )
@ -67,14 +67,14 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
c.foundHeaders = headers c.foundHeaders = headers
c.resultingFrom = from c.resultingFrom = from
log.Info("eth historical downloader finished successfully", "address", c.address, "from", from, "to", c.to, "total blocks", len(headers), "time", time.Since(start)) fmt.Println("eth historical downloader finished successfully", "address", c.address, "from", from, "to", c.to, "total blocks", len(headers), "time", time.Since(start))
//err = c.db.ProcessBlocks(c.address, from, c.to, headers, ethTransfer) //err = c.db.ProcessBlocks(c.address, from, c.to, headers, ethTransfer)
if err != nil { if err != nil {
log.Error("failed to save found blocks with transfers", "error", err) fmt.Println("failed to save found blocks with transfers", "error", err)
return err return err
} }
log.Debug("eth transfers were persisted. command is closed") fmt.Println("eth transfers were persisted. command is closed")
return nil return nil
} }
@ -117,14 +117,14 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {
c.db, c.chainClient, c.address, c.db, c.chainClient, c.address,
c.erc20, getErc20BatchSize(c.chainClient.ChainID), c.to, c.from) c.erc20, getErc20BatchSize(c.chainClient.ChainID), c.to, c.from)
if err != nil { if err != nil {
log.Error("failed to setup historical downloader for erc20") fmt.Println("failed to setup historical downloader for erc20")
return err return err
} }
} }
for !c.iterator.Finished() { for !c.iterator.Finished() {
headers, _, _, err := c.iterator.Next(ctx) headers, _, _, err := c.iterator.Next(ctx)
if err != nil { if err != nil {
log.Error("failed to get next batch", "error", err) fmt.Println("failed to get next batch", "error", err)
return err return err
} }
c.foundHeaders = append(c.foundHeaders, headers...) c.foundHeaders = append(c.foundHeaders, headers...)
@ -132,11 +132,11 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {
/*err = c.db.ProcessBlocks(c.address, from, to, headers, erc20Transfer) /*err = c.db.ProcessBlocks(c.address, from, to, headers, erc20Transfer)
if err != nil { if err != nil {
c.iterator.Revert() c.iterator.Revert()
log.Error("failed to save downloaded erc20 blocks with transfers", "error", err) fmt.Println("failed to save downloaded erc20 blocks with transfers", "error", err)
return err return err
}*/ }*/
} }
log.Info("wallet historical downloader for erc20 transfers finished", "in", time.Since(start)) fmt.Println("wallet historical downloader for erc20 transfers finished", "in", time.Since(start))
return nil return nil
} }
@ -161,7 +161,7 @@ func (c *controlCommand) LoadTransfers(ctx context.Context, downloader *ETHDownl
} }
func (c *controlCommand) Run(parent context.Context) error { func (c *controlCommand) Run(parent context.Context) error {
log.Info("start control command") fmt.Println("start control command")
ctx, cancel := context.WithTimeout(parent, 3*time.Second) ctx, cancel := context.WithTimeout(parent, 3*time.Second)
head, err := c.chainClient.HeaderByNumber(ctx, nil) head, err := c.chainClient.HeaderByNumber(ctx, nil)
cancel() cancel()
@ -177,10 +177,10 @@ func (c *controlCommand) Run(parent context.Context) error {
Accounts: c.accounts, Accounts: c.accounts,
}) })
log.Info("current head is", "block number", head.Number) fmt.Println("current head is", "block number", head.Number)
lastKnownEthBlocks, accountsWithoutHistory, err := c.block.GetLastKnownBlockByAddresses(c.chainClient.ChainID, c.accounts) lastKnownEthBlocks, accountsWithoutHistory, err := c.block.GetLastKnownBlockByAddresses(c.chainClient.ChainID, c.accounts)
if err != nil { if err != nil {
log.Error("failed to load last head from database", "error", err) fmt.Println("failed to load last head from database", "error", err)
if c.NewError(err) { if c.NewError(err) {
return nil return nil
} }
@ -282,7 +282,7 @@ func (c *controlCommand) Run(parent context.Context) error {
BlockNumber: target, BlockNumber: target,
}) })
log.Info("end control command") fmt.Println("end control command")
return err return err
} }
@ -293,9 +293,9 @@ func nonArchivalNodeError(err error) bool {
func (c *controlCommand) NewError(err error) bool { func (c *controlCommand) NewError(err error) bool {
c.errorsCount++ c.errorsCount++
log.Error("controlCommand error", "error", err, "counter", c.errorsCount) fmt.Println("controlCommand error", "error", err, "counter", c.errorsCount)
if nonArchivalNodeError(err) { if nonArchivalNodeError(err) {
log.Info("Non archival node detected") fmt.Println("Non archival node detected")
c.nonArchivalRPCNode = true c.nonArchivalRPCNode = true
c.feed.Send(Event{ c.feed.Send(Event{
Type: EventNonArchivalNodeDetected, Type: EventNonArchivalNodeDetected,
@ -337,18 +337,18 @@ func (c *transfersCommand) Command() async.Command {
func (c *transfersCommand) Run(ctx context.Context) (err error) { func (c *transfersCommand) Run(ctx context.Context) (err error) {
allTransfers, err := getTransfersByBlocks(ctx, c.db, c.eth, c.address, []*big.Int{c.block}) allTransfers, err := getTransfersByBlocks(ctx, c.db, c.eth, c.address, []*big.Int{c.block})
if err != nil { if err != nil {
log.Info("getTransfersByBlocks error", "error", err) fmt.Println("getTransfersByBlocks error", "error", err)
return err return err
} }
err = c.db.SaveTranfers(c.chainClient.ChainID, c.address, allTransfers, []*big.Int{c.block}) err = c.db.SaveTranfers(c.chainClient.ChainID, c.address, allTransfers, []*big.Int{c.block})
if err != nil { if err != nil {
log.Error("SaveTranfers error", "error", err) fmt.Println("SaveTranfers error", "error", err)
return err return err
} }
c.fetchedTransfers = allTransfers c.fetchedTransfers = allTransfers
log.Debug("transfers loaded", "address", c.address, "len", len(allTransfers)) fmt.Println("transfers loaded", "address", c.address, "len", len(allTransfers))
return nil return nil
} }
@ -409,7 +409,7 @@ func (c *findAndCheckBlockRangeCommand) Command() async.Command {
} }
func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error) { func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error) {
log.Debug("start findAndCHeckBlockRangeCommand") fmt.Println("start findAndCHeckBlockRangeCommand")
newFromByAddress, ethHeadersByAddress, err := c.fastIndex(parent, c.balanceCache, c.fromByAddress, c.toByAddress) newFromByAddress, ethHeadersByAddress, err := c.fastIndex(parent, c.balanceCache, c.fromByAddress, c.toByAddress)
if err != nil { if err != nil {
c.error = err c.error = err
@ -460,7 +460,7 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error)
} }
lastBlockNumber := c.toByAddress[address] lastBlockNumber := c.toByAddress[address]
log.Debug("saving headers", "len", len(uniqHeaders), "lastBlockNumber", lastBlockNumber, "balance", c.balanceCache.ReadCachedBalance(address, lastBlockNumber), "nonce", c.balanceCache.ReadCachedNonce(address, lastBlockNumber)) fmt.Println("saving headers", "len", len(uniqHeaders), "lastBlockNumber", lastBlockNumber, "balance", c.balanceCache.ReadCachedBalance(address, lastBlockNumber), "nonce", c.balanceCache.ReadCachedNonce(address, lastBlockNumber))
to := &LastKnownBlock{ to := &LastKnownBlock{
Number: lastBlockNumber, Number: lastBlockNumber,
Balance: c.balanceCache.ReadCachedBalance(address, lastBlockNumber), Balance: c.balanceCache.ReadCachedBalance(address, lastBlockNumber),
@ -517,7 +517,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCache *b
resultingFromByAddress[command.address] = command.resultingFrom resultingFromByAddress[command.address] = command.resultingFrom
headers[command.address] = command.foundHeaders headers[command.address] = command.foundHeaders
} }
log.Info("fast indexer finished", "in", time.Since(start)) fmt.Println("fast indexer finished", "in", time.Since(start))
return resultingFromByAddress, headers, nil return resultingFromByAddress, headers, nil
} }
} }
@ -551,7 +551,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, from
for _, command := range commands { for _, command := range commands {
headres[command.address] = command.foundHeaders headres[command.address] = command.foundHeaders
} }
log.Info("fast indexer Erc20 finished", "in", time.Since(start)) fmt.Println("fast indexer Erc20 finished", "in", time.Since(start))
return headres, nil return headres, nil
} }
} }
@ -603,7 +603,7 @@ func loadTransfers(ctx context.Context, accounts []common.Address, block *Block,
transfersByAddress[command.address] = append(transfers, transfer) transfersByAddress[command.address] = append(transfers, transfer)
} }
} }
log.Info("loadTransfers finished", "in", time.Since(start)) fmt.Println("loadTransfers finished", "in", time.Since(start))
return transfersByAddress, nil return transfersByAddress, nil
} }
} }
@ -631,7 +631,7 @@ func findFirstRange(c context.Context, account common.Address, initialTo *big.In
} }
firstNonce, err := client.NonceAt(c, account, to) firstNonce, err := client.NonceAt(c, account, to)
log.Info("find range with 20 <= len(tx) <= 25", "account", account, "firstNonce", firstNonce, "from", from, "to", to) fmt.Println("find range with 20 <= len(tx) <= 25", "account", account, "firstNonce", firstNonce, "from", from, "to", to)
if err != nil { if err != nil {
return nil, err return nil, err
@ -664,15 +664,15 @@ func findFirstRange(c context.Context, account common.Address, initialTo *big.In
} }
nonceDiff = firstNonce - fromNonce nonceDiff = firstNonce - fromNonce
log.Info("next nonce", "from", from, "n", fromNonce, "diff", firstNonce-fromNonce) fmt.Println("next nonce", "from", from, "n", fromNonce, "diff", firstNonce-fromNonce)
if goal <= nonceDiff && nonceDiff <= (goal+5) { if goal <= nonceDiff && nonceDiff <= (goal+5) {
log.Info("range found", "account", account, "from", from, "to", to) fmt.Println("range found", "account", account, "from", from, "to", to)
return from, nil return from, nil
} }
} }
log.Info("range found", "account", account, "from", from, "to", to) fmt.Println("range found", "account", account, "from", from, "to", to)
return from, nil return from, nil
} }
@ -700,7 +700,7 @@ func getTransfersByBlocks(ctx context.Context, db *Database, downloader *ETHDown
if err != nil { if err != nil {
return nil, err return nil, err
} }
log.Debug("loadTransfers", "block", block, "new transfers", len(transfers)) fmt.Println("loadTransfers", "block", block, "new transfers", len(transfers))
if len(transfers) > 0 { if len(transfers) > 0 {
allTransfers = append(allTransfers, transfers...) allTransfers = append(allTransfers, transfers...)
} }

View File

@ -19,7 +19,7 @@ const (
// EventFetchingHistoryError emitted when fetching of tx history failed // EventFetchingHistoryError emitted when fetching of tx history failed
EventFetchingHistoryError EventType = "fetching-history-error" EventFetchingHistoryError EventType = "fetching-history-error"
// EventNonArchivalNodeDetected emitted when a connection to a non archival node is detected // EventNonArchivalNodeDetected emitted when a connection to a non archival node is detected
EventNonArchivalNodeDetected EventType = "non-archival-node-detected" EventNonArchivalNodeDetected EventType = "non-archival-node-detected"
) )
// Event is a type for transfer events. // Event is a type for transfer events.