Prevent from completing tx multiple times (#330)
This commit prevents from completing (i.e. sending) the same transaction multiple times.
This commit is contained in:
parent
9eee21f1ca
commit
79f744954c
|
@ -150,13 +150,14 @@ type QueuedTxID string
|
||||||
|
|
||||||
// QueuedTx holds enough information to complete the queued transaction.
|
// QueuedTx holds enough information to complete the queued transaction.
|
||||||
type QueuedTx struct {
|
type QueuedTx struct {
|
||||||
ID QueuedTxID
|
ID QueuedTxID
|
||||||
Hash common.Hash
|
Hash common.Hash
|
||||||
Context context.Context
|
Context context.Context
|
||||||
Args SendTxArgs
|
Args SendTxArgs
|
||||||
Done chan struct{}
|
InProgress bool // true if transaction is being sent
|
||||||
Discard chan struct{}
|
Done chan struct{}
|
||||||
Err error
|
Discard chan struct{}
|
||||||
|
Err error
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendTxArgs represents the arguments to submit a new transaction into the transaction pool.
|
// SendTxArgs represents the arguments to submit a new transaction into the transaction pool.
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/accounts/keystore"
|
"github.com/ethereum/go-ethereum/accounts/keystore"
|
||||||
|
gethcommon "github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/status-im/status-go/geth/common"
|
"github.com/status-im/status-go/geth/common"
|
||||||
"github.com/status-im/status-go/geth/log"
|
"github.com/status-im/status-go/geth/log"
|
||||||
)
|
)
|
||||||
|
@ -21,10 +22,12 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrQueuedTxIDNotFound = errors.New("transaction hash not found")
|
ErrQueuedTxIDNotFound = errors.New("transaction hash not found")
|
||||||
ErrQueuedTxTimedOut = errors.New("transaction sending timed out")
|
ErrQueuedTxTimedOut = errors.New("transaction sending timed out")
|
||||||
ErrQueuedTxDiscarded = errors.New("transaction has been discarded")
|
ErrQueuedTxDiscarded = errors.New("transaction has been discarded")
|
||||||
ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it")
|
ErrQueuedTxInProgress = errors.New("transaction is in progress")
|
||||||
|
ErrQueuedTxAlreadyProcessed = errors.New("transaction has been already processed")
|
||||||
|
ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it")
|
||||||
)
|
)
|
||||||
|
|
||||||
// TxQueue is capped container that holds pending transactions
|
// TxQueue is capped container that holds pending transactions
|
||||||
|
@ -191,6 +194,33 @@ func (q *TxQueue) Remove(id common.QueuedTxID) {
|
||||||
delete(q.transactions, id)
|
delete(q.transactions, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StartProcessing marks a transaction as in progress. It's thread-safe and
|
||||||
|
// prevents from processing the same transaction multiple times.
|
||||||
|
func (q *TxQueue) StartProcessing(tx *common.QueuedTx) error {
|
||||||
|
q.mu.Lock()
|
||||||
|
defer q.mu.Unlock()
|
||||||
|
|
||||||
|
if tx.Hash != (gethcommon.Hash{}) || tx.Err != nil {
|
||||||
|
return ErrQueuedTxAlreadyProcessed
|
||||||
|
}
|
||||||
|
|
||||||
|
if tx.InProgress {
|
||||||
|
return ErrQueuedTxInProgress
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.InProgress = true
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StopProcessing removes the "InProgress" flag from the transaction.
|
||||||
|
func (q *TxQueue) StopProcessing(tx *common.QueuedTx) {
|
||||||
|
q.mu.Lock()
|
||||||
|
defer q.mu.Unlock()
|
||||||
|
|
||||||
|
tx.InProgress = false
|
||||||
|
}
|
||||||
|
|
||||||
// Count returns number of currently queued transactions
|
// Count returns number of currently queued transactions
|
||||||
func (q *TxQueue) Count() int {
|
func (q *TxQueue) Count() int {
|
||||||
q.mu.RLock()
|
q.mu.RLock()
|
||||||
|
|
|
@ -138,6 +138,11 @@ func (m *TxQueueManager) CompleteTransaction(id common.QueuedTxID, password stri
|
||||||
return gethcommon.Hash{}, err
|
return gethcommon.Hash{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := m.txQueue.StartProcessing(queuedTx); err != nil {
|
||||||
|
return gethcommon.Hash{}, err
|
||||||
|
}
|
||||||
|
defer m.txQueue.StopProcessing(queuedTx)
|
||||||
|
|
||||||
selectedAccount, err := m.accountManager.SelectedAccount()
|
selectedAccount, err := m.accountManager.SelectedAccount()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("failed to get a selected account", "err", err)
|
log.Warn("failed to get a selected account", "err", err)
|
||||||
|
@ -179,7 +184,7 @@ func (m *TxQueueManager) CompleteTransaction(id common.QueuedTxID, password stri
|
||||||
|
|
||||||
queuedTx.Hash = hash
|
queuedTx.Hash = hash
|
||||||
queuedTx.Err = txErr
|
queuedTx.Err = txErr
|
||||||
queuedTx.Done <- struct{}{} // sendTransaction() waits on this, notify so that it can return
|
queuedTx.Done <- struct{}{}
|
||||||
|
|
||||||
return hash, txErr
|
return hash, txErr
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package node
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/accounts/keystore"
|
"github.com/ethereum/go-ethereum/accounts/keystore"
|
||||||
|
@ -92,6 +93,69 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() {
|
||||||
s.False(txQueueManager.TransactionQueue().Has(tx.ID))
|
s.False(txQueueManager.TransactionQueue().Has(tx.ID))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() {
|
||||||
|
s.accountManagerMock.EXPECT().SelectedAccount().Return(&common.SelectedExtKey{
|
||||||
|
Address: common.FromAddress(TestConfig.Account1.Address),
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
s.nodeManagerMock.EXPECT().NodeConfig().Return(
|
||||||
|
params.NewNodeConfig("/tmp", params.RopstenNetworkID, true),
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO(adam): StatusBackend as an interface would allow a better solution.
|
||||||
|
// As we want to avoid network connection, we mock LES with a known error
|
||||||
|
// and treat as success.
|
||||||
|
s.nodeManagerMock.EXPECT().LightEthereumService().Return(nil, errTxAssumedSent)
|
||||||
|
|
||||||
|
txQueueManager := NewTxQueueManager(s.nodeManagerMock, s.accountManagerMock)
|
||||||
|
|
||||||
|
txQueueManager.Start()
|
||||||
|
defer txQueueManager.Stop()
|
||||||
|
|
||||||
|
tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{
|
||||||
|
From: common.FromAddress(TestConfig.Account1.Address),
|
||||||
|
To: common.ToAddress(TestConfig.Account2.Address),
|
||||||
|
})
|
||||||
|
|
||||||
|
// TransactionQueueHandler is required to enqueue a transaction.
|
||||||
|
txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) {
|
||||||
|
s.Equal(tx.ID, queuedTx.ID)
|
||||||
|
})
|
||||||
|
|
||||||
|
txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) {
|
||||||
|
s.Equal(tx.ID, queuedTx.ID)
|
||||||
|
s.Equal(errTxAssumedSent, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
err := txQueueManager.QueueTransaction(tx)
|
||||||
|
s.NoError(err)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var mu sync.Mutex
|
||||||
|
completeTxErrors := make(map[error]int)
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
_, err := txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password)
|
||||||
|
mu.Lock()
|
||||||
|
completeTxErrors[err]++
|
||||||
|
mu.Unlock()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
err = txQueueManager.WaitForTransaction(tx)
|
||||||
|
s.Equal(errTxAssumedSent, err)
|
||||||
|
// Check that error is assigned to the transaction.
|
||||||
|
s.Equal(errTxAssumedSent, tx.Err)
|
||||||
|
// Transaction should be already removed from the queue.
|
||||||
|
s.False(txQueueManager.TransactionQueue().Has(tx.ID))
|
||||||
|
|
||||||
|
// Wait for all CompleteTransaction calls.
|
||||||
|
wg.Wait()
|
||||||
|
s.Equal(completeTxErrors[errTxAssumedSent], 1)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *TxQueueTestSuite) TestAccountMismatch() {
|
func (s *TxQueueTestSuite) TestAccountMismatch() {
|
||||||
s.accountManagerMock.EXPECT().SelectedAccount().Return(&common.SelectedExtKey{
|
s.accountManagerMock.EXPECT().SelectedAccount().Return(&common.SelectedExtKey{
|
||||||
Address: common.FromAddress(TestConfig.Account2.Address),
|
Address: common.FromAddress(TestConfig.Account2.Address),
|
||||||
|
|
Loading…
Reference in New Issue