chore(wallet): reorganize multitx processing code

This commit is contained in:
Dario Gabriel Lipicar 2023-06-13 11:06:36 -03:00 committed by dlipicar
parent bc92df79d5
commit fb6e2a41f7
1 changed files with 60 additions and 58 deletions

View File

@ -63,6 +63,8 @@ type ethHistoricalCommand struct {
threadLimit uint32 threadLimit uint32
} }
type Transaction []*Transfer
func (c *ethHistoricalCommand) Command() async.Command { func (c *ethHistoricalCommand) Command() async.Command {
return async.FiniteCommand{ return async.FiniteCommand{
Interval: 5 * time.Second, Interval: 5 * time.Second,
@ -393,6 +395,7 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) {
err = c.processMultiTransactions(ctx, allTransfers) err = c.processMultiTransactions(ctx, allTransfers)
if err != nil { if err != nil {
log.Error("processMultiTransactions error", "error", err)
return err return err
} }
@ -435,77 +438,76 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) {
return nil return nil
} }
func (c *transfersCommand) checkAndProcessPendingMultiTx(subTx *Transfer) (MultiTransactionIDType, error) { // Mark all subTxs of a given Tx with the same multiTxID
// Update MultiTransactionID from pending entry func setMultiTxID(tx Transaction, multiTxID MultiTransactionIDType) {
entry, err := c.transactionManager.GetPendingEntry(c.chainClient.ChainID, subTx.ID) for _, subTx := range tx {
if err == nil { subTx.MultiTransactionID = multiTxID
// Propagate the MultiTransactionID, in case the pending entry was a multi-transaction
return entry.MultiTransactionID, nil
} else if err != sql.ErrNoRows {
log.Error("GetPendingEntry error", "error", err)
return NoMultiTransactionID, err
} }
return NoMultiTransactionID, nil
} }
func (c *transfersCommand) checkAndProcessSwapMultiTx(ctx context.Context, subTx *Transfer) (MultiTransactionIDType, error) { func (c *transfersCommand) propagatePendingMultiTx(tx Transaction) error {
switch subTx.Type { multiTxID := NoMultiTransactionID
// If the Tx contains any uniswapV2Swap/uniswapV3Swap subTx, generate a Swap multiTx // If any subTx matches a pending entry, mark all of them with the corresponding multiTxID
case w_common.UniswapV2Swap, w_common.UniswapV3Swap: for _, subTx := range tx {
multiTransaction, err := buildUniswapSwapMultitransaction(ctx, c.chainClient, c.tokenManager, subTx) // Update MultiTransactionID from pending entry
if err != nil { entry, err := c.transactionManager.GetPendingEntry(c.chainClient.ChainID, subTx.ID)
return NoMultiTransactionID, err if err == nil {
} // Propagate the MultiTransactionID, in case the pending entry was a multi-transaction
multiTxID = entry.MultiTransactionID
if multiTransaction != nil { break
id, err := c.transactionManager.InsertMultiTransaction(multiTransaction) } else if err != sql.ErrNoRows {
if err != nil { log.Error("GetPendingEntry error", "error", err)
return NoMultiTransactionID, err return err
}
return id, nil
} }
} }
return NoMultiTransactionID, nil if multiTxID != NoMultiTransactionID {
setMultiTxID(tx, multiTxID)
}
return nil
}
func (c *transfersCommand) checkAndProcessSwapMultiTx(ctx context.Context, tx Transaction) (bool, error) {
for _, subTx := range tx {
switch subTx.Type {
// If the Tx contains any uniswapV2Swap/uniswapV3Swap subTx, generate a Swap multiTx
case w_common.UniswapV2Swap, w_common.UniswapV3Swap:
multiTransaction, err := buildUniswapSwapMultitransaction(ctx, c.chainClient, c.tokenManager, subTx)
if err != nil {
return false, err
}
if multiTransaction != nil {
id, err := c.transactionManager.InsertMultiTransaction(multiTransaction)
if err != nil {
return false, err
}
setMultiTxID(tx, id)
return true, nil
}
}
}
return false, nil
} }
func (c *transfersCommand) processMultiTransactions(ctx context.Context, allTransfers []Transfer) error { func (c *transfersCommand) processMultiTransactions(ctx context.Context, allTransfers []Transfer) error {
subTxsByTxHash := subTransactionsByTxHash(allTransfers) txByTxHash := subTransactionListToTransactionsByTxHash(allTransfers)
// Detect / Generate multitransactions // Detect / Generate multitransactions
// Iterate over all detected transactions // Iterate over all detected transactions
for _, subTxs := range subTxsByTxHash { for _, tx := range txByTxHash {
multiTxID := NoMultiTransactionID
var err error var err error
// Iterate over transaction's subtransactions // First check for pre-existing pending transaction
for _, subTx := range subTxs { err = c.propagatePendingMultiTx(tx)
if subTx.MultiTransactionID == NoMultiTransactionID { if err != nil {
// First check every subTX for pending transaction return err
multiTxID, err = c.checkAndProcessPendingMultiTx(subTx)
if err != nil {
return err
}
if multiTxID != NoMultiTransactionID {
break
}
// Then check for a Swap transaction
multiTxID, err = c.checkAndProcessSwapMultiTx(ctx, subTx)
if err != nil {
return err
}
if multiTxID != NoMultiTransactionID {
break
}
}
} }
// Mark all subTxs of a given Tx with the same multiTxID // Then check for a Swap transaction
if multiTxID != NoMultiTransactionID { _, err = c.checkAndProcessSwapMultiTx(ctx, tx)
for _, subTx := range subTxs { if err != nil {
subTx.MultiTransactionID = multiTxID return err
}
} }
} }
@ -879,8 +881,8 @@ func uniquePreloadedTransactionPerTxHash(allTransactions []*PreloadedTransaction
} }
// Organize subTransactions by Transaction Hash // Organize subTransactions by Transaction Hash
func subTransactionsByTxHash(subTransactions []Transfer) map[common.Hash][]*Transfer { func subTransactionListToTransactionsByTxHash(subTransactions []Transfer) map[common.Hash]Transaction {
rst := map[common.Hash][]*Transfer{} rst := map[common.Hash]Transaction{}
for index := range subTransactions { for index := range subTransactions {
subTx := &subTransactions[index] subTx := &subTransactions[index]