From fb6e2a41f73c2ac37a9d055d77825df32dafb3fd Mon Sep 17 00:00:00 2001 From: Dario Gabriel Lipicar Date: Tue, 13 Jun 2023 11:06:36 -0300 Subject: [PATCH] chore(wallet): reorganize multitx processing code --- services/wallet/transfer/commands.go | 118 ++++++++++++++------------- 1 file changed, 60 insertions(+), 58 deletions(-) diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index 6b78743c8..c0911e6bd 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -63,6 +63,8 @@ type ethHistoricalCommand struct { threadLimit uint32 } +type Transaction []*Transfer + func (c *ethHistoricalCommand) Command() async.Command { return async.FiniteCommand{ Interval: 5 * time.Second, @@ -393,6 +395,7 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) { err = c.processMultiTransactions(ctx, allTransfers) if err != nil { + log.Error("processMultiTransactions error", "error", err) return err } @@ -435,77 +438,76 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) { return nil } -func (c *transfersCommand) checkAndProcessPendingMultiTx(subTx *Transfer) (MultiTransactionIDType, error) { - // Update MultiTransactionID from pending entry - entry, err := c.transactionManager.GetPendingEntry(c.chainClient.ChainID, subTx.ID) - if err == nil { - // Propagate the MultiTransactionID, in case the pending entry was a multi-transaction - return entry.MultiTransactionID, nil - } else if err != sql.ErrNoRows { - log.Error("GetPendingEntry error", "error", err) - return NoMultiTransactionID, err +// Mark all subTxs of a given Tx with the same multiTxID +func setMultiTxID(tx Transaction, multiTxID MultiTransactionIDType) { + for _, subTx := range tx { + subTx.MultiTransactionID = multiTxID } - - return NoMultiTransactionID, nil } -func (c *transfersCommand) checkAndProcessSwapMultiTx(ctx context.Context, subTx *Transfer) (MultiTransactionIDType, error) { - switch subTx.Type { - // If the Tx contains any uniswapV2Swap/uniswapV3Swap subTx, generate a Swap multiTx - case w_common.UniswapV2Swap, w_common.UniswapV3Swap: - multiTransaction, err := buildUniswapSwapMultitransaction(ctx, c.chainClient, c.tokenManager, subTx) - if err != nil { - return NoMultiTransactionID, err - } - - if multiTransaction != nil { - id, err := c.transactionManager.InsertMultiTransaction(multiTransaction) - if err != nil { - return NoMultiTransactionID, err - } - return id, nil +func (c *transfersCommand) propagatePendingMultiTx(tx Transaction) error { + multiTxID := NoMultiTransactionID + // If any subTx matches a pending entry, mark all of them with the corresponding multiTxID + for _, subTx := range tx { + // Update MultiTransactionID from pending entry + entry, err := c.transactionManager.GetPendingEntry(c.chainClient.ChainID, subTx.ID) + if err == nil { + // Propagate the MultiTransactionID, in case the pending entry was a multi-transaction + multiTxID = entry.MultiTransactionID + break + } else if err != sql.ErrNoRows { + log.Error("GetPendingEntry error", "error", err) + return err } } - return NoMultiTransactionID, nil + if multiTxID != NoMultiTransactionID { + setMultiTxID(tx, multiTxID) + } + return nil +} + +func (c *transfersCommand) checkAndProcessSwapMultiTx(ctx context.Context, tx Transaction) (bool, error) { + for _, subTx := range tx { + switch subTx.Type { + // If the Tx contains any uniswapV2Swap/uniswapV3Swap subTx, generate a Swap multiTx + case w_common.UniswapV2Swap, w_common.UniswapV3Swap: + multiTransaction, err := buildUniswapSwapMultitransaction(ctx, c.chainClient, c.tokenManager, subTx) + if err != nil { + return false, err + } + + if multiTransaction != nil { + id, err := c.transactionManager.InsertMultiTransaction(multiTransaction) + if err != nil { + return false, err + } + setMultiTxID(tx, id) + return true, nil + } + } + } + + return false, nil } func (c *transfersCommand) processMultiTransactions(ctx context.Context, allTransfers []Transfer) error { - subTxsByTxHash := subTransactionsByTxHash(allTransfers) + txByTxHash := subTransactionListToTransactionsByTxHash(allTransfers) // Detect / Generate multitransactions // Iterate over all detected transactions - for _, subTxs := range subTxsByTxHash { - multiTxID := NoMultiTransactionID + for _, tx := range txByTxHash { var err error - // Iterate over transaction's subtransactions - for _, subTx := range subTxs { - if subTx.MultiTransactionID == NoMultiTransactionID { - // First check every subTX for pending transaction - multiTxID, err = c.checkAndProcessPendingMultiTx(subTx) - if err != nil { - return err - } - if multiTxID != NoMultiTransactionID { - break - } - - // Then check for a Swap transaction - multiTxID, err = c.checkAndProcessSwapMultiTx(ctx, subTx) - if err != nil { - return err - } - if multiTxID != NoMultiTransactionID { - break - } - } + // First check for pre-existing pending transaction + err = c.propagatePendingMultiTx(tx) + if err != nil { + return err } - // Mark all subTxs of a given Tx with the same multiTxID - if multiTxID != NoMultiTransactionID { - for _, subTx := range subTxs { - subTx.MultiTransactionID = multiTxID - } + // Then check for a Swap transaction + _, err = c.checkAndProcessSwapMultiTx(ctx, tx) + if err != nil { + return err } } @@ -879,8 +881,8 @@ func uniquePreloadedTransactionPerTxHash(allTransactions []*PreloadedTransaction } // Organize subTransactions by Transaction Hash -func subTransactionsByTxHash(subTransactions []Transfer) map[common.Hash][]*Transfer { - rst := map[common.Hash][]*Transfer{} +func subTransactionListToTransactionsByTxHash(subTransactions []Transfer) map[common.Hash]Transaction { + rst := map[common.Hash]Transaction{} for index := range subTransactions { subTx := &subTransactions[index]