feat(wallet)_: store route execution data to db

This commit is contained in:
Dario Gabriel Lipicar 2024-10-16 02:09:56 -03:00 committed by dlipicar
parent f15c64ced3
commit cfcef92e64
14 changed files with 994 additions and 58 deletions

View File

@ -61,6 +61,10 @@ func ZeroBigIntValue() *big.Int {
return big.NewInt(0) return big.NewInt(0)
} }
func ZeroHash() ethCommon.Hash {
return ethCommon.Hash{}
}
func (c ChainID) String() string { func (c ChainID) String() string {
return strconv.FormatUint(uint64(c), 10) return strconv.FormatUint(uint64(c), 10)
} }

View File

@ -2,11 +2,16 @@ package routeexecution
import ( import (
"context" "context"
"database/sql"
"time" "time"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/logutils"
status_common "github.com/status-im/status-go/common" status_common "github.com/status-im/status-go/common"
statusErrors "github.com/status-im/status-go/errors" statusErrors "github.com/status-im/status-go/errors"
@ -23,16 +28,26 @@ type Manager struct {
router *router.Router router *router.Router
transactionManager *transfer.TransactionManager transactionManager *transfer.TransactionManager
transferController *transfer.Controller transferController *transfer.Controller
db *DB
// Local data used for storage purposes
buildInputParams *requests.RouterBuildTransactionsParams
} }
func NewManager(router *router.Router, transactionManager *transfer.TransactionManager, transferController *transfer.Controller) *Manager { func NewManager(walletDB *sql.DB, router *router.Router, transactionManager *transfer.TransactionManager, transferController *transfer.Controller) *Manager {
return &Manager{ return &Manager{
router: router, router: router,
transactionManager: transactionManager, transactionManager: transactionManager,
transferController: transferController, transferController: transferController,
db: NewDB(walletDB),
} }
} }
func (m *Manager) clearLocalRouteData() {
m.buildInputParams = nil
m.transactionManager.ClearLocalRouterTransactionsData()
}
func (m *Manager) BuildTransactionsFromRoute(ctx context.Context, buildInputParams *requests.RouterBuildTransactionsParams) { func (m *Manager) BuildTransactionsFromRoute(ctx context.Context, buildInputParams *requests.RouterBuildTransactionsParams) {
go func() { go func() {
defer status_common.LogOnPanic() defer status_common.LogOnPanic()
@ -48,7 +63,7 @@ func (m *Manager) BuildTransactionsFromRoute(ctx context.Context, buildInputPara
defer func() { defer func() {
if err != nil { if err != nil {
m.transactionManager.ClearLocalRouterTransactionsData() m.clearLocalRouteData()
err = statusErrors.CreateErrorResponseFromError(err) err = statusErrors.CreateErrorResponseFromError(err)
response.SendDetails.ErrorResponse = err.(*statusErrors.ErrorResponse) response.SendDetails.ErrorResponse = err.(*statusErrors.ErrorResponse)
} }
@ -62,6 +77,8 @@ func (m *Manager) BuildTransactionsFromRoute(ctx context.Context, buildInputPara
return return
} }
m.buildInputParams = buildInputParams
updateFields(response.SendDetails, routeInputParams) updateFields(response.SendDetails, routeInputParams)
// notify client that sending transactions started (has 3 steps, building txs, signing txs, sending txs) // notify client that sending transactions started (has 3 steps, building txs, signing txs, sending txs)
@ -108,7 +125,7 @@ func (m *Manager) SendRouterTransactionsWithSignatures(ctx context.Context, send
} }
if clearLocalData { if clearLocalData {
m.transactionManager.ClearLocalRouterTransactionsData() m.clearLocalRouteData()
} }
if err != nil { if err != nil {
@ -163,6 +180,20 @@ func (m *Manager) SendRouterTransactionsWithSignatures(ctx context.Context, send
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
response.SentTransactions, err = m.transactionManager.SendRouterTransactions(ctx, multiTx) response.SentTransactions, err = m.transactionManager.SendRouterTransactions(ctx, multiTx)
if err != nil {
log.Error("Error sending router transactions", "error", err)
// TODO #16556: Handle partially successful Tx sends?
// Don't return, store whichever transactions were successfully sent
}
// don't overwrite err since we want to process it in the deferred function
var tmpErr error
routerTransactions := m.transactionManager.GetRouterTransactions()
routeData := NewRouteData(&routeInputParams, m.buildInputParams, routerTransactions)
tmpErr = m.db.PutRouteData(routeData)
if tmpErr != nil {
log.Error("Error storing route data", "error", tmpErr)
}
var ( var (
chainIDs []uint64 chainIDs []uint64
@ -173,13 +204,17 @@ func (m *Manager) SendRouterTransactionsWithSignatures(ctx context.Context, send
addresses = append(addresses, common.Address(tx.FromAddress)) addresses = append(addresses, common.Address(tx.FromAddress))
go func(chainId uint64, txHash common.Hash) { go func(chainId uint64, txHash common.Hash) {
defer status_common.LogOnPanic() defer status_common.LogOnPanic()
err = m.transactionManager.WatchTransaction(context.Background(), chainId, txHash) tmpErr = m.transactionManager.WatchTransaction(context.Background(), chainId, txHash)
if err != nil { if tmpErr != nil {
logutils.ZapLogger().Error("Error watching transaction", zap.Error(tmpErr))
return return
} }
}(tx.FromChain, common.Hash(tx.Hash)) }(tx.FromChain, common.Hash(tx.Hash))
} }
err = m.transferController.CheckRecentHistory(chainIDs, addresses) tmpErr = m.transferController.CheckRecentHistory(chainIDs, addresses)
if tmpErr != nil {
logutils.ZapLogger().Error("Error checking recent history", zap.Error(tmpErr))
}
}() }()
} }

View File

@ -0,0 +1,373 @@
package routeexecution
import (
"database/sql"
sq "github.com/Masterminds/squirrel"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/services/wallet/requests"
"github.com/status-im/status-go/services/wallet/router/routes"
"github.com/status-im/status-go/sqlite"
)
type DB struct {
db *sql.DB
}
func NewDB(db *sql.DB) *DB {
return &DB{db: db}
}
func (db *DB) PutRouteData(routeData *RouteData) (err error) {
var tx *sql.Tx
tx, err = db.db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
_ = tx.Rollback()
}()
if err = putRouteInputParams(tx, routeData.RouteInputParams); err != nil {
return
}
if err = putBuildTxParams(tx, routeData.BuildInputParams); err != nil {
return
}
if err = putPathsData(tx, routeData.RouteInputParams.Uuid, routeData.PathsData); err != nil {
return
}
return
}
func (db *DB) GetRouteData(uuid string) (*RouteData, error) {
return getRouteData(db.db, uuid)
}
func putRouteInputParams(creator sqlite.StatementCreator, p *requests.RouteInputParams) error {
q := sq.Replace("route_input_parameters").
SetMap(sq.Eq{"route_input_params_json": &sqlite.JSONBlob{Data: p}})
query, args, err := q.ToSql()
if err != nil {
return err
}
stmt, err := creator.Prepare(query)
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(args...)
return err
}
func putBuildTxParams(creator sqlite.StatementCreator, p *requests.RouterBuildTransactionsParams) error {
q := sq.Replace("route_build_tx_parameters").
SetMap(sq.Eq{"route_build_tx_params_json": &sqlite.JSONBlob{Data: p}})
query, args, err := q.ToSql()
if err != nil {
return err
}
stmt, err := creator.Prepare(query)
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(args...)
return err
}
func putPathsData(creator sqlite.StatementCreator, uuid string, d []*PathData) error {
for i, pathData := range d {
if err := putPathData(creator, uuid, i, pathData); err != nil {
return err
}
}
return nil
}
func putPathData(creator sqlite.StatementCreator, uuid string, pathIdx int, d *PathData) (err error) {
err = putPath(creator, uuid, pathIdx, d.Path)
if err != nil {
return
}
for txIdx, txData := range d.TransactionsData {
err = putPathTransaction(creator, uuid, pathIdx, txIdx, txData)
if err != nil {
return
}
err = putSentTransaction(creator, txData)
if err != nil {
return
}
}
return
}
func putPath(
creator sqlite.StatementCreator,
uuid string,
pathIdx int,
p *routes.Path) error {
q := sq.Replace("route_paths").
SetMap(sq.Eq{"uuid": uuid, "path_idx": pathIdx, "path_json": &sqlite.JSONBlob{Data: p}})
query, args, err := q.ToSql()
if err != nil {
return err
}
stmt, err := creator.Prepare(query)
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(args...)
return err
}
func putPathTransaction(
creator sqlite.StatementCreator,
uuid string,
pathIdx int,
txIdx int,
txData *TransactionData,
) error {
q := sq.Replace("route_path_transactions").
SetMap(sq.Eq{
"uuid": uuid,
"path_idx": pathIdx,
"tx_idx": txIdx,
"is_approval": txData.IsApproval,
"chain_id": txData.ChainID,
"tx_hash": txData.TxHash[:],
"tx_args_json": &sqlite.JSONBlob{Data: txData.TxArgs},
})
query, args, err := q.ToSql()
if err != nil {
return err
}
stmt, err := creator.Prepare(query)
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(args...)
return err
}
func putSentTransaction(
creator sqlite.StatementCreator,
txData *TransactionData,
) error {
q := sq.Replace("sent_transactions").
SetMap(sq.Eq{
"chain_id": txData.ChainID,
"tx_hash": txData.TxHash[:],
"tx_json": &sqlite.JSONBlob{Data: txData.Tx},
})
query, args, err := q.ToSql()
if err != nil {
return err
}
stmt, err := creator.Prepare(query)
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(args...)
return err
}
func getRouteData(creator sqlite.StatementCreator, uuid string) (*RouteData, error) {
routeInputParams, err := getRouteInputParams(creator, uuid)
if err != nil {
return nil, err
}
buildTxParams, err := getBuildTxParams(creator, uuid)
if err != nil {
return nil, err
}
pathsData, err := getPathsData(creator, uuid)
if err != nil {
return nil, err
}
return &RouteData{
RouteInputParams: routeInputParams,
BuildInputParams: buildTxParams,
PathsData: pathsData,
}, nil
}
func getRouteInputParams(creator sqlite.StatementCreator, uuid string) (*requests.RouteInputParams, error) {
var p requests.RouteInputParams
q := sq.Select("route_input_params_json").
From("route_input_parameters").
Where(sq.Eq{"uuid": uuid})
query, args, err := q.ToSql()
if err != nil {
return nil, err
}
stmt, err := creator.Prepare(query)
if err != nil {
return nil, err
}
defer stmt.Close()
err = stmt.QueryRow(args...).Scan(&sqlite.JSONBlob{Data: &p})
return &p, err
}
func getBuildTxParams(creator sqlite.StatementCreator, uuid string) (*requests.RouterBuildTransactionsParams, error) {
var p requests.RouterBuildTransactionsParams
q := sq.Select("route_build_tx_params_json").
From("route_build_tx_parameters").
Where(sq.Eq{"uuid": uuid})
query, args, err := q.ToSql()
if err != nil {
return nil, err
}
stmt, err := creator.Prepare(query)
if err != nil {
return nil, err
}
defer stmt.Close()
err = stmt.QueryRow(args...).Scan(&sqlite.JSONBlob{Data: &p})
return &p, err
}
func getPathsData(creator sqlite.StatementCreator, uuid string) ([]*PathData, error) {
var pathsData []*PathData
paths, err := getPaths(creator, uuid)
if err != nil {
return nil, err
}
for pathIdx, path := range paths {
pathData := &PathData{Path: path}
txs, err := getPathTransactions(creator, uuid, pathIdx)
if err != nil {
return nil, err
}
pathData.TransactionsData = txs
pathsData = append(pathsData, pathData)
}
return pathsData, nil
}
func getPaths(creator sqlite.StatementCreator, uuid string) ([]*routes.Path, error) {
var paths []*routes.Path
q := sq.Select("path_json").
From("route_paths").
Where(sq.Eq{"uuid": uuid}).
OrderBy("path_idx ASC")
query, args, err := q.ToSql()
if err != nil {
return nil, err
}
stmt, err := creator.Prepare(query)
if err != nil {
return nil, err
}
defer stmt.Close()
rows, err := stmt.Query(args...)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var p routes.Path
err = rows.Scan(&sqlite.JSONBlob{Data: &p})
if err != nil {
return nil, err
}
paths = append(paths, &p)
}
return paths, nil
}
func getPathTransactions(creator sqlite.StatementCreator, uuid string, pathIdx int) ([]*TransactionData, error) {
txs := make([]*TransactionData, 0, 2)
q := sq.Select("rpt.is_approval", "rpt.chain_id", "rpt.tx_hash", "rpt.tx_args_json", "st.tx_json").
From("route_path_transactions rpt").
LeftJoin(`sent_transactions st ON
rpt.chain_id = st.chain_id AND
rpt.tx_hash = st.tx_hash`).
Where(sq.Eq{"rpt.uuid": uuid, "rpt.path_idx": pathIdx}).
OrderBy("rpt.tx_idx ASC")
query, args, err := q.ToSql()
if err != nil {
return nil, err
}
stmt, err := creator.Prepare(query)
if err != nil {
return nil, err
}
defer stmt.Close()
rows, err := stmt.Query(args...)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var tx TransactionData
var txHash sql.RawBytes
err = rows.Scan(&tx.IsApproval, &tx.ChainID, &txHash, &sqlite.JSONBlob{Data: &tx.TxArgs}, &sqlite.JSONBlob{Data: &tx.Tx})
if err != nil {
return nil, err
}
if len(txHash) > 0 {
tx.TxHash = types.BytesToHash(txHash)
}
txs = append(txs, &tx)
}
return txs, nil
}

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,48 @@
package routeexecution_test
import (
"testing"
"github.com/status-im/status-go/services/wallet/routeexecution"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/walletdatabase"
"github.com/stretchr/testify/require"
)
func Test_PutRouteData(t *testing.T) {
testData := []dbTestData{
createDBTestData("USDTSwapApprove", getUSDTSwapApproveDBTestData(), getUSDTSwapTxDBTestData()), // After placing the Swap Tx, we expect to get info for both txs
createDBTestData("USDTSwapTx", getUSDTSwapTxDBTestData(), getUSDTSwapTxDBTestData()),
createDBTestData("ETHSwapTx", getETHSwapTxDBTestData(), getETHSwapTxDBTestData()),
createDBTestData("ETHBridgeTx", getETHBridgeTxDBTestData(), getETHBridgeTxDBTestData()),
createDBTestData("USDTSendTx", getUSDTSendTxDBTestData(), getUSDTSendTxDBTestData()),
}
walletDB, closeFn, err := helpers.SetupTestSQLDB(walletdatabase.DbInitializer{}, "routeexecution-tests")
require.NoError(t, err)
defer func() {
require.NoError(t, closeFn())
}()
routeDB := routeexecution.NewDB(walletDB)
for _, tt := range testData {
t.Run("Put_"+tt.name, func(t *testing.T) {
insertedParams := tt.insertedParams
routeData := routeexecution.NewRouteData(&insertedParams.routeInputParams, insertedParams.buildInputParams, insertedParams.transactionDetails)
err := routeDB.PutRouteData(routeData)
require.NoError(t, err)
})
}
for _, tt := range testData {
t.Run("Get_"+tt.name, func(t *testing.T) {
expectedParams := tt.expectedParams
routeData := routeexecution.NewRouteData(&expectedParams.routeInputParams, expectedParams.buildInputParams, expectedParams.transactionDetails)
readRouteData, err := routeDB.GetRouteData(routeData.RouteInputParams.Uuid)
require.NoError(t, err)
require.EqualExportedValues(t, routeData, readRouteData)
})
}
}

View File

@ -0,0 +1,78 @@
package routeexecution
import (
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/services/wallet/requests"
"github.com/status-im/status-go/services/wallet/router/routes"
"github.com/status-im/status-go/services/wallet/transfer"
"github.com/status-im/status-go/transactions"
)
// These structs oontain all route execution data
// that's stored to the DB
type RouteData struct {
RouteInputParams *requests.RouteInputParams
BuildInputParams *requests.RouterBuildTransactionsParams
PathsData []*PathData
}
type PathData struct {
Path *routes.Path
TransactionsData []*TransactionData
}
type TransactionData struct {
ChainID uint64
TxHash types.Hash
IsApproval bool
TxArgs *transactions.SendTxArgs
Tx *ethTypes.Transaction
}
func NewRouteData(routeInputParams *requests.RouteInputParams,
buildInputParams *requests.RouterBuildTransactionsParams,
transactionDetails []*transfer.RouterTransactionDetails) *RouteData {
pathDataPerProcessorName := make(map[string]*PathData)
pathsData := make([]*PathData, 0, len(transactionDetails))
for _, td := range transactionDetails {
transactionsData := make([]*TransactionData, 0, 2)
if td.IsApprovalPlaced() {
transactionsData = append(transactionsData, &TransactionData{
ChainID: td.RouterPath.FromChain.ChainID,
TxHash: td.ApprovalTxSentHash,
IsApproval: true,
TxArgs: td.ApprovalTxArgs,
Tx: td.ApprovalTx,
})
}
if td.IsTxPlaced() {
transactionsData = append(transactionsData, &TransactionData{
ChainID: td.RouterPath.FromChain.ChainID,
TxHash: td.TxSentHash,
IsApproval: false,
TxArgs: td.TxArgs,
Tx: td.Tx,
})
}
var pathData *PathData
var ok bool
if pathData, ok = pathDataPerProcessorName[td.RouterPath.ProcessorName]; !ok {
pathData = &PathData{
Path: td.RouterPath,
TransactionsData: make([]*TransactionData, 0, 2),
}
pathsData = append(pathsData, pathData)
pathDataPerProcessorName[td.RouterPath.ProcessorName] = pathData
}
pathData.TransactionsData = append(pathData.TransactionsData, transactionsData...)
}
return &RouteData{
RouteInputParams: routeInputParams,
BuildInputParams: buildInputParams,
PathsData: pathsData,
}
}

View File

@ -196,7 +196,7 @@ func NewService(
router.AddPathProcessor(processor) router.AddPathProcessor(processor)
} }
routeExecutionManager := routeexecution.NewManager(router, transactionManager, transferController) routeExecutionManager := routeexecution.NewManager(db, router, transactionManager, transferController)
return &Service{ return &Service{
db: db, db: db,

View File

@ -44,17 +44,25 @@ type TransactionDescription struct {
} }
type RouterTransactionDetails struct { type RouterTransactionDetails struct {
routerPath *routes.Path RouterPath *routes.Path
txArgs *transactions.SendTxArgs TxArgs *transactions.SendTxArgs
tx *ethTypes.Transaction Tx *ethTypes.Transaction
txHashToSign types.Hash TxHashToSign types.Hash
txSignature []byte TxSignature []byte
txSentHash types.Hash TxSentHash types.Hash
approvalTxArgs *transactions.SendTxArgs ApprovalTxArgs *transactions.SendTxArgs
approvalTx *ethTypes.Transaction ApprovalTx *ethTypes.Transaction
approvalHashToSign types.Hash ApprovalHashToSign types.Hash
approvalSignature []byte ApprovalSignature []byte
approvalTxSentHash types.Hash ApprovalTxSentHash types.Hash
}
func (rtd *RouterTransactionDetails) IsTxPlaced() bool {
return rtd.TxSentHash != types.Hash(wallet_common.ZeroHash())
}
func (rtd *RouterTransactionDetails) IsApprovalPlaced() bool {
return rtd.ApprovalTxSentHash != types.Hash(wallet_common.ZeroHash())
} }
type TransactionManager struct { type TransactionManager struct {

View File

@ -34,8 +34,8 @@ func (tm *TransactionManager) ClearLocalRouterTransactionsData() {
func (tm *TransactionManager) ApprovalRequiredForPath(pathProcessorName string) bool { func (tm *TransactionManager) ApprovalRequiredForPath(pathProcessorName string) bool {
for _, desc := range tm.routerTransactions { for _, desc := range tm.routerTransactions {
if desc.routerPath.ProcessorName == pathProcessorName && if desc.RouterPath.ProcessorName == pathProcessorName &&
desc.routerPath.ApprovalRequired { desc.RouterPath.ApprovalRequired {
return true return true
} }
} }
@ -44,8 +44,7 @@ func (tm *TransactionManager) ApprovalRequiredForPath(pathProcessorName string)
func (tm *TransactionManager) ApprovalPlacedForPath(pathProcessorName string) bool { func (tm *TransactionManager) ApprovalPlacedForPath(pathProcessorName string) bool {
for _, desc := range tm.routerTransactions { for _, desc := range tm.routerTransactions {
if desc.routerPath.ProcessorName == pathProcessorName && if desc.RouterPath.ProcessorName == pathProcessorName && desc.IsApprovalPlaced() {
desc.approvalTxSentHash != (types.Hash{}) {
return true return true
} }
} }
@ -54,8 +53,7 @@ func (tm *TransactionManager) ApprovalPlacedForPath(pathProcessorName string) bo
func (tm *TransactionManager) TxPlacedForPath(pathProcessorName string) bool { func (tm *TransactionManager) TxPlacedForPath(pathProcessorName string) bool {
for _, desc := range tm.routerTransactions { for _, desc := range tm.routerTransactions {
if desc.routerPath.ProcessorName == pathProcessorName && if desc.RouterPath.ProcessorName == pathProcessorName && desc.IsTxPlaced() {
desc.txSentHash != (types.Hash{}) {
return true return true
} }
} }
@ -103,10 +101,10 @@ func (tm *TransactionManager) buildApprovalTxForPath(path *routes.Path, addressF
usedNonces[path.FromChain.ChainID] = int64(usedNonce) usedNonces[path.FromChain.ChainID] = int64(usedNonce)
tm.routerTransactions = append(tm.routerTransactions, &RouterTransactionDetails{ tm.routerTransactions = append(tm.routerTransactions, &RouterTransactionDetails{
routerPath: path, RouterPath: path,
approvalTxArgs: approavalSendArgs, ApprovalTxArgs: approavalSendArgs,
approvalTx: builtApprovalTx, ApprovalTx: builtApprovalTx,
approvalHashToSign: types.Hash(approvalTxHash), ApprovalHashToSign: types.Hash(approvalTxHash),
}) })
return types.Hash(approvalTxHash), nil return types.Hash(approvalTxHash), nil
@ -193,10 +191,10 @@ func (tm *TransactionManager) buildTxForPath(path *routes.Path, pathProcessors m
usedNonces[path.FromChain.ChainID] = int64(usedNonce) usedNonces[path.FromChain.ChainID] = int64(usedNonce)
tm.routerTransactions = append(tm.routerTransactions, &RouterTransactionDetails{ tm.routerTransactions = append(tm.routerTransactions, &RouterTransactionDetails{
routerPath: path, RouterPath: path,
txArgs: sendArgs, TxArgs: sendArgs,
tx: builtTx, Tx: builtTx,
txHashToSign: types.Hash(txHash), TxHashToSign: types.Hash(txHash),
}) })
return types.Hash(txHash), nil return types.Hash(txHash), nil
@ -291,20 +289,20 @@ func (tm *TransactionManager) ValidateAndAddSignaturesToRouterTransactions(signa
// check if all transactions have been signed // check if all transactions have been signed
for _, desc := range tm.routerTransactions { for _, desc := range tm.routerTransactions {
if desc.approvalTx != nil && desc.approvalTxSentHash == (types.Hash{}) { if desc.ApprovalTx != nil && desc.ApprovalTxSentHash == (types.Hash{}) {
sig, err := getSignatureForTxHash(desc.approvalHashToSign.String(), signatures) sig, err := getSignatureForTxHash(desc.ApprovalHashToSign.String(), signatures)
if err != nil { if err != nil {
return err return err
} }
desc.approvalSignature = sig desc.ApprovalSignature = sig
} }
if desc.tx != nil && desc.txSentHash == (types.Hash{}) { if desc.Tx != nil && desc.TxSentHash == (types.Hash{}) {
sig, err := getSignatureForTxHash(desc.txHashToSign.String(), signatures) sig, err := getSignatureForTxHash(desc.TxHashToSign.String(), signatures)
if err != nil { if err != nil {
return err return err
} }
desc.txSignature = sig desc.TxSignature = sig
} }
} }
@ -316,41 +314,45 @@ func (tm *TransactionManager) SendRouterTransactions(ctx context.Context, multiT
// send transactions // send transactions
for _, desc := range tm.routerTransactions { for _, desc := range tm.routerTransactions {
if desc.approvalTx != nil && desc.approvalTxSentHash == (types.Hash{}) { if desc.ApprovalTx != nil && !desc.IsApprovalPlaced() {
var approvalTxWithSignature *ethTypes.Transaction var approvalTxWithSignature *ethTypes.Transaction
approvalTxWithSignature, err = tm.transactor.AddSignatureToTransaction(desc.approvalTxArgs.FromChainID, desc.approvalTx, desc.approvalSignature) approvalTxWithSignature, err = tm.transactor.AddSignatureToTransaction(desc.ApprovalTxArgs.FromChainID, desc.ApprovalTx, desc.ApprovalSignature)
if err != nil { if err != nil {
return nil, err return
} }
desc.approvalTxSentHash, err = tm.transactor.SendTransactionWithSignature(common.Address(desc.approvalTxArgs.From), desc.approvalTxArgs.FromTokenID, multiTx.ID, approvalTxWithSignature) desc.ApprovalTxSentHash, err = tm.transactor.SendTransactionWithSignature(common.Address(desc.ApprovalTxArgs.From), desc.ApprovalTxArgs.FromTokenID, multiTx.ID, approvalTxWithSignature)
if err != nil { if err != nil {
return nil, err return
} }
transactions = append(transactions, responses.NewRouterSentTransaction(desc.approvalTxArgs, desc.approvalTxSentHash, true)) transactions = append(transactions, responses.NewRouterSentTransaction(desc.ApprovalTxArgs, desc.ApprovalTxSentHash, true))
// if approval is needed for swap, then we need to wait for the approval tx to be mined before sending the swap tx // if approval is needed for swap, then we need to wait for the approval tx to be mined before sending the swap tx
if desc.routerPath.ProcessorName == pathprocessor.ProcessorSwapParaswapName { if desc.RouterPath.ProcessorName == pathprocessor.ProcessorSwapParaswapName {
continue continue
} }
} }
if desc.tx != nil && desc.txSentHash == (types.Hash{}) { if desc.Tx != nil && !desc.IsTxPlaced() {
var txWithSignature *ethTypes.Transaction var txWithSignature *ethTypes.Transaction
txWithSignature, err = tm.transactor.AddSignatureToTransaction(desc.txArgs.FromChainID, desc.tx, desc.txSignature) txWithSignature, err = tm.transactor.AddSignatureToTransaction(desc.TxArgs.FromChainID, desc.Tx, desc.TxSignature)
if err != nil { if err != nil {
return nil, err return
} }
desc.txSentHash, err = tm.transactor.SendTransactionWithSignature(common.Address(desc.txArgs.From), desc.txArgs.FromTokenID, multiTx.ID, txWithSignature) desc.TxSentHash, err = tm.transactor.SendTransactionWithSignature(common.Address(desc.TxArgs.From), desc.TxArgs.FromTokenID, multiTx.ID, txWithSignature)
if err != nil { if err != nil {
return nil, err return
} }
transactions = append(transactions, responses.NewRouterSentTransaction(desc.txArgs, desc.txSentHash, false)) transactions = append(transactions, responses.NewRouterSentTransaction(desc.TxArgs, desc.TxSentHash, false))
} }
} }
return return
} }
func (tm *TransactionManager) GetRouterTransactions() []*RouterTransactionDetails {
return tm.routerTransactions
}

View File

@ -75,8 +75,9 @@ type StatusChangedPayload struct {
// PendingTxTracker implements StatusService in common/status_node_service.go // PendingTxTracker implements StatusService in common/status_node_service.go
type PendingTxTracker struct { type PendingTxTracker struct {
db *sql.DB db *sql.DB
rpcClient rpc.ClientInterface trackedTxDB *DB
rpcClient rpc.ClientInterface
rpcFilter *rpcfilters.Service rpcFilter *rpcfilters.Service
eventFeed *event.Feed eventFeed *event.Feed
@ -87,11 +88,12 @@ type PendingTxTracker struct {
func NewPendingTxTracker(db *sql.DB, rpcClient rpc.ClientInterface, rpcFilter *rpcfilters.Service, eventFeed *event.Feed, checkInterval time.Duration) *PendingTxTracker { func NewPendingTxTracker(db *sql.DB, rpcClient rpc.ClientInterface, rpcFilter *rpcfilters.Service, eventFeed *event.Feed, checkInterval time.Duration) *PendingTxTracker {
tm := &PendingTxTracker{ tm := &PendingTxTracker{
db: db, db: db,
rpcClient: rpcClient, trackedTxDB: NewDB(db),
eventFeed: eventFeed, rpcClient: rpcClient,
rpcFilter: rpcFilter, eventFeed: eventFeed,
logger: logutils.ZapLogger().Named("PendingTxTracker"), rpcFilter: rpcFilter,
logger: logutils.ZapLogger().Named("PendingTxTracker"),
} }
tm.taskRunner = NewConditionalRepeater(checkInterval, func(ctx context.Context) bool { tm.taskRunner = NewConditionalRepeater(checkInterval, func(ctx context.Context) bool {
return tm.fetchAndUpdateDB(ctx) return tm.fetchAndUpdateDB(ctx)
@ -238,6 +240,20 @@ func fetchBatchTxStatus(ctx context.Context, rpcClient rpc.ClientInterface, chai
// updateDBStatus returns entries that were updated only // updateDBStatus returns entries that were updated only
func (tm *PendingTxTracker) updateDBStatus(ctx context.Context, chainID common.ChainID, statuses []txStatusRes) ([]txStatusRes, error) { func (tm *PendingTxTracker) updateDBStatus(ctx context.Context, chainID common.ChainID, statuses []txStatusRes) ([]txStatusRes, error) {
for _, br := range statuses {
err := tm.trackedTxDB.UpdateTxStatus(
TxIdentity{
ChainID: chainID,
Hash: br.hash,
},
br.Status,
)
if err != nil {
tm.logger.Error("Failed to update trackedTx status", zap.Stringer("hash", br.hash), zap.Error(err))
continue
}
}
res := make([]txStatusRes, 0, len(statuses)) res := make([]txStatusRes, 0, len(statuses))
tx, err := tm.db.BeginTx(ctx, nil) tx, err := tm.db.BeginTx(ctx, nil)
if err != nil { if err != nil {
@ -557,6 +573,18 @@ func (tm *PendingTxTracker) StoreAndTrackPendingTx(transaction *PendingTransacti
} }
func (tm *PendingTxTracker) addPending(transaction *PendingTransaction) error { func (tm *PendingTxTracker) addPending(transaction *PendingTransaction) error {
err := tm.trackedTxDB.PutTx(TrackedTx{
ID: TxIdentity{
ChainID: transaction.ChainID,
Hash: transaction.Hash,
},
Status: Pending,
Timestamp: transaction.Timestamp,
})
if err != nil {
return err
}
var notifyFn func() var notifyFn func()
tx, err := tm.db.Begin() tx, err := tm.db.Begin()
if err != nil { if err != nil {
@ -637,6 +665,7 @@ func (tm *PendingTxTracker) addPending(transaction *PendingTransaction) error {
transaction.AutoDelete, transaction.AutoDelete,
transaction.Nonce, transaction.Nonce,
) )
// Notify listeners of new pending transaction (used in activity history) // Notify listeners of new pending transaction (used in activity history)
if err == nil { if err == nil {
tm.notifyPendingTransactionListeners(PendingTxUpdatePayload{ tm.notifyPendingTransactionListeners(PendingTxUpdatePayload{

View File

@ -0,0 +1,115 @@
package transactions
import (
"database/sql"
sq "github.com/Masterminds/squirrel"
"github.com/status-im/status-go/sqlite"
)
type TrackedTx struct {
ID TxIdentity `json:"id"`
Timestamp uint64 `json:"timestamp"`
Status TxStatus `json:"status"`
}
type DB struct {
db *sql.DB
}
func NewDB(db *sql.DB) *DB {
return &DB{db: db}
}
func (db *DB) PutTx(transaction TrackedTx) (err error) {
var tx *sql.Tx
tx, err = db.db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
_ = tx.Rollback()
}()
return putTx(tx, transaction)
}
func (db *DB) GetTx(txID TxIdentity) (tx TrackedTx, err error) {
q := sq.Select("chain_id", "tx_hash", "tx_status", "timestamp").
From("tracked_transactions").
Where(sq.Eq{"chain_id": txID.ChainID, "tx_hash": txID.Hash})
query, args, err := q.ToSql()
if err != nil {
return
}
row := db.db.QueryRow(query, args...)
err = row.Scan(&tx.ID.ChainID, &tx.ID.Hash, &tx.Status, &tx.Timestamp)
return
}
func putTx(creator sqlite.StatementCreator, tx TrackedTx) error {
q := sq.Replace("tracked_transactions").
Columns("chain_id", "tx_hash", "tx_status", "timestamp").
Values(tx.ID.ChainID, tx.ID.Hash, tx.Status, tx.Timestamp)
query, args, err := q.ToSql()
if err != nil {
return err
}
stmt, err := creator.Prepare(query)
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(args...)
return err
}
func (db *DB) UpdateTxStatus(txID TxIdentity, status TxStatus) (err error) {
var tx *sql.Tx
tx, err = db.db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
_ = tx.Rollback()
}()
return updateTxStatus(tx, txID, status)
}
func updateTxStatus(creator sqlite.StatementCreator, txID TxIdentity, status TxStatus) error {
q := sq.Update("tracked_transactions").
Set("tx_status", status).
Where(sq.Eq{"chain_id": txID.ChainID, "tx_hash": txID.Hash})
query, args, err := q.ToSql()
if err != nil {
return err
}
stmt, err := creator.Prepare(query)
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(args...)
return err
}

View File

@ -0,0 +1,90 @@
package transactions_test
import (
"math/rand"
"strconv"
"testing"
crypto_rand "crypto/rand"
eth "github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/transactions"
"github.com/status-im/status-go/walletdatabase"
"github.com/stretchr/testify/require"
)
func getRandomStatus() transactions.TxStatus {
switch rand.Intn(3) { // nolint: gosec
case 0:
return transactions.Pending
case 1:
return transactions.Success
case 2:
return transactions.Failed
}
return transactions.Pending
}
func getRandomTrackedTx() transactions.TrackedTx {
tx := transactions.TrackedTx{
ID: transactions.TxIdentity{
ChainID: common.ChainID(rand.Uint64() % 10), // nolint: gosec
Hash: eth.Hash{},
},
Timestamp: 123,
Status: getRandomStatus(),
}
_, _ = crypto_rand.Read(tx.ID.Hash[:])
return tx
}
func getTestData() []struct {
name string
tx transactions.TrackedTx
} {
testData := make([]struct {
name string
tx transactions.TrackedTx
}, 10)
for i := range testData {
testData[i].name = "test_" + strconv.Itoa(i)
testData[i].tx = getRandomTrackedTx()
}
return testData
}
func Test_PuTrackedTx(t *testing.T) {
walletDB, closeFn, err := helpers.SetupTestSQLDB(walletdatabase.DbInitializer{}, "pendingtxtracker-tests")
require.NoError(t, err)
defer func() {
require.NoError(t, closeFn())
}()
db := transactions.NewDB(walletDB)
for _, tt := range getTestData() {
t.Run(tt.name, func(t *testing.T) {
err := db.PutTx(tt.tx)
require.NoError(t, err)
readTx, err := db.GetTx(tt.tx.ID)
require.NoError(t, err)
require.EqualExportedValues(t, tt.tx, readTx)
newStatus := getRandomStatus()
err = db.UpdateTxStatus(tt.tx.ID, newStatus)
require.NoError(t, err)
readTx, err = db.GetTx(tt.tx.ID)
require.NoError(t, err)
require.Equal(t, newStatus, readTx.Status)
})
}
}

View File

@ -0,0 +1,50 @@
-- store route input parameters
CREATE TABLE IF NOT EXISTS route_input_parameters (
uuid TEXT NOT NULL AS (json_extract(route_input_params_json, '$.uuid')),
route_input_params_json JSON NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_route_input_parameters_per_uuid ON route_input_parameters (uuid);
-- store route build tx parameters
CREATE TABLE IF NOT EXISTS route_build_tx_parameters (
uuid TEXT NOT NULL AS (json_extract(route_build_tx_params_json, '$.uuid')),
route_build_tx_params_json JSON NOT NULL,
FOREIGN KEY(uuid) REFERENCES route_input_parameters(uuid) ON DELETE CASCADE
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_route_build_tx_parameters_per_uuid ON route_build_tx_parameters (uuid);
-- store route paths
CREATE TABLE IF NOT EXISTS route_paths (
uuid TEXT NOT NULL,
path_idx INTEGER NOT NULL,
path_json JSON NOT NULL,
FOREIGN KEY(uuid) REFERENCES route_input_parameters(uuid) ON DELETE CASCADE
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_route_path_per_uuid_index ON route_paths (uuid, path_idx);
-- store route path transactions
CREATE TABLE IF NOT EXISTS route_path_transactions (
uuid TEXT NOT NULL,
path_idx INTEGER NOT NULL,
tx_idx INTEGER NOT NULL,
is_approval BOOLEAN NOT NULL,
chain_id UNSIGNED BIGINT NOT NULL,
tx_hash BLOB NOT NULL,
tx_args_json JSON NOT NULL,
FOREIGN KEY(uuid, path_idx) REFERENCES route_paths(uuid, path_idx) ON DELETE CASCADE
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_route_path_transaction_per_uuid_path_idx_tx_idx ON route_path_transactions (uuid, path_idx, tx_idx);
CREATE UNIQUE INDEX IF NOT EXISTS idx_route_path_transaction_per_chain_id_tx_hash ON route_path_transactions (chain_id, tx_hash);
-- store sent transactions
CREATE TABLE IF NOT EXISTS sent_transactions (
chain_id UNSIGNED BIGINT NOT NULL,
tx_hash BLOB NOT NULL,
tx_json JSON NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_sent_transactions_per_chain_id_tx_hash ON sent_transactions (chain_id, tx_hash);

View File

@ -0,0 +1,9 @@
-- store state of tracked transactions
CREATE TABLE IF NOT EXISTS tracked_transactions(
chain_id UNSIGNED BIGINT NOT NULL,
tx_hash BLOB NOT NULL,
tx_status STRING NOT NULL,
timestamp INTEGER NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_tracked_transactions_per_chain_id_tx_hash ON tracked_transactions (chain_id, tx_hash);