diff --git a/e2e/transactions/transactions_test.go b/e2e/transactions/transactions_test.go index 94912b858..180bd1939 100644 --- a/e2e/transactions/transactions_test.go +++ b/e2e/transactions/transactions_test.go @@ -495,7 +495,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { log.Info("transaction return event received", "id", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() s.Equal(receivedErrMessage, expectedErrMessage) receivedErrCode := event["error_code"].(string) @@ -511,7 +511,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - s.EqualError(err, queue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded") + s.EqualError(err, transactions.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded") select { case <-completeQueuedTransaction: @@ -659,7 +659,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { log.Info("transaction return event received", "id", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() s.Equal(receivedErrMessage, expectedErrMessage) receivedErrCode := event["error_code"].(string) @@ -681,7 +681,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - require.EqualError(err, queue.ErrQueuedTxDiscarded.Error()) + require.EqualError(err, transactions.ErrQueuedTxDiscarded.Error()) require.Equal(gethcommon.Hash{}, txHashCheck, "transaction returned hash, while it shouldn't") } diff --git a/geth/api/api.go b/geth/api/api.go index f3c4848e2..4234ea4c9 100644 --- a/geth/api/api.go +++ b/geth/api/api.go @@ -172,7 +172,7 @@ func (api *StatusAPI) CompleteTransaction(id common.QueuedTxID, password string) } // CompleteTransactions instructs backend to complete sending of multiple transactions -func (api *StatusAPI) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult { +func (api *StatusAPI) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult { return api.b.txQueueManager.CompleteTransactions(ids, password) } diff --git a/geth/api/backend.go b/geth/api/backend.go index 962bcbcb7..ad5286936 100644 --- a/geth/api/backend.go +++ b/geth/api/backend.go @@ -200,22 +200,19 @@ func (m *StatusBackend) CallRPC(inputJSON string) string { } // SendTransaction creates a new transaction and waits until it's complete. -func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (gethcommon.Hash, error) { +func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (hash gethcommon.Hash, err error) { if ctx == nil { ctx = context.Background() } - tx := common.CreateTransaction(ctx, args) - - if err := m.txQueueManager.QueueTransaction(tx); err != nil { - return gethcommon.Hash{}, err + if err = m.txQueueManager.QueueTransaction(tx); err != nil { + return hash, err } - - if err := m.txQueueManager.WaitForTransaction(tx); err != nil { - return gethcommon.Hash{}, err + rst := m.txQueueManager.WaitForTransaction(tx) + if rst.Error != nil { + return hash, rst.Error } - - return tx.Hash, nil + return rst.Hash, nil } // CompleteTransaction instructs backend to complete sending of a given transaction @@ -224,7 +221,7 @@ func (m *StatusBackend) CompleteTransaction(id common.QueuedTxID, password strin } // CompleteTransactions instructs backend to complete sending of multiple transactions -func (m *StatusBackend) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult { +func (m *StatusBackend) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult { return m.txQueueManager.CompleteTransactions(ids, password) } diff --git a/geth/common/types.go b/geth/common/types.go index e35cfe09a..bea97177d 100644 --- a/geth/common/types.go +++ b/geth/common/types.go @@ -141,8 +141,8 @@ type AccountManager interface { AddressToDecryptedAccount(address, password string) (accounts.Account, *keystore.Key, error) } -// RawCompleteTransactionResult is a JSON returned from transaction complete function (used internally) -type RawCompleteTransactionResult struct { +// TransactionResult is a JSON returned from transaction complete function (used internally) +type TransactionResult struct { Hash common.Hash Error error } @@ -158,11 +158,9 @@ type QueuedTxID string // QueuedTx holds enough information to complete the queued transaction. type QueuedTx struct { ID QueuedTxID - Hash common.Hash Context context.Context Args SendTxArgs - Done chan struct{} - Err error + Result chan TransactionResult } // SendTxArgs represents the arguments to submit a new transaction into the transaction pool. diff --git a/geth/common/utils.go b/geth/common/utils.go index c4ad1d152..b81bd3455 100644 --- a/geth/common/utils.go +++ b/geth/common/utils.go @@ -157,9 +157,8 @@ func Fatalf(reason interface{}, args ...interface{}) { func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx { return &QueuedTx{ ID: QueuedTxID(uuid.New()), - Hash: common.Hash{}, Context: ctx, Args: args, - Done: make(chan struct{}), + Result: make(chan TransactionResult, 1), } } diff --git a/geth/transactions/errors.go b/geth/transactions/errors.go new file mode 100644 index 000000000..7eb38081e --- /dev/null +++ b/geth/transactions/errors.go @@ -0,0 +1,10 @@ +package transactions + +import "errors" + +var ( + //ErrQueuedTxTimedOut - error transaction sending timed out + ErrQueuedTxTimedOut = errors.New("transaction sending timed out") + //ErrQueuedTxDiscarded - error transaction discarded + ErrQueuedTxDiscarded = errors.New("transaction has been discarded") +) diff --git a/geth/transactions/fake/mock.go b/geth/transactions/fake/mock.go index 6b4218978..2b05dbbb6 100644 --- a/geth/transactions/fake/mock.go +++ b/geth/transactions/fake/mock.go @@ -14,31 +14,31 @@ import ( reflect "reflect" ) -// MockFakePublicTransactionPoolAPI is a mock of FakePublicTransactionPoolAPI interface -type MockFakePublicTransactionPoolAPI struct { +// MockPublicTransactionPoolAPI is a mock of PublicTransactionPoolAPI interface +type MockPublicTransactionPoolAPI struct { ctrl *gomock.Controller - recorder *MockFakePublicTransactionPoolAPIMockRecorder + recorder *MockPublicTransactionPoolAPIMockRecorder } -// MockFakePublicTransactionPoolAPIMockRecorder is the mock recorder for MockFakePublicTransactionPoolAPI -type MockFakePublicTransactionPoolAPIMockRecorder struct { - mock *MockFakePublicTransactionPoolAPI +// MockPublicTransactionPoolAPIMockRecorder is the mock recorder for MockPublicTransactionPoolAPI +type MockPublicTransactionPoolAPIMockRecorder struct { + mock *MockPublicTransactionPoolAPI } -// NewMockFakePublicTransactionPoolAPI creates a new mock instance -func NewMockFakePublicTransactionPoolAPI(ctrl *gomock.Controller) *MockFakePublicTransactionPoolAPI { - mock := &MockFakePublicTransactionPoolAPI{ctrl: ctrl} - mock.recorder = &MockFakePublicTransactionPoolAPIMockRecorder{mock} +// NewMockPublicTransactionPoolAPI creates a new mock instance +func NewMockPublicTransactionPoolAPI(ctrl *gomock.Controller) *MockPublicTransactionPoolAPI { + mock := &MockPublicTransactionPoolAPI{ctrl: ctrl} + mock.recorder = &MockPublicTransactionPoolAPIMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use -func (m *MockFakePublicTransactionPoolAPI) EXPECT() *MockFakePublicTransactionPoolAPIMockRecorder { +func (m *MockPublicTransactionPoolAPI) EXPECT() *MockPublicTransactionPoolAPIMockRecorder { return m.recorder } // GasPrice mocks base method -func (m *MockFakePublicTransactionPoolAPI) GasPrice(ctx context.Context) (*big.Int, error) { +func (m *MockPublicTransactionPoolAPI) GasPrice(ctx context.Context) (*big.Int, error) { ret := m.ctrl.Call(m, "GasPrice", ctx) ret0, _ := ret[0].(*big.Int) ret1, _ := ret[1].(error) @@ -46,12 +46,12 @@ func (m *MockFakePublicTransactionPoolAPI) GasPrice(ctx context.Context) (*big.I } // GasPrice indicates an expected call of GasPrice -func (mr *MockFakePublicTransactionPoolAPIMockRecorder) GasPrice(ctx interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GasPrice", reflect.TypeOf((*MockFakePublicTransactionPoolAPI)(nil).GasPrice), ctx) +func (mr *MockPublicTransactionPoolAPIMockRecorder) GasPrice(ctx interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GasPrice", reflect.TypeOf((*MockPublicTransactionPoolAPI)(nil).GasPrice), ctx) } // EstimateGas mocks base method -func (m *MockFakePublicTransactionPoolAPI) EstimateGas(ctx context.Context, args CallArgs) (*hexutil.Big, error) { +func (m *MockPublicTransactionPoolAPI) EstimateGas(ctx context.Context, args CallArgs) (*hexutil.Big, error) { ret := m.ctrl.Call(m, "EstimateGas", ctx, args) ret0, _ := ret[0].(*hexutil.Big) ret1, _ := ret[1].(error) @@ -59,12 +59,12 @@ func (m *MockFakePublicTransactionPoolAPI) EstimateGas(ctx context.Context, args } // EstimateGas indicates an expected call of EstimateGas -func (mr *MockFakePublicTransactionPoolAPIMockRecorder) EstimateGas(ctx, args interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EstimateGas", reflect.TypeOf((*MockFakePublicTransactionPoolAPI)(nil).EstimateGas), ctx, args) +func (mr *MockPublicTransactionPoolAPIMockRecorder) EstimateGas(ctx, args interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EstimateGas", reflect.TypeOf((*MockPublicTransactionPoolAPI)(nil).EstimateGas), ctx, args) } // GetTransactionCount mocks base method -func (m *MockFakePublicTransactionPoolAPI) GetTransactionCount(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (*hexutil.Uint64, error) { +func (m *MockPublicTransactionPoolAPI) GetTransactionCount(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (*hexutil.Uint64, error) { ret := m.ctrl.Call(m, "GetTransactionCount", ctx, address, blockNr) ret0, _ := ret[0].(*hexutil.Uint64) ret1, _ := ret[1].(error) @@ -72,12 +72,12 @@ func (m *MockFakePublicTransactionPoolAPI) GetTransactionCount(ctx context.Conte } // GetTransactionCount indicates an expected call of GetTransactionCount -func (mr *MockFakePublicTransactionPoolAPIMockRecorder) GetTransactionCount(ctx, address, blockNr interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTransactionCount", reflect.TypeOf((*MockFakePublicTransactionPoolAPI)(nil).GetTransactionCount), ctx, address, blockNr) +func (mr *MockPublicTransactionPoolAPIMockRecorder) GetTransactionCount(ctx, address, blockNr interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTransactionCount", reflect.TypeOf((*MockPublicTransactionPoolAPI)(nil).GetTransactionCount), ctx, address, blockNr) } // SendRawTransaction mocks base method -func (m *MockFakePublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, encodedTx hexutil.Bytes) (common.Hash, error) { +func (m *MockPublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, encodedTx hexutil.Bytes) (common.Hash, error) { ret := m.ctrl.Call(m, "SendRawTransaction", ctx, encodedTx) ret0, _ := ret[0].(common.Hash) ret1, _ := ret[1].(error) @@ -85,6 +85,6 @@ func (m *MockFakePublicTransactionPoolAPI) SendRawTransaction(ctx context.Contex } // SendRawTransaction indicates an expected call of SendRawTransaction -func (mr *MockFakePublicTransactionPoolAPIMockRecorder) SendRawTransaction(ctx, encodedTx interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendRawTransaction", reflect.TypeOf((*MockFakePublicTransactionPoolAPI)(nil).SendRawTransaction), ctx, encodedTx) +func (mr *MockPublicTransactionPoolAPIMockRecorder) SendRawTransaction(ctx, encodedTx interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendRawTransaction", reflect.TypeOf((*MockPublicTransactionPoolAPI)(nil).SendRawTransaction), ctx, encodedTx) } diff --git a/geth/transactions/fake/txservice.go b/geth/transactions/fake/txservice.go index bcf3ac192..bff63e6b5 100644 --- a/geth/transactions/fake/txservice.go +++ b/geth/transactions/fake/txservice.go @@ -11,9 +11,9 @@ import ( ) // NewTestServer returns a mocked test server -func NewTestServer(ctrl *gomock.Controller) (*rpc.Server, *MockFakePublicTransactionPoolAPI) { +func NewTestServer(ctrl *gomock.Controller) (*rpc.Server, *MockPublicTransactionPoolAPI) { srv := rpc.NewServer() - svc := NewMockFakePublicTransactionPoolAPI(ctrl) + svc := NewMockPublicTransactionPoolAPI(ctrl) if err := srv.RegisterName("eth", svc); err != nil { panic(err) } diff --git a/geth/transactions/notifications.go b/geth/transactions/notifications.go index d307951ff..b82f7b69c 100644 --- a/geth/transactions/notifications.go +++ b/geth/transactions/notifications.go @@ -1,12 +1,9 @@ package transactions import ( - "strconv" - "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/signal" - "github.com/status-im/status-go/geth/transactions/queue" ) const ( @@ -30,10 +27,10 @@ const ( ) var txReturnCodes = map[error]int{ - nil: SendTransactionNoErrorCode, - keystore.ErrDecrypt: SendTransactionPasswordErrorCode, - queue.ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode, - queue.ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode, + nil: SendTransactionNoErrorCode, + keystore.ErrDecrypt: SendTransactionPasswordErrorCode, + ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode, + ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode, } // SendTransactionEvent is a signal sent on a send transaction request @@ -61,17 +58,17 @@ type ReturnSendTransactionEvent struct { Args common.SendTxArgs `json:"args"` MessageID string `json:"message_id"` ErrorMessage string `json:"error_message"` - ErrorCode string `json:"error_code"` + ErrorCode int `json:"error_code,string"` } // NotifyOnReturn returns handler that processes responses from internal tx manager -func NotifyOnReturn(queuedTx *common.QueuedTx) { - // discard notifications with empty tx - if queuedTx == nil { +func NotifyOnReturn(queuedTx *common.QueuedTx, err error) { + // we don't want to notify a user if tx was sent successfully + if err == nil { return } - // we don't want to notify a user if tx sent successfully - if queuedTx.Err == nil { + // discard notifications with empty tx + if queuedTx == nil { return } signal.Send(signal.Envelope{ @@ -80,8 +77,8 @@ func NotifyOnReturn(queuedTx *common.QueuedTx) { ID: string(queuedTx.ID), Args: queuedTx.Args, MessageID: common.MessageIDFromContext(queuedTx.Context), - ErrorMessage: queuedTx.Err.Error(), - ErrorCode: strconv.Itoa(sendTransactionErrorCode(queuedTx.Err)), + ErrorMessage: err.Error(), + ErrorCode: sendTransactionErrorCode(err), }, }) } diff --git a/geth/transactions/queue/queue.go b/geth/transactions/queue/queue.go index eea695442..ed6602f73 100644 --- a/geth/transactions/queue/queue.go +++ b/geth/transactions/queue/queue.go @@ -19,16 +19,12 @@ const ( ) var ( + // ErrQueuedTxExist - transaction was already enqueued + ErrQueuedTxExist = errors.New("transaction already exist in queue") //ErrQueuedTxIDNotFound - error transaction hash not found ErrQueuedTxIDNotFound = errors.New("transaction hash not found") - //ErrQueuedTxTimedOut - error transaction sending timed out - ErrQueuedTxTimedOut = errors.New("transaction sending timed out") - //ErrQueuedTxDiscarded - error transaction discarded - ErrQueuedTxDiscarded = errors.New("transaction has been discarded") - //ErrQueuedTxInProgress - error transaction in progress + //ErrQueuedTxInProgress - error transaction is in progress ErrQueuedTxInProgress = errors.New("transaction is in progress") - //ErrQueuedTxAlreadyProcessed - error transaction has already processed - ErrQueuedTxAlreadyProcessed = errors.New("transaction has been already processed") //ErrInvalidCompleteTxSender - error transaction with invalid sender ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it") ) @@ -133,15 +129,18 @@ func (q *TxQueue) Reset() { // Enqueue enqueues incoming transaction func (q *TxQueue) Enqueue(tx *common.QueuedTx) error { log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID)) - if (tx.Hash != gethcommon.Hash{} || tx.Err != nil) { - return ErrQueuedTxAlreadyProcessed + q.mu.RLock() + if _, ok := q.transactions[tx.ID]; ok { + q.mu.RUnlock() + return ErrQueuedTxExist } + q.mu.RUnlock() - log.Info("before enqueueTicker") + // we can't hold a lock in this part + log.Debug("notifying eviction loop") q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item - log.Info("before evictableIDs") - q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap - log.Info("after evictableIDs") + q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap + log.Debug("notified eviction loop") q.mu.Lock() q.transactions[tx.ID] = tx @@ -204,17 +203,15 @@ func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) er func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) { delete(q.inprogress, tx.ID) - tx.Err = err // hash is updated only if err is nil, but transaction is not removed from a queue if err == nil { + q.transactions[tx.ID].Result <- common.TransactionResult{Hash: hash, Error: err} q.remove(tx.ID) - tx.Hash = hash - close(tx.Done) return } if _, transient := transientErrs[err.Error()]; !transient { + q.transactions[tx.ID].Result <- common.TransactionResult{Error: err} q.remove(tx.ID) - close(tx.Done) } } diff --git a/geth/transactions/queue/queue_test.go b/geth/transactions/queue/queue_test.go index e156e6b91..47a32ff5b 100644 --- a/geth/transactions/queue/queue_test.go +++ b/geth/transactions/queue/queue_test.go @@ -52,15 +52,13 @@ func (s *QueueTestSuite) TestGetTransaction() { } } -func (s *QueueTestSuite) TestEnqueueProcessedTransaction() { - // enqueue will fail if transaction with hash will be enqueued +func (s *QueueTestSuite) TestAlreadyEnqueued() { tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) - tx.Hash = gethcommon.Hash{1} - s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) - + s.NoError(s.queue.Enqueue(tx)) + s.Equal(ErrQueuedTxExist, s.queue.Enqueue(tx)) + // try to enqueue another tx to double check locking tx = common.CreateTransaction(context.Background(), common.SendTxArgs{}) - tx.Err = errors.New("error") - s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) + s.NoError(s.queue.Enqueue(tx)) } func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.QueuedTx { @@ -73,12 +71,12 @@ func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.Queue func (s *QueueTestSuite) TestDoneSuccess() { hash := gethcommon.Hash{1} tx := s.testDone(hash, nil) - s.NoError(tx.Err) - s.Equal(hash, tx.Hash) - s.False(s.queue.Has(tx.ID)) // event is sent only if transaction was removed from a queue select { - case <-tx.Done: + case rst := <-tx.Result: + s.NoError(rst.Error) + s.Equal(hash, rst.Hash) + s.False(s.queue.Has(tx.ID)) default: s.Fail("No event was sent to Done channel") } @@ -88,22 +86,22 @@ func (s *QueueTestSuite) TestDoneTransientError() { hash := gethcommon.Hash{1} err := keystore.ErrDecrypt tx := s.testDone(hash, err) - s.Equal(keystore.ErrDecrypt, tx.Err) - s.Equal(gethcommon.Hash{}, tx.Hash) s.True(s.queue.Has(tx.ID)) + _, inp := s.queue.inprogress[tx.ID] + s.False(inp) } func (s *QueueTestSuite) TestDoneError() { hash := gethcommon.Hash{1} err := errors.New("test") tx := s.testDone(hash, err) - s.Equal(err, tx.Err) - s.NotEqual(hash, tx.Hash) - s.Equal(gethcommon.Hash{}, tx.Hash) - s.False(s.queue.Has(tx.ID)) // event is sent only if transaction was removed from a queue select { - case <-tx.Done: + case rst := <-tx.Result: + s.Equal(err, rst.Error) + s.NotEqual(hash, rst.Hash) + s.Equal(gethcommon.Hash{}, rst.Hash) + s.False(s.queue.Has(tx.ID)) default: s.Fail("No event was sent to Done channel") } diff --git a/geth/transactions/txqueue_manager.go b/geth/transactions/txqueue_manager.go index 984e066fd..502a6444c 100644 --- a/geth/transactions/txqueue_manager.go +++ b/geth/transactions/txqueue_manager.go @@ -17,7 +17,7 @@ const ( // SendTxDefaultErrorCode is sent by default, when error is not nil, but type is unknown/unexpected. SendTxDefaultErrorCode = SendTransactionDefaultErrorCode // DefaultTxSendCompletionTimeout defines how many seconds to wait before returning result in sentTransaction(). - DefaultTxSendCompletionTimeout = 300 + DefaultTxSendCompletionTimeout = 300 * time.Second defaultGas = 90000 defaultTimeout = time.Minute @@ -25,22 +25,24 @@ const ( // Manager provides means to manage internal Status Backend (injected into LES) type Manager struct { - nodeManager common.NodeManager - accountManager common.AccountManager - txQueue *queue.TxQueue - ethTxClient EthTransactor - addrLock *AddrLocker - notify bool + nodeManager common.NodeManager + accountManager common.AccountManager + txQueue *queue.TxQueue + ethTxClient EthTransactor + addrLock *AddrLocker + notify bool + completionTimeout time.Duration } // NewManager returns a new Manager. func NewManager(nodeManager common.NodeManager, accountManager common.AccountManager) *Manager { return &Manager{ - nodeManager: nodeManager, - accountManager: accountManager, - txQueue: queue.New(), - addrLock: &AddrLocker{}, - notify: true, + nodeManager: nodeManager, + accountManager: accountManager, + txQueue: queue.New(), + addrLock: &AddrLocker{}, + notify: true, + completionTimeout: DefaultTxSendCompletionTimeout, } } @@ -75,34 +77,41 @@ func (m *Manager) QueueTransaction(tx *common.QueuedTx) error { to = tx.Args.To.Hex() } log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to) - err := m.txQueue.Enqueue(tx) + if err := m.txQueue.Enqueue(tx); err != nil { + return err + } if m.notify { NotifyOnEnqueue(tx) } - return err + return nil } func (m *Manager) txDone(tx *common.QueuedTx, hash gethcommon.Hash, err error) { - m.txQueue.Done(tx.ID, hash, err) //nolint: errcheck + if err := m.txQueue.Done(tx.ID, hash, err); err == queue.ErrQueuedTxIDNotFound { + log.Warn("transaction is already removed from a queue", tx.ID) + return + } if m.notify { - NotifyOnReturn(tx) + NotifyOnReturn(tx, err) } } // WaitForTransaction adds a transaction to the queue and blocks // until it's completed, discarded or times out. -func (m *Manager) WaitForTransaction(tx *common.QueuedTx) error { +func (m *Manager) WaitForTransaction(tx *common.QueuedTx) common.TransactionResult { log.Info("wait for transaction", "id", tx.ID) // now wait up until transaction is: // - completed (via CompleteQueuedTransaction), // - discarded (via DiscardQueuedTransaction) // - or times out - select { - case <-tx.Done: - case <-time.After(DefaultTxSendCompletionTimeout * time.Second): - m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut) + for { + select { + case rst := <-tx.Result: + return rst + case <-time.After(m.completionTimeout): + m.txDone(tx, gethcommon.Hash{}, ErrQueuedTxTimedOut) + } } - return tx.Err } // CompleteTransaction instructs backend to complete sending of a given transaction. @@ -224,11 +233,11 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount } // CompleteTransactions instructs backend to complete sending of multiple transactions -func (m *Manager) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult { - results := make(map[common.QueuedTxID]common.RawCompleteTransactionResult) +func (m *Manager) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult { + results := make(map[common.QueuedTxID]common.TransactionResult) for _, txID := range ids { txHash, txErr := m.CompleteTransaction(txID, password) - results[txID] = common.RawCompleteTransactionResult{ + results[txID] = common.TransactionResult{ Hash: txHash, Error: txErr, } @@ -242,9 +251,9 @@ func (m *Manager) DiscardTransaction(id common.QueuedTxID) error { if err != nil { return err } - err = m.txQueue.Done(id, gethcommon.Hash{}, queue.ErrQueuedTxDiscarded) + err = m.txQueue.Done(id, gethcommon.Hash{}, ErrQueuedTxDiscarded) if m.notify { - NotifyOnReturn(tx) + NotifyOnReturn(tx, ErrQueuedTxDiscarded) } return err } @@ -269,19 +278,16 @@ func (m *Manager) DiscardTransactions(ids []common.QueuedTxID) map[common.Queued // It accepts one param which is a slice with a map of transaction params. func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) { log.Info("SendTransactionRPCHandler called") - // TODO(adam): it's a hack to parse arguments as common.RPCCall can do that. // We should refactor parsing these params to a separate struct. rpcCall := common.RPCCall{Params: args} tx := common.CreateTransaction(ctx, rpcCall.ToSendTxArgs()) - if err := m.QueueTransaction(tx); err != nil { return nil, err } - - if err := m.WaitForTransaction(tx); err != nil { - return nil, err + rst := m.WaitForTransaction(tx) + if rst.Error != nil { + return nil, rst.Error } - - return tx.Hash.Hex(), nil + return rst.Hash.Hex(), nil } diff --git a/geth/transactions/txqueue_manager_test.go b/geth/transactions/txqueue_manager_test.go index 3fa4af314..28e175748 100644 --- a/geth/transactions/txqueue_manager_test.go +++ b/geth/transactions/txqueue_manager_test.go @@ -36,7 +36,7 @@ type TxQueueTestSuite struct { server *gethrpc.Server client *gethrpc.Client txServiceMockCtrl *gomock.Controller - txServiceMock *fake.MockFakePublicTransactionPoolAPI + txServiceMock *fake.MockPublicTransactionPoolAPI } func (s *TxQueueTestSuite) SetupTest() { @@ -98,24 +98,26 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() { From: common.FromAddress(TestConfig.Account1.Address), To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + s.NoError(txQueueManager.QueueTransaction(tx)) w := make(chan struct{}) + var ( + hash gethcommon.Hash + err error + ) go func() { - hash, err := txQueueManager.CompleteTransaction(tx.ID, password) + hash, err = txQueueManager.CompleteTransaction(tx.ID, password) s.NoError(err) - s.Equal(tx.Hash, hash) close(w) }() - err = txQueueManager.WaitForTransaction(tx) - s.NoError(err) + rst := txQueueManager.WaitForTransaction(tx) // Check that error is assigned to the transaction. - s.NoError(tx.Err) + s.NoError(rst.Error) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) s.NoError(WaitClosed(w, time.Second)) + s.Equal(hash, rst.Hash) } func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { @@ -141,8 +143,7 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + s.NoError(txQueueManager.QueueTransaction(tx)) var ( wg sync.WaitGroup @@ -168,10 +169,9 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { }() } - err = txQueueManager.WaitForTransaction(tx) - s.NoError(err) + rst := txQueueManager.WaitForTransaction(tx) // Check that error is assigned to the transaction. - s.NoError(tx.Err) + s.NoError(rst.Error) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) @@ -197,10 +197,9 @@ func (s *TxQueueTestSuite) TestAccountMismatch() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + s.NoError(txQueueManager.QueueTransaction(tx)) - _, err = txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password) + _, err := txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password) s.Equal(err, queue.ErrInvalidCompleteTxSender) // Transaction should stay in the queue as mismatched accounts @@ -227,10 +226,9 @@ func (s *TxQueueTestSuite) TestInvalidPassword() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) + s.NoError(txQueueManager.QueueTransaction(tx)) - _, err = txQueueManager.CompleteTransaction(tx.ID, password) + _, err := txQueueManager.CompleteTransaction(tx.ID, password) s.Equal(err.Error(), keystore.ErrDecrypt.Error()) // Transaction should stay in the queue as mismatched accounts @@ -250,20 +248,34 @@ func (s *TxQueueTestSuite) TestDiscardTransaction() { To: common.ToAddress(TestConfig.Account2.Address), }) - err := txQueueManager.QueueTransaction(tx) - s.NoError(err) - + s.NoError(txQueueManager.QueueTransaction(tx)) w := make(chan struct{}) go func() { s.NoError(txQueueManager.DiscardTransaction(tx.ID)) close(w) }() - err = txQueueManager.WaitForTransaction(tx) - s.Equal(queue.ErrQueuedTxDiscarded, err) - // Check that error is assigned to the transaction. - s.Equal(queue.ErrQueuedTxDiscarded, tx.Err) + rst := txQueueManager.WaitForTransaction(tx) + s.Equal(ErrQueuedTxDiscarded, rst.Error) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) s.NoError(WaitClosed(w, time.Second)) } + +func (s *TxQueueTestSuite) TestCompletionTimedOut() { + txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) + txQueueManager.DisableNotificactions() + txQueueManager.completionTimeout = time.Nanosecond + + txQueueManager.Start() + defer txQueueManager.Stop() + + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ + From: common.FromAddress(TestConfig.Account1.Address), + To: common.ToAddress(TestConfig.Account2.Address), + }) + + s.NoError(txQueueManager.QueueTransaction(tx)) + rst := txQueueManager.WaitForTransaction(tx) + s.Equal(ErrQueuedTxTimedOut, rst.Error) +} diff --git a/lib/utils.go b/lib/utils.go index 216b5b0d5..8335f85ab 100644 --- a/lib/utils.go +++ b/lib/utils.go @@ -1049,7 +1049,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo t.Logf("transaction return event received: {id: %s}\n", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() if receivedErrMessage != expectedErrMessage { t.Errorf("unexpected error message received: got %v", receivedErrMessage) return @@ -1071,7 +1071,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - if err != queue.ErrQueuedTxDiscarded { + if err != transactions.ErrQueuedTxDiscarded { t.Errorf("expected error not thrown: %v", err) return false } @@ -1136,7 +1136,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl t.Logf("transaction return event received: {id: %s}\n", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() if receivedErrMessage != expectedErrMessage { t.Errorf("unexpected error message received: got %v", receivedErrMessage) return @@ -1162,7 +1162,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - if err != queue.ErrQueuedTxDiscarded { + if err != transactions.ErrQueuedTxDiscarded { t.Errorf("expected error not thrown: %v", err) return }