610 lines
18 KiB
Go
610 lines
18 KiB
Go
package transactions
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"math/big"
|
|
"strings"
|
|
"time"
|
|
|
|
eth "github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/p2p"
|
|
ethrpc "github.com/ethereum/go-ethereum/rpc"
|
|
|
|
"github.com/status-im/status-go/rpc"
|
|
"github.com/status-im/status-go/services/rpcfilters"
|
|
"github.com/status-im/status-go/services/wallet/bigint"
|
|
"github.com/status-im/status-go/services/wallet/common"
|
|
"github.com/status-im/status-go/services/wallet/walletevent"
|
|
)
|
|
|
|
const (
|
|
// PendingTransactionUpdate is emitted when a pending transaction is updated (added or deleted)
|
|
EventPendingTransactionUpdate walletevent.EventType = "pending-transaction-update"
|
|
// Caries StatusChangedPayload in message
|
|
EventPendingTransactionStatusChanged walletevent.EventType = "pending-transaction-status-changed"
|
|
|
|
PendingCheckInterval = 10 * time.Second
|
|
)
|
|
|
|
// ErrStillPending is reported by DeleteBySQLTx and Delete when the removed
// entry still had the Pending status at the time it was deleted.
var ErrStillPending = errors.New("transaction is still pending")
|
|
|
|
// TxStatus is the textual status stored for a tracked transaction. It is an
// alias of string, so values convert freely to and from plain strings.
type TxStatus = string

// Values for the status column in pending_transactions.
const (
	// Pending is the status GetAllPending filters on when selecting the
	// transactions to poll.
	Pending TxStatus = "Pending"
	// Done is the status written by updateDBStatus for entries that are kept
	// (not auto-deleted) once they are no longer pending.
	Done TxStatus = "Done"
)
|
|
|
|
// AutoDeleteType is an alias of bool that selects what updateDBStatus does
// with an entry once it is no longer pending.
type AutoDeleteType = bool

const (
	// AutoDelete makes the tracker delete the entry once it is no longer pending.
	AutoDelete AutoDeleteType = true
	// Keep leaves the entry in the table (with its status updated) until the
	// caller removes it with Delete.
	Keep AutoDeleteType = false
)
|
|
|
|
type StatusChangedPayload struct {
|
|
ChainID common.ChainID `json:"chainId"`
|
|
Hash eth.Hash `json:"hash"`
|
|
Status *TxStatus `json:"status,omitempty"`
|
|
}
|
|
|
|
type PendingTxTracker struct {
|
|
db *sql.DB
|
|
rpcClient rpc.ClientInterface
|
|
|
|
rpcFilter *rpcfilters.Service
|
|
eventFeed *event.Feed
|
|
|
|
taskRunner *ConditionalRepeater
|
|
log log.Logger
|
|
}
|
|
|
|
func NewPendingTxTracker(db *sql.DB, rpcClient rpc.ClientInterface, rpcFilter *rpcfilters.Service, eventFeed *event.Feed, checkInterval time.Duration) *PendingTxTracker {
|
|
tm := &PendingTxTracker{
|
|
db: db,
|
|
rpcClient: rpcClient,
|
|
eventFeed: eventFeed,
|
|
rpcFilter: rpcFilter,
|
|
log: log.New("package", "status-go/transactions.PendingTxTracker"),
|
|
}
|
|
tm.taskRunner = NewConditionalRepeater(checkInterval, func(ctx context.Context) bool {
|
|
return tm.fetchAndUpdateDB(ctx)
|
|
})
|
|
return tm
|
|
}
|
|
|
|
type txStatusRes struct {
|
|
// TODO - 11861: propagate real status
|
|
Status TxStatus
|
|
hash eth.Hash
|
|
}
|
|
|
|
func (tm *PendingTxTracker) fetchAndUpdateDB(ctx context.Context) bool {
|
|
res := WorkNotDone
|
|
|
|
txs, err := tm.GetAllPending()
|
|
if err != nil {
|
|
tm.log.Error("Failed to get pending transactions", "error", err)
|
|
return WorkDone
|
|
}
|
|
tm.log.Debug("Checking for PT status", "count", len(txs))
|
|
|
|
txsMap := make(map[common.ChainID][]eth.Hash)
|
|
for _, tx := range txs {
|
|
chainID := tx.ChainID
|
|
txsMap[chainID] = append(txsMap[chainID], tx.Hash)
|
|
}
|
|
|
|
doneCount := 0
|
|
// Batch request for each chain
|
|
for chainID, txs := range txsMap {
|
|
tm.log.Debug("Processing PTs", "chainID", chainID, "count", len(txs))
|
|
batchRes, err := fetchBatchTxStatus(ctx, tm.rpcClient, chainID, txs, tm.log)
|
|
if err != nil {
|
|
tm.log.Error("Failed to batch fetch pending transactions status for", "chainID", chainID, "error", err)
|
|
continue
|
|
}
|
|
if len(batchRes) == 0 {
|
|
tm.log.Debug("No change to PTs status", "chainID", chainID)
|
|
continue
|
|
}
|
|
tm.log.Debug("PTs done", "chainID", chainID, "count", len(batchRes))
|
|
doneCount += len(batchRes)
|
|
|
|
updateRes, err := tm.updateDBStatus(ctx, chainID, batchRes)
|
|
if err != nil {
|
|
tm.log.Error("Failed to update pending transactions status for", "chainID", chainID, "error", err)
|
|
continue
|
|
}
|
|
|
|
tm.log.Debug("Emit notifications for PTs", "chainID", chainID, "count", len(updateRes))
|
|
tm.emitNotifications(chainID, updateRes)
|
|
}
|
|
|
|
if len(txs) == doneCount {
|
|
res = WorkDone
|
|
}
|
|
|
|
tm.log.Debug("Done PTs iteration", "count", doneCount, "completed", res)
|
|
|
|
return res
|
|
}
|
|
|
|
// fetchBatchTxStatus returns not pending transactions (confirmed or errored)
|
|
// it excludes the still pending or errored request from the result
|
|
func fetchBatchTxStatus(ctx context.Context, rpcClient rpc.ClientInterface, chainID common.ChainID, hashes []eth.Hash, log log.Logger) ([]txStatusRes, error) {
|
|
chainClient, err := rpcClient.AbstractEthClient(chainID)
|
|
if err != nil {
|
|
log.Error("Failed to get chain client", "error", err)
|
|
return nil, err
|
|
}
|
|
|
|
reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
|
defer cancel()
|
|
|
|
batch := make([]ethrpc.BatchElem, 0, len(hashes))
|
|
for _, hash := range hashes {
|
|
jsonRes := make(map[string]interface{})
|
|
batch = append(batch, ethrpc.BatchElem{
|
|
Method: "eth_getTransactionByHash",
|
|
Args: []interface{}{hash},
|
|
Result: &jsonRes,
|
|
})
|
|
}
|
|
|
|
err = chainClient.BatchCallContext(reqCtx, batch)
|
|
if err != nil {
|
|
log.Error("Transactions request fail", "error", err)
|
|
return nil, err
|
|
}
|
|
|
|
res := make([]txStatusRes, 0, len(batch))
|
|
for i, b := range batch {
|
|
isPending := true
|
|
err := b.Error
|
|
if err != nil {
|
|
log.Error("Failed to get transaction", "error", err, "hash", hashes[i])
|
|
continue
|
|
} else {
|
|
jsonRes := *(b.Result.(*map[string]interface{}))
|
|
if jsonRes != nil {
|
|
if blNo, ok := jsonRes["blockNumber"]; ok {
|
|
isPending = blNo == nil
|
|
}
|
|
}
|
|
}
|
|
|
|
if !isPending {
|
|
res = append(res, txStatusRes{
|
|
hash: hashes[i],
|
|
})
|
|
}
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
// updateDBStatus returns entries that were updated only
|
|
func (tm *PendingTxTracker) updateDBStatus(ctx context.Context, chainID common.ChainID, statuses []txStatusRes) ([]eth.Hash, error) {
|
|
res := make([]eth.Hash, 0, len(statuses))
|
|
tx, err := tm.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to begin transaction: %w", err)
|
|
}
|
|
|
|
updateStmt, err := tx.PrepareContext(ctx, `UPDATE pending_transactions SET status = ? WHERE network_id = ? AND hash = ?`)
|
|
if err != nil {
|
|
rollErr := tx.Rollback()
|
|
if rollErr != nil {
|
|
err = fmt.Errorf("failed to rollback transaction due to: %w", err)
|
|
}
|
|
return nil, fmt.Errorf("failed to prepare update statement: %w", err)
|
|
}
|
|
|
|
checkAutoDelStmt, err := tx.PrepareContext(ctx, `SELECT auto_delete FROM pending_transactions WHERE network_id = ? AND hash = ?`)
|
|
if err != nil {
|
|
rollErr := tx.Rollback()
|
|
if rollErr != nil {
|
|
err = fmt.Errorf("failed to rollback transaction: %w", err)
|
|
}
|
|
return nil, fmt.Errorf("failed to prepare auto delete statement: %w", err)
|
|
}
|
|
|
|
notifyFunctions := make([]func(), 0, len(statuses))
|
|
for _, br := range statuses {
|
|
row := checkAutoDelStmt.QueryRowContext(ctx, chainID, br.hash)
|
|
var autoDel bool
|
|
err = row.Scan(&autoDel)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
tm.log.Warn("Missing entry while checking for auto_delete", "hash", br.hash)
|
|
} else {
|
|
tm.log.Error("Failed to retrieve auto_delete for pending transaction", "error", err, "hash", br.hash)
|
|
}
|
|
continue
|
|
}
|
|
|
|
if autoDel {
|
|
notifyFn, err := tm.DeleteBySQLTx(tx, chainID, br.hash)
|
|
if err != nil && err != ErrStillPending {
|
|
tm.log.Error("Failed to delete pending transaction", "error", err, "hash", br.hash)
|
|
continue
|
|
}
|
|
notifyFunctions = append(notifyFunctions, notifyFn)
|
|
} else {
|
|
// If the entry was not deleted, update the status
|
|
// TODO - #11861: fix status - `br.status`
|
|
txStatus := Done
|
|
|
|
res, err := updateStmt.ExecContext(ctx, txStatus, chainID, br.hash)
|
|
if err != nil {
|
|
tm.log.Error("Failed to update pending transaction status", "error", err, "hash", br.hash)
|
|
continue
|
|
}
|
|
affected, err := res.RowsAffected()
|
|
if err != nil {
|
|
tm.log.Error("Failed to get updated rows", "error", err, "hash", br.hash)
|
|
continue
|
|
}
|
|
|
|
if affected == 0 {
|
|
tm.log.Warn("Missing entry to update for", "hash", br.hash)
|
|
continue
|
|
}
|
|
}
|
|
|
|
res = append(res, br.hash)
|
|
}
|
|
|
|
err = tx.Commit()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to commit transaction: %w", err)
|
|
}
|
|
|
|
for _, fn := range notifyFunctions {
|
|
fn()
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (tm *PendingTxTracker) emitNotifications(chainID common.ChainID, changes []eth.Hash) {
|
|
if tm.eventFeed != nil {
|
|
for _, hash := range changes {
|
|
status := StatusChangedPayload{
|
|
ChainID: chainID,
|
|
Hash: hash,
|
|
// TODO - #11861: status
|
|
}
|
|
|
|
jsonPayload, err := json.Marshal(status)
|
|
if err != nil {
|
|
tm.log.Error("Failed to marshal pending transaction status", "error", err, "hash", hash)
|
|
continue
|
|
}
|
|
tm.eventFeed.Send(walletevent.Event{
|
|
Type: EventPendingTransactionStatusChanged,
|
|
ChainID: uint64(chainID),
|
|
Message: string(jsonPayload),
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// PendingTransaction called with autoDelete = false will keep the transaction in the database until it is confirmed by the caller using Delete
|
|
func (tm *PendingTxTracker) TrackPendingTransaction(chainID common.ChainID, hash eth.Hash, from eth.Address, trType PendingTrxType, autoDelete AutoDeleteType) error {
|
|
err := tm.addPending(&PendingTransaction{
|
|
ChainID: chainID,
|
|
Hash: hash,
|
|
From: from,
|
|
Timestamp: uint64(time.Now().Unix()),
|
|
Type: trType,
|
|
AutoDelete: &autoDelete,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
tm.taskRunner.RunUntilDone()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (tm *PendingTxTracker) Start() error {
|
|
tm.taskRunner.RunUntilDone()
|
|
return nil
|
|
}
|
|
|
|
// APIs returns a list of new APIs.
|
|
func (tm *PendingTxTracker) APIs() []ethrpc.API {
|
|
return []ethrpc.API{
|
|
{
|
|
Namespace: "pending",
|
|
Version: "0.1.0",
|
|
Service: tm,
|
|
Public: true,
|
|
},
|
|
}
|
|
}
|
|
|
|
// Protocols returns a new protocols list. In this case, there are none.
|
|
func (tm *PendingTxTracker) Protocols() []p2p.Protocol {
|
|
return []p2p.Protocol{}
|
|
}
|
|
|
|
func (tm *PendingTxTracker) Stop() error {
|
|
tm.taskRunner.Stop()
|
|
return nil
|
|
}
|
|
|
|
// PendingTrxType names the kind of operation a pending transaction performs.
// It is stored in the type column of pending_transactions.
type PendingTrxType string

// Known pending transaction types.
const (
	// ENS related transactions.
	RegisterENS PendingTrxType = "RegisterENS"
	ReleaseENS  PendingTrxType = "ReleaseENS"
	SetPubKey   PendingTrxType = "SetPubKey"

	BuyStickerPack PendingTrxType = "BuyStickerPack"
	WalletTransfer PendingTrxType = "WalletTransfer"

	// Community token and collectible related transactions.
	DeployCommunityToken      PendingTrxType = "DeployCommunityToken"
	AirdropCommunityToken     PendingTrxType = "AirdropCommunityToken"
	RemoteDestructCollectible PendingTrxType = "RemoteDestructCollectible"
	BurnCommunityToken        PendingTrxType = "BurnCommunityToken"
	DeployOwnerToken          PendingTrxType = "DeployOwnerToken"
)
|
|
|
|
type PendingTransaction struct {
|
|
Hash eth.Hash `json:"hash"`
|
|
Timestamp uint64 `json:"timestamp"`
|
|
Value bigint.BigInt `json:"value"`
|
|
From eth.Address `json:"from"`
|
|
To eth.Address `json:"to"`
|
|
Data string `json:"data"`
|
|
Symbol string `json:"symbol"`
|
|
GasPrice bigint.BigInt `json:"gasPrice"`
|
|
GasLimit bigint.BigInt `json:"gasLimit"`
|
|
Type PendingTrxType `json:"type"`
|
|
AdditionalData string `json:"additionalData"`
|
|
ChainID common.ChainID `json:"network_id"`
|
|
MultiTransactionID int64 `json:"multi_transaction_id"`
|
|
|
|
// nil will insert the default value (Pending) in DB
|
|
Status *TxStatus `json:"status,omitempty"`
|
|
// nil will insert the default value (true) in DB
|
|
AutoDelete *bool `json:"autoDelete,omitempty"`
|
|
}
|
|
|
|
// selectFromPending is the common SELECT prefix for every query that reads
// pending_transactions. Its column order must match the Scan call in
// rowsToTransactions. The literal ends with whitespace so callers can append
// a WHERE clause directly.
const selectFromPending = `SELECT hash, timestamp, value, from_address, to_address, data,
								symbol, gas_price, gas_limit, type, additional_data,
								network_id, COALESCE(multi_transaction_id, 0), status, auto_delete
						FROM pending_transactions
						`
|
|
|
|
func rowsToTransactions(rows *sql.Rows) (transactions []*PendingTransaction, err error) {
|
|
for rows.Next() {
|
|
transaction := &PendingTransaction{
|
|
Value: bigint.BigInt{Int: new(big.Int)},
|
|
GasPrice: bigint.BigInt{Int: new(big.Int)},
|
|
GasLimit: bigint.BigInt{Int: new(big.Int)},
|
|
}
|
|
|
|
transaction.Status = new(TxStatus)
|
|
transaction.AutoDelete = new(bool)
|
|
err := rows.Scan(&transaction.Hash,
|
|
&transaction.Timestamp,
|
|
(*bigint.SQLBigIntBytes)(transaction.Value.Int),
|
|
&transaction.From,
|
|
&transaction.To,
|
|
&transaction.Data,
|
|
&transaction.Symbol,
|
|
(*bigint.SQLBigIntBytes)(transaction.GasPrice.Int),
|
|
(*bigint.SQLBigIntBytes)(transaction.GasLimit.Int),
|
|
&transaction.Type,
|
|
&transaction.AdditionalData,
|
|
&transaction.ChainID,
|
|
&transaction.MultiTransactionID,
|
|
transaction.Status,
|
|
transaction.AutoDelete,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
transactions = append(transactions, transaction)
|
|
}
|
|
return transactions, nil
|
|
}
|
|
|
|
func (tm *PendingTxTracker) GetAllPending() ([]*PendingTransaction, error) {
|
|
rows, err := tm.db.Query(selectFromPending+"WHERE status = ?", Pending)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
return rowsToTransactions(rows)
|
|
}
|
|
|
|
func (tm *PendingTxTracker) GetPendingByAddress(chainIDs []uint64, address eth.Address) ([]*PendingTransaction, error) {
|
|
if len(chainIDs) == 0 {
|
|
return nil, errors.New("GetPendingByAddress: at least 1 chainID is required")
|
|
}
|
|
|
|
inVector := strings.Repeat("?, ", len(chainIDs)-1) + "?"
|
|
var parameters []interface{}
|
|
for _, c := range chainIDs {
|
|
parameters = append(parameters, c)
|
|
}
|
|
|
|
parameters = append(parameters, address)
|
|
|
|
rows, err := tm.db.Query(fmt.Sprintf(selectFromPending+"WHERE network_id in (%s) AND from_address = ?", inVector), parameters...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
return rowsToTransactions(rows)
|
|
}
|
|
|
|
// GetPendingEntry returns sql.ErrNoRows if no pending transaction is found for the given identity
|
|
func (tm *PendingTxTracker) GetPendingEntry(chainID common.ChainID, hash eth.Hash) (*PendingTransaction, error) {
|
|
rows, err := tm.db.Query(selectFromPending+"WHERE network_id = ? AND hash = ?", chainID, hash)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
trs, err := rowsToTransactions(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(trs) == 0 {
|
|
return nil, sql.ErrNoRows
|
|
}
|
|
return trs[0], nil
|
|
}
|
|
|
|
// StoreAndTrackPendingTx store the details of a pending transaction and track it until it is mined
|
|
func (tm *PendingTxTracker) StoreAndTrackPendingTx(transaction *PendingTransaction) error {
|
|
err := tm.addPending(transaction)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
tm.taskRunner.RunUntilDone()
|
|
|
|
return err
|
|
}
|
|
|
|
func (tm *PendingTxTracker) addPending(transaction *PendingTransaction) error {
|
|
insert, err := tm.db.Prepare(`INSERT OR REPLACE INTO pending_transactions
|
|
(network_id, hash, timestamp, value, from_address, to_address,
|
|
data, symbol, gas_price, gas_limit, type, additional_data, multi_transaction_id, status, auto_delete)
|
|
VALUES
|
|
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? , ?)`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = insert.Exec(
|
|
transaction.ChainID,
|
|
transaction.Hash,
|
|
transaction.Timestamp,
|
|
(*bigint.SQLBigIntBytes)(transaction.Value.Int),
|
|
transaction.From,
|
|
transaction.To,
|
|
transaction.Data,
|
|
transaction.Symbol,
|
|
(*bigint.SQLBigIntBytes)(transaction.GasPrice.Int),
|
|
(*bigint.SQLBigIntBytes)(transaction.GasLimit.Int),
|
|
transaction.Type,
|
|
transaction.AdditionalData,
|
|
transaction.MultiTransactionID,
|
|
transaction.Status,
|
|
transaction.AutoDelete,
|
|
)
|
|
// Notify listeners of new pending transaction (used in activity history)
|
|
if err == nil {
|
|
tm.notifyPendingTransactionListeners(transaction.ChainID, []eth.Address{transaction.From, transaction.To}, transaction.Timestamp)
|
|
}
|
|
if tm.rpcFilter != nil {
|
|
tm.rpcFilter.TriggerTransactionSentToUpstreamEvent(&rpcfilters.PendingTxInfo{
|
|
Hash: transaction.Hash,
|
|
Type: string(transaction.Type),
|
|
From: transaction.From,
|
|
ChainID: uint64(transaction.ChainID),
|
|
})
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (tm *PendingTxTracker) notifyPendingTransactionListeners(chainID common.ChainID, addresses []eth.Address, timestamp uint64) {
|
|
if tm.eventFeed != nil {
|
|
tm.eventFeed.Send(walletevent.Event{
|
|
Type: EventPendingTransactionUpdate,
|
|
ChainID: uint64(chainID),
|
|
Accounts: addresses,
|
|
At: int64(timestamp),
|
|
})
|
|
}
|
|
}
|
|
|
|
// DeleteBySQLTx returns ErrStillPending if the transaction is still pending
|
|
func (tm *PendingTxTracker) DeleteBySQLTx(tx *sql.Tx, chainID common.ChainID, hash eth.Hash) (notify func(), err error) {
|
|
row := tx.QueryRow(`SELECT from_address, to_address, timestamp, status FROM pending_transactions WHERE network_id = ? AND hash = ?`, chainID, hash)
|
|
var from, to eth.Address
|
|
var timestamp uint64
|
|
var status TxStatus
|
|
err = row.Scan(&from, &to, ×tamp, &status)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, err = tx.Exec(`DELETE FROM pending_transactions WHERE network_id = ? AND hash = ?`, chainID, hash)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err == nil && status == Pending {
|
|
err = ErrStillPending
|
|
}
|
|
return func() {
|
|
tm.notifyPendingTransactionListeners(chainID, []eth.Address{from, to}, timestamp)
|
|
}, err
|
|
}
|
|
|
|
func GetTransferData(tx *sql.Tx, chainID common.ChainID, hash eth.Hash) (txType *PendingTrxType, MTID *int64, err error) {
|
|
row := tx.QueryRow(`SELECT type, multi_transaction_id FROM pending_transactions WHERE network_id = ? AND hash = ?`, chainID, hash, txType)
|
|
txType = new(PendingTrxType)
|
|
MTID = new(int64)
|
|
err = row.Scan(txType, MTID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return txType, MTID, nil
|
|
}
|
|
|
|
// Watch returns sql.ErrNoRows if no pending transaction is found for the given identity
|
|
// tx.Status is not nill if err is nil
|
|
func (tm *PendingTxTracker) Watch(ctx context.Context, chainID common.ChainID, hash eth.Hash) (*TxStatus, error) {
|
|
tx, err := tm.GetPendingEntry(chainID, hash)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return tx.Status, nil
|
|
}
|
|
|
|
// Delete returns ErrStillPending if the deleted transaction was still pending
|
|
// The transactions are suppose to be deleted by the client only after they are confirmed
|
|
func (tm *PendingTxTracker) Delete(ctx context.Context, chainID common.ChainID, transactionHash eth.Hash) error {
|
|
tx, err := tm.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to begin transaction: %w", err)
|
|
}
|
|
|
|
notifyFn, err := tm.DeleteBySQLTx(tx, chainID, transactionHash)
|
|
if err != nil && err != ErrStillPending {
|
|
rollErr := tx.Rollback()
|
|
if rollErr != nil {
|
|
return fmt.Errorf("failed to rollback transaction due to error: %w", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
commitErr := tx.Commit()
|
|
if commitErr != nil {
|
|
return fmt.Errorf("failed to commit transaction: %w", commitErr)
|
|
}
|
|
notifyFn()
|
|
return err
|
|
}
|