package transactions import ( "context" "database/sql" "encoding/json" "errors" "fmt" "math/big" "strings" "time" "go.uber.org/zap" eth "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" ethrpc "github.com/ethereum/go-ethereum/rpc" ethTypes "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/rpc" "github.com/status-im/status-go/services/rpcfilters" "github.com/status-im/status-go/services/wallet/bigint" "github.com/status-im/status-go/services/wallet/common" wallet_common "github.com/status-im/status-go/services/wallet/common" "github.com/status-im/status-go/services/wallet/responses" "github.com/status-im/status-go/services/wallet/routeexecution/storage" "github.com/status-im/status-go/services/wallet/walletevent" ) const ( // EventPendingTransactionUpdate is emitted when a pending transaction is updated (added or deleted). Carries PendingTxUpdatePayload in message EventPendingTransactionUpdate walletevent.EventType = "pending-transaction-update" // EventPendingTransactionStatusChanged carries StatusChangedPayload in message EventPendingTransactionStatusChanged walletevent.EventType = "pending-transaction-status-changed" PendingCheckInterval = 10 * time.Second GetTransactionReceiptRPCName = "eth_getTransactionReceipt" ) var ( ErrStillPending = errors.New("transaction is still pending") ) type TxStatus = string // Values for status column in pending_transactions const ( Pending TxStatus = "Pending" Success TxStatus = "Success" Failed TxStatus = "Failed" ) type AutoDeleteType = bool const ( AutoDelete AutoDeleteType = true Keep AutoDeleteType = false ) type TxIdentity struct { ChainID common.ChainID `json:"chainId"` Hash eth.Hash `json:"hash"` } type TxDetails struct { SendDetails *responses.SendDetails `json:"sendDetails"` SentTransactions []*responses.RouterSentTransaction `json:"sentTransactions"` } type PendingTxUpdatePayload struct { TxIdentity TxDetails Deleted bool `json:"deleted"` } type StatusChangedPayload struct { TxIdentity TxDetails Status TxStatus `json:"status"` } // PendingTxTracker implements StatusService in common/status_node_service.go type PendingTxTracker struct { db *sql.DB routeExecutionStorage *storage.DB trackedTxDB *DB rpcClient rpc.ClientInterface rpcFilter *rpcfilters.Service eventFeed *event.Feed taskRunner *ConditionalRepeater logger *zap.Logger } func NewPendingTxTracker(db *sql.DB, rpcClient rpc.ClientInterface, rpcFilter *rpcfilters.Service, eventFeed *event.Feed, checkInterval time.Duration) *PendingTxTracker { tm := &PendingTxTracker{ db: db, routeExecutionStorage: storage.NewDB(db), trackedTxDB: NewDB(db), rpcClient: rpcClient, eventFeed: eventFeed, rpcFilter: rpcFilter, logger: logutils.ZapLogger().Named("PendingTxTracker"), } tm.taskRunner = NewConditionalRepeater(checkInterval, func(ctx context.Context) bool { return tm.fetchAndUpdateDB(ctx) }) return tm } type txStatusRes struct { Status TxStatus hash eth.Hash } func (tm *PendingTxTracker) fetchAndUpdateDB(ctx context.Context) bool { res := WorkNotDone txs, err := tm.GetAllPending() if err != nil { tm.logger.Error("Failed to get pending transactions", zap.Error(err)) return WorkDone } tm.logger.Debug("Checking for PT status", zap.Int("count", len(txs))) txsMap := make(map[common.ChainID][]eth.Hash) for _, tx := range txs { chainID := tx.ChainID txsMap[chainID] = append(txsMap[chainID], tx.Hash) } doneCount := 0 // Batch request for each chain for chainID, txs := range txsMap { tm.logger.Debug("Processing PTs", zap.Stringer("chainID", chainID), zap.Int("count", len(txs))) batchRes, err := fetchBatchTxStatus(ctx, tm.rpcClient, chainID, txs, tm.logger) if err != nil { tm.logger.Error("Failed to batch fetch pending transactions status for", zap.Stringer("chainID", chainID), zap.Error(err)) continue } if len(batchRes) == 0 { tm.logger.Debug("No change to PTs status", zap.Stringer("chainID", chainID)) continue } tm.logger.Debug("PTs done", zap.Stringer("chainID", chainID), zap.Int("count", len(batchRes))) doneCount += len(batchRes) updateRes, err := tm.updateDBStatus(ctx, chainID, batchRes) if err != nil { tm.logger.Error("Failed to update pending transactions status for", zap.Stringer("chainID", chainID), zap.Error(err)) continue } tm.logger.Debug("Emit notifications for PTs", zap.Stringer("chainID", chainID), zap.Int("count", len(updateRes))) tm.emitNotifications(chainID, updateRes) } if len(txs) == doneCount { res = WorkDone } tm.logger.Debug("Done PTs iteration", zap.Int("count", doneCount), zap.Bool("completed", res)) return res } type nullableReceipt struct { *types.Receipt } func (nr *nullableReceipt) UnmarshalJSON(data []byte) error { transactionNotAvailable := (string(data) == "null") if transactionNotAvailable { return nil } return json.Unmarshal(data, &nr.Receipt) } // fetchBatchTxStatus returns not pending transactions (confirmed or errored) // it excludes the still pending or errored request from the result func fetchBatchTxStatus(ctx context.Context, rpcClient rpc.ClientInterface, chainID common.ChainID, hashes []eth.Hash, logger *zap.Logger) ([]txStatusRes, error) { chainClient, err := rpcClient.AbstractEthClient(chainID) if err != nil { logger.Error("Failed to get chain client", zap.Error(err)) return nil, err } reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() batch := make([]ethrpc.BatchElem, 0, len(hashes)) for _, hash := range hashes { batch = append(batch, ethrpc.BatchElem{ Method: GetTransactionReceiptRPCName, Args: []interface{}{hash}, Result: new(nullableReceipt), }) } err = chainClient.BatchCallContext(reqCtx, batch) if err != nil { logger.Error("Transactions request fail", zap.Error(err)) return nil, err } res := make([]txStatusRes, 0, len(batch)) for i, b := range batch { err := b.Error if err != nil { logger.Error("Failed to get transaction", zap.Stringer("hash", hashes[i]), zap.Error(err)) continue } if b.Result == nil { logger.Error("Transaction not found", zap.Stringer("hash", hashes[i])) continue } receiptWrapper, ok := b.Result.(*nullableReceipt) if !ok { logger.Error("Failed to cast transaction receipt", zap.Stringer("hash", hashes[i])) continue } if receiptWrapper == nil || receiptWrapper.Receipt == nil { // the transaction is not available yet continue } receipt := receiptWrapper.Receipt isPending := receipt != nil && receipt.BlockNumber == nil if !isPending { var status TxStatus if receipt.Status == types.ReceiptStatusSuccessful { status = Success } else { status = Failed } res = append(res, txStatusRes{ hash: hashes[i], Status: status, }) } } return res, nil } // updateDBStatus returns entries that were updated only func (tm *PendingTxTracker) updateDBStatus(ctx context.Context, chainID common.ChainID, statuses []txStatusRes) ([]txStatusRes, error) { for _, br := range statuses { err := tm.trackedTxDB.UpdateTxStatus( TxIdentity{ ChainID: chainID, Hash: br.hash, }, br.Status, ) if err != nil { tm.logger.Error("Failed to update trackedTx status", zap.Stringer("hash", br.hash), zap.Error(err)) continue } } res := make([]txStatusRes, 0, len(statuses)) tx, err := tm.db.BeginTx(ctx, nil) if err != nil { return nil, fmt.Errorf("failed to begin transaction: %w", err) } updateStmt, err := tx.PrepareContext(ctx, `UPDATE pending_transactions SET status = ? WHERE network_id = ? AND hash = ?`) if err != nil { rollErr := tx.Rollback() if rollErr != nil { err = fmt.Errorf("failed to rollback transaction due to: %w", err) } return nil, fmt.Errorf("failed to prepare update statement: %w", err) } checkAutoDelStmt, err := tx.PrepareContext(ctx, `SELECT auto_delete FROM pending_transactions WHERE network_id = ? AND hash = ?`) if err != nil { rollErr := tx.Rollback() if rollErr != nil { err = fmt.Errorf("failed to rollback transaction: %w", err) } return nil, fmt.Errorf("failed to prepare auto delete statement: %w", err) } notifyFunctions := make([]func(), 0, len(statuses)) for _, br := range statuses { row := checkAutoDelStmt.QueryRowContext(ctx, chainID, br.hash) var autoDel bool err = row.Scan(&autoDel) if err != nil { if err == sql.ErrNoRows { tm.logger.Warn("Missing entry while checking for auto_delete", zap.Stringer("hash", br.hash)) } else { tm.logger.Error("Failed to retrieve auto_delete for pending transaction", zap.Stringer("hash", br.hash), zap.Error(err)) } continue } if autoDel { notifyFn, err := tm.DeleteBySQLTx(tx, chainID, br.hash) if err != nil && err != ErrStillPending { tm.logger.Error("Failed to delete pending transaction", zap.Stringer("hash", br.hash), zap.Error(err)) continue } notifyFunctions = append(notifyFunctions, notifyFn) } else { // If the entry was not deleted, update the status txStatus := br.Status res, err := updateStmt.ExecContext(ctx, txStatus, chainID, br.hash) if err != nil { tm.logger.Error("Failed to update pending transaction status", zap.Stringer("hash", br.hash), zap.Error(err)) continue } affected, err := res.RowsAffected() if err != nil { tm.logger.Error("Failed to get updated rows", zap.Stringer("hash", br.hash), zap.Error(err)) continue } if affected == 0 { tm.logger.Warn("Missing entry to update for", zap.Stringer("hash", br.hash)) continue } } res = append(res, br) } err = tx.Commit() if err != nil { return nil, fmt.Errorf("failed to commit transaction: %w", err) } for _, fn := range notifyFunctions { fn() } return res, nil } func (tm *PendingTxTracker) updateTxDetails(txDetails *TxDetails, chainID uint64, txHash ethTypes.Hash) { if txDetails == nil { txDetails = &TxDetails{} } txDetails.SendDetails = &responses.SendDetails{} routeData, err := tm.routeExecutionStorage.GetRouteDataByHash(chainID, txHash) if err != nil { tm.logger.Warn("Missing tx data ", zap.Stringer("hash", txHash), zap.Error(err)) } if routeData != nil { if routeData.RouteInputParams != nil { txDetails.SendDetails.UpdateFields(*routeData.RouteInputParams) } for _, pd := range routeData.PathsData { if pd.IsApprovalPlaced() && pd.ApprovalTxData.SentHash == txHash { txDetails.SentTransactions = append(txDetails.SentTransactions, responses.NewRouterSentTransaction( pd.ApprovalTxData.TxArgs, pd.ApprovalTxData.SentHash, true)) } if pd.IsTxPlaced() && pd.TxData.SentHash == txHash { txDetails.SentTransactions = append(txDetails.SentTransactions, responses.NewRouterSentTransaction( pd.TxData.TxArgs, pd.TxData.SentHash, false)) } } } } func (tm *PendingTxTracker) emitNotifications(chainID common.ChainID, changes []txStatusRes) { if tm.eventFeed != nil { for _, change := range changes { payload := StatusChangedPayload{ TxIdentity: TxIdentity{ ChainID: chainID, Hash: change.hash, }, Status: change.Status, } tm.updateTxDetails(&payload.TxDetails, chainID.ToUint(), ethTypes.Hash(change.hash)) jsonPayload, err := json.Marshal(payload) if err != nil { tm.logger.Error("Failed to marshal pending transaction status", zap.Stringer("hash", change.hash), zap.Error(err)) continue } tm.eventFeed.Send(walletevent.Event{ Type: EventPendingTransactionStatusChanged, ChainID: uint64(chainID), Message: string(jsonPayload), }) } } } // PendingTransaction called with autoDelete = false will keep the transaction in the database until it is confirmed by the caller using Delete func (tm *PendingTxTracker) TrackPendingTransaction(chainID common.ChainID, hash eth.Hash, from eth.Address, to eth.Address, trType PendingTrxType, autoDelete AutoDeleteType, additionalData string) error { err := tm.addPendingAndNotify(&PendingTransaction{ ChainID: chainID, Hash: hash, From: from, To: to, Timestamp: uint64(time.Now().Unix()), Type: trType, AutoDelete: &autoDelete, AdditionalData: additionalData, }) if err != nil { return err } tm.taskRunner.RunUntilDone() return nil } func (tm *PendingTxTracker) Start() error { tm.taskRunner.RunUntilDone() return nil } // APIs returns a list of new APIs. func (tm *PendingTxTracker) APIs() []ethrpc.API { return []ethrpc.API{ { Namespace: "pending", Version: "0.1.0", Service: tm, Public: true, }, } } // Protocols returns a new protocols list. In this case, there are none. func (tm *PendingTxTracker) Protocols() []p2p.Protocol { return []p2p.Protocol{} } func (tm *PendingTxTracker) Stop() error { tm.taskRunner.Stop() return nil } type PendingTrxType string const ( RegisterENS PendingTrxType = "RegisterENS" ReleaseENS PendingTrxType = "ReleaseENS" SetPubKey PendingTrxType = "SetPubKey" BuyStickerPack PendingTrxType = "BuyStickerPack" WalletTransfer PendingTrxType = "WalletTransfer" DeployCommunityToken PendingTrxType = "DeployCommunityToken" AirdropCommunityToken PendingTrxType = "AirdropCommunityToken" RemoteDestructCollectible PendingTrxType = "RemoteDestructCollectible" BurnCommunityToken PendingTrxType = "BurnCommunityToken" DeployOwnerToken PendingTrxType = "DeployOwnerToken" SetSignerPublicKey PendingTrxType = "SetSignerPublicKey" WalletConnectTransfer PendingTrxType = "WalletConnectTransfer" ) type PendingTransaction struct { Hash eth.Hash `json:"hash"` Timestamp uint64 `json:"timestamp"` Value bigint.BigInt `json:"value"` From eth.Address `json:"from"` To eth.Address `json:"to"` Data string `json:"data"` Symbol string `json:"symbol"` GasPrice bigint.BigInt `json:"gasPrice"` GasLimit bigint.BigInt `json:"gasLimit"` Type PendingTrxType `json:"type"` AdditionalData string `json:"additionalData"` ChainID common.ChainID `json:"network_id"` MultiTransactionID wallet_common.MultiTransactionIDType `json:"multi_transaction_id"` Nonce uint64 `json:"nonce"` // nil will insert the default value (Pending) in DB Status *TxStatus `json:"status,omitempty"` // nil will insert the default value (true) in DB AutoDelete *bool `json:"autoDelete,omitempty"` } const selectFromPending = `SELECT hash, timestamp, value, from_address, to_address, data, symbol, gas_price, gas_limit, type, additional_data, network_id, COALESCE(multi_transaction_id, 0), status, auto_delete, nonce FROM pending_transactions ` func rowsToTransactions(rows *sql.Rows) (transactions []*PendingTransaction, err error) { for rows.Next() { transaction := &PendingTransaction{ Value: bigint.BigInt{Int: new(big.Int)}, GasPrice: bigint.BigInt{Int: new(big.Int)}, GasLimit: bigint.BigInt{Int: new(big.Int)}, } transaction.Status = new(TxStatus) transaction.AutoDelete = new(bool) err := rows.Scan(&transaction.Hash, &transaction.Timestamp, (*bigint.SQLBigIntBytes)(transaction.Value.Int), &transaction.From, &transaction.To, &transaction.Data, &transaction.Symbol, (*bigint.SQLBigIntBytes)(transaction.GasPrice.Int), (*bigint.SQLBigIntBytes)(transaction.GasLimit.Int), &transaction.Type, &transaction.AdditionalData, &transaction.ChainID, &transaction.MultiTransactionID, transaction.Status, transaction.AutoDelete, &transaction.Nonce, ) if err != nil { return nil, err } transactions = append(transactions, transaction) } return transactions, nil } func (tm *PendingTxTracker) GetAllPending() ([]*PendingTransaction, error) { if tm.db == nil { return nil, errors.New("database is not initialized") } rows, err := tm.db.Query(selectFromPending+"WHERE status = ?", Pending) if err != nil { return nil, err } defer rows.Close() return rowsToTransactions(rows) } func (tm *PendingTxTracker) GetPendingByAddress(chainIDs []uint64, address eth.Address) ([]*PendingTransaction, error) { if len(chainIDs) == 0 { return nil, errors.New("GetPendingByAddress: at least 1 chainID is required") } inVector := strings.Repeat("?, ", len(chainIDs)-1) + "?" var parameters []interface{} for _, c := range chainIDs { parameters = append(parameters, c) } parameters = append(parameters, address) rows, err := tm.db.Query(fmt.Sprintf(selectFromPending+"WHERE network_id in (%s) AND from_address = ?", inVector), parameters...) if err != nil { return nil, err } defer rows.Close() return rowsToTransactions(rows) } // GetPendingEntry returns sql.ErrNoRows if no pending transaction is found for the given identity func (tm *PendingTxTracker) GetPendingEntry(chainID common.ChainID, hash eth.Hash) (*PendingTransaction, error) { rows, err := tm.db.Query(selectFromPending+"WHERE network_id = ? AND hash = ?", chainID, hash) if err != nil { return nil, err } defer rows.Close() trs, err := rowsToTransactions(rows) if err != nil { return nil, err } if len(trs) == 0 { return nil, sql.ErrNoRows } return trs[0], nil } func (tm *PendingTxTracker) CountPendingTxsFromNonce(chainID common.ChainID, address eth.Address, nonce uint64) (pendingTx uint64, err error) { err = tm.db.QueryRow(` SELECT COUNT(nonce) FROM pending_transactions WHERE network_id = ? AND from_address = ? AND nonce >= ?`, chainID, address, nonce). Scan(&pendingTx) return } // StoreAndTrackPendingTx store the details of a pending transaction and track it until it is mined func (tm *PendingTxTracker) StoreAndTrackPendingTx(transaction *PendingTransaction) error { err := tm.addPendingAndNotify(transaction) if err != nil { return err } tm.taskRunner.RunUntilDone() return err } func (tm *PendingTxTracker) addPending(transaction *PendingTransaction) error { err := tm.trackedTxDB.PutTx(TrackedTx{ ID: TxIdentity{ ChainID: transaction.ChainID, Hash: transaction.Hash, }, Status: Pending, Timestamp: transaction.Timestamp, }) if err != nil { return err } var notifyFn func() tx, err := tm.db.Begin() if err != nil { return err } defer func() { if err == nil { err = tx.Commit() if notifyFn != nil { notifyFn() } return } _ = tx.Rollback() }() exists := true var hash eth.Hash err = tx.QueryRow(` SELECT hash FROM pending_transactions WHERE network_id = ? AND from_address = ? AND nonce = ? `, transaction.ChainID, transaction.From, transaction.Nonce). Scan(&hash) if err != nil { if err == sql.ErrNoRows { exists = false } else { return err } } if exists { notifyFn, err = tm.DeleteBySQLTx(tx, transaction.ChainID, hash) if err != nil && err != ErrStillPending { return err } } // TODO: maybe we should think of making (network_id, from_address, nonce) as primary key instead (network_id, hash) ???? var insert *sql.Stmt insert, err = tx.Prepare(`INSERT OR REPLACE INTO pending_transactions (network_id, hash, timestamp, value, from_address, to_address, data, symbol, gas_price, gas_limit, type, additional_data, multi_transaction_id, status, auto_delete, nonce) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? , ?, ?)`) if err != nil { return err } defer insert.Close() _, err = insert.Exec( transaction.ChainID, transaction.Hash, transaction.Timestamp, (*bigint.SQLBigIntBytes)(transaction.Value.Int), transaction.From, transaction.To, transaction.Data, transaction.Symbol, (*bigint.SQLBigIntBytes)(transaction.GasPrice.Int), (*bigint.SQLBigIntBytes)(transaction.GasLimit.Int), transaction.Type, transaction.AdditionalData, transaction.MultiTransactionID, transaction.Status, transaction.AutoDelete, transaction.Nonce, ) return err } func (tm *PendingTxTracker) addPendingAndNotify(transaction *PendingTransaction) error { err := tm.addPending(transaction) // Notify listeners of new pending transaction (used in activity history) if err == nil { tm.notifyPendingTransactionListeners(PendingTxUpdatePayload{ TxIdentity: TxIdentity{ ChainID: transaction.ChainID, Hash: transaction.Hash, }, Deleted: false, }, []eth.Address{transaction.From, transaction.To}, transaction.Timestamp) } if tm.rpcFilter != nil { tm.rpcFilter.TriggerTransactionSentToUpstreamEvent(&rpcfilters.PendingTxInfo{ Hash: transaction.Hash, Type: string(transaction.Type), From: transaction.From, ChainID: uint64(transaction.ChainID), }) } return err } func (tm *PendingTxTracker) notifyPendingTransactionListeners(payload PendingTxUpdatePayload, addresses []eth.Address, timestamp uint64) { tm.updateTxDetails(&payload.TxDetails, payload.ChainID.ToUint(), ethTypes.Hash(payload.Hash)) jsonPayload, err := json.Marshal(payload) if err != nil { tm.logger.Error("Failed to marshal PendingTxUpdatePayload", zap.Stringer("hash", payload.Hash), zap.Error(err)) return } if tm.eventFeed != nil { tm.eventFeed.Send(walletevent.Event{ Type: EventPendingTransactionUpdate, ChainID: uint64(payload.ChainID), Accounts: addresses, At: int64(timestamp), Message: string(jsonPayload), }) } } // DeleteBySQLTx returns ErrStillPending if the transaction is still pending func (tm *PendingTxTracker) DeleteBySQLTx(tx *sql.Tx, chainID common.ChainID, hash eth.Hash) (notify func(), err error) { row := tx.QueryRow(`SELECT from_address, to_address, timestamp, status FROM pending_transactions WHERE network_id = ? AND hash = ?`, chainID, hash) var from, to eth.Address var timestamp uint64 var status TxStatus err = row.Scan(&from, &to, ×tamp, &status) if err != nil { return nil, err } _, err = tx.Exec(`DELETE FROM pending_transactions WHERE network_id = ? AND hash = ?`, chainID, hash) if err != nil { return nil, err } if err == nil && status == Pending { err = ErrStillPending } return func() { tm.notifyPendingTransactionListeners(PendingTxUpdatePayload{ TxIdentity: TxIdentity{ ChainID: chainID, Hash: hash, }, Deleted: true, }, []eth.Address{from, to}, timestamp) }, err } // GetOwnedPendingStatus returns sql.ErrNoRows if no pending transaction is found for the given identity func GetOwnedPendingStatus(tx *sql.Tx, chainID common.ChainID, hash eth.Hash, ownerAddress eth.Address) (txType *PendingTrxType, mTID *int64, err error) { row := tx.QueryRow(`SELECT type, multi_transaction_id FROM pending_transactions WHERE network_id = ? AND hash = ? AND from_address = ?`, chainID, hash, ownerAddress) txType = new(PendingTrxType) mTID = new(int64) err = row.Scan(txType, mTID) if err != nil { return nil, nil, err } return txType, mTID, nil } // Watch returns sql.ErrNoRows if no pending transaction is found for the given identity // tx.Status is not nill if err is nil func (tm *PendingTxTracker) Watch(ctx context.Context, chainID common.ChainID, hash eth.Hash) (*TxStatus, error) { tx, err := tm.GetPendingEntry(chainID, hash) if err != nil { return nil, err } return tx.Status, nil } // Delete returns ErrStillPending if the deleted transaction was still pending // The transactions are suppose to be deleted by the client only after they are confirmed func (tm *PendingTxTracker) Delete(ctx context.Context, chainID common.ChainID, transactionHash eth.Hash) error { tx, err := tm.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) } notifyFn, err := tm.DeleteBySQLTx(tx, chainID, transactionHash) if err != nil && err != ErrStillPending { rollErr := tx.Rollback() if rollErr != nil { return fmt.Errorf("failed to rollback transaction due to error: %w", err) } return err } commitErr := tx.Commit() if commitErr != nil { return fmt.Errorf("failed to commit transaction: %w", commitErr) } notifyFn() return err }