Result of tx processing returned as QueuedTxResult

Currently it is quite easy to introduce concurrency issues while working
with transaction object. For example, race issue will exist every time
while transaction is processed in a separate goroutine and caller will
try to check for an error before event to Done channel is sent.

This change removes all the data that is updated on transaction and leaves
it with ID, Args and Context (which is not used at the moment).

Signed-off-by: Dmitry Shulyak <yashulyak@gmail.com>
This commit is contained in:
Dmitry Shulyak 2018-01-05 22:58:17 +02:00 committed by Dmitry Shulyak
parent 8b56060e21
commit 653da5bcd0
14 changed files with 176 additions and 162 deletions

View File

@ -495,7 +495,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
log.Info("transaction return event received", "id", event["id"].(string))
receivedErrMessage := event["error_message"].(string)
expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error()
s.Equal(receivedErrMessage, expectedErrMessage)
receivedErrCode := event["error_code"].(string)
@ -511,7 +511,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
To: common.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
})
s.EqualError(err, queue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")
s.EqualError(err, transactions.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")
select {
case <-completeQueuedTransaction:
@ -659,7 +659,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
log.Info("transaction return event received", "id", event["id"].(string))
receivedErrMessage := event["error_message"].(string)
expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error()
s.Equal(receivedErrMessage, expectedErrMessage)
receivedErrCode := event["error_code"].(string)
@ -681,7 +681,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
To: common.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
})
require.EqualError(err, queue.ErrQueuedTxDiscarded.Error())
require.EqualError(err, transactions.ErrQueuedTxDiscarded.Error())
require.Equal(gethcommon.Hash{}, txHashCheck, "transaction returned hash, while it shouldn't")
}

View File

@ -172,7 +172,7 @@ func (api *StatusAPI) CompleteTransaction(id common.QueuedTxID, password string)
}
// CompleteTransactions instructs backend to complete sending of multiple transactions
func (api *StatusAPI) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult {
func (api *StatusAPI) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult {
return api.b.txQueueManager.CompleteTransactions(ids, password)
}

View File

@ -200,22 +200,19 @@ func (m *StatusBackend) CallRPC(inputJSON string) string {
}
// SendTransaction creates a new transaction and waits until it's complete.
func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (gethcommon.Hash, error) {
func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxArgs) (hash gethcommon.Hash, err error) {
if ctx == nil {
ctx = context.Background()
}
tx := common.CreateTransaction(ctx, args)
if err := m.txQueueManager.QueueTransaction(tx); err != nil {
return gethcommon.Hash{}, err
if err = m.txQueueManager.QueueTransaction(tx); err != nil {
return hash, err
}
if err := m.txQueueManager.WaitForTransaction(tx); err != nil {
return gethcommon.Hash{}, err
rst := m.txQueueManager.WaitForTransaction(tx)
if rst.Error != nil {
return hash, rst.Error
}
return tx.Hash, nil
return rst.Hash, nil
}
// CompleteTransaction instructs backend to complete sending of a given transaction
@ -224,7 +221,7 @@ func (m *StatusBackend) CompleteTransaction(id common.QueuedTxID, password strin
}
// CompleteTransactions instructs backend to complete sending of multiple transactions
func (m *StatusBackend) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult {
func (m *StatusBackend) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult {
return m.txQueueManager.CompleteTransactions(ids, password)
}

View File

@ -141,8 +141,8 @@ type AccountManager interface {
AddressToDecryptedAccount(address, password string) (accounts.Account, *keystore.Key, error)
}
// RawCompleteTransactionResult is a JSON returned from transaction complete function (used internally)
type RawCompleteTransactionResult struct {
// TransactionResult is a JSON returned from transaction complete function (used internally)
type TransactionResult struct {
Hash common.Hash
Error error
}
@ -158,11 +158,9 @@ type QueuedTxID string
// QueuedTx holds enough information to complete the queued transaction.
type QueuedTx struct {
ID QueuedTxID
Hash common.Hash
Context context.Context
Args SendTxArgs
Done chan struct{}
Err error
Result chan TransactionResult
}
// SendTxArgs represents the arguments to submit a new transaction into the transaction pool.

View File

@ -157,9 +157,8 @@ func Fatalf(reason interface{}, args ...interface{}) {
func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx {
return &QueuedTx{
ID: QueuedTxID(uuid.New()),
Hash: common.Hash{},
Context: ctx,
Args: args,
Done: make(chan struct{}),
Result: make(chan TransactionResult, 1),
}
}

View File

@ -0,0 +1,10 @@
package transactions
import "errors"
var (
//ErrQueuedTxTimedOut - error transaction sending timed out
ErrQueuedTxTimedOut = errors.New("transaction sending timed out")
//ErrQueuedTxDiscarded - error transaction discarded
ErrQueuedTxDiscarded = errors.New("transaction has been discarded")
)

View File

@ -14,31 +14,31 @@ import (
reflect "reflect"
)
// MockFakePublicTransactionPoolAPI is a mock of FakePublicTransactionPoolAPI interface
type MockFakePublicTransactionPoolAPI struct {
// MockPublicTransactionPoolAPI is a mock of PublicTransactionPoolAPI interface
type MockPublicTransactionPoolAPI struct {
ctrl *gomock.Controller
recorder *MockFakePublicTransactionPoolAPIMockRecorder
recorder *MockPublicTransactionPoolAPIMockRecorder
}
// MockFakePublicTransactionPoolAPIMockRecorder is the mock recorder for MockFakePublicTransactionPoolAPI
type MockFakePublicTransactionPoolAPIMockRecorder struct {
mock *MockFakePublicTransactionPoolAPI
// MockPublicTransactionPoolAPIMockRecorder is the mock recorder for MockPublicTransactionPoolAPI
type MockPublicTransactionPoolAPIMockRecorder struct {
mock *MockPublicTransactionPoolAPI
}
// NewMockFakePublicTransactionPoolAPI creates a new mock instance
func NewMockFakePublicTransactionPoolAPI(ctrl *gomock.Controller) *MockFakePublicTransactionPoolAPI {
mock := &MockFakePublicTransactionPoolAPI{ctrl: ctrl}
mock.recorder = &MockFakePublicTransactionPoolAPIMockRecorder{mock}
// NewMockPublicTransactionPoolAPI creates a new mock instance
func NewMockPublicTransactionPoolAPI(ctrl *gomock.Controller) *MockPublicTransactionPoolAPI {
mock := &MockPublicTransactionPoolAPI{ctrl: ctrl}
mock.recorder = &MockPublicTransactionPoolAPIMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockFakePublicTransactionPoolAPI) EXPECT() *MockFakePublicTransactionPoolAPIMockRecorder {
func (m *MockPublicTransactionPoolAPI) EXPECT() *MockPublicTransactionPoolAPIMockRecorder {
return m.recorder
}
// GasPrice mocks base method
func (m *MockFakePublicTransactionPoolAPI) GasPrice(ctx context.Context) (*big.Int, error) {
func (m *MockPublicTransactionPoolAPI) GasPrice(ctx context.Context) (*big.Int, error) {
ret := m.ctrl.Call(m, "GasPrice", ctx)
ret0, _ := ret[0].(*big.Int)
ret1, _ := ret[1].(error)
@ -46,12 +46,12 @@ func (m *MockFakePublicTransactionPoolAPI) GasPrice(ctx context.Context) (*big.I
}
// GasPrice indicates an expected call of GasPrice
func (mr *MockFakePublicTransactionPoolAPIMockRecorder) GasPrice(ctx interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GasPrice", reflect.TypeOf((*MockFakePublicTransactionPoolAPI)(nil).GasPrice), ctx)
func (mr *MockPublicTransactionPoolAPIMockRecorder) GasPrice(ctx interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GasPrice", reflect.TypeOf((*MockPublicTransactionPoolAPI)(nil).GasPrice), ctx)
}
// EstimateGas mocks base method
func (m *MockFakePublicTransactionPoolAPI) EstimateGas(ctx context.Context, args CallArgs) (*hexutil.Big, error) {
func (m *MockPublicTransactionPoolAPI) EstimateGas(ctx context.Context, args CallArgs) (*hexutil.Big, error) {
ret := m.ctrl.Call(m, "EstimateGas", ctx, args)
ret0, _ := ret[0].(*hexutil.Big)
ret1, _ := ret[1].(error)
@ -59,12 +59,12 @@ func (m *MockFakePublicTransactionPoolAPI) EstimateGas(ctx context.Context, args
}
// EstimateGas indicates an expected call of EstimateGas
func (mr *MockFakePublicTransactionPoolAPIMockRecorder) EstimateGas(ctx, args interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EstimateGas", reflect.TypeOf((*MockFakePublicTransactionPoolAPI)(nil).EstimateGas), ctx, args)
func (mr *MockPublicTransactionPoolAPIMockRecorder) EstimateGas(ctx, args interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EstimateGas", reflect.TypeOf((*MockPublicTransactionPoolAPI)(nil).EstimateGas), ctx, args)
}
// GetTransactionCount mocks base method
func (m *MockFakePublicTransactionPoolAPI) GetTransactionCount(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (*hexutil.Uint64, error) {
func (m *MockPublicTransactionPoolAPI) GetTransactionCount(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (*hexutil.Uint64, error) {
ret := m.ctrl.Call(m, "GetTransactionCount", ctx, address, blockNr)
ret0, _ := ret[0].(*hexutil.Uint64)
ret1, _ := ret[1].(error)
@ -72,12 +72,12 @@ func (m *MockFakePublicTransactionPoolAPI) GetTransactionCount(ctx context.Conte
}
// GetTransactionCount indicates an expected call of GetTransactionCount
func (mr *MockFakePublicTransactionPoolAPIMockRecorder) GetTransactionCount(ctx, address, blockNr interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTransactionCount", reflect.TypeOf((*MockFakePublicTransactionPoolAPI)(nil).GetTransactionCount), ctx, address, blockNr)
func (mr *MockPublicTransactionPoolAPIMockRecorder) GetTransactionCount(ctx, address, blockNr interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTransactionCount", reflect.TypeOf((*MockPublicTransactionPoolAPI)(nil).GetTransactionCount), ctx, address, blockNr)
}
// SendRawTransaction mocks base method
func (m *MockFakePublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, encodedTx hexutil.Bytes) (common.Hash, error) {
func (m *MockPublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, encodedTx hexutil.Bytes) (common.Hash, error) {
ret := m.ctrl.Call(m, "SendRawTransaction", ctx, encodedTx)
ret0, _ := ret[0].(common.Hash)
ret1, _ := ret[1].(error)
@ -85,6 +85,6 @@ func (m *MockFakePublicTransactionPoolAPI) SendRawTransaction(ctx context.Contex
}
// SendRawTransaction indicates an expected call of SendRawTransaction
func (mr *MockFakePublicTransactionPoolAPIMockRecorder) SendRawTransaction(ctx, encodedTx interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendRawTransaction", reflect.TypeOf((*MockFakePublicTransactionPoolAPI)(nil).SendRawTransaction), ctx, encodedTx)
func (mr *MockPublicTransactionPoolAPIMockRecorder) SendRawTransaction(ctx, encodedTx interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendRawTransaction", reflect.TypeOf((*MockPublicTransactionPoolAPI)(nil).SendRawTransaction), ctx, encodedTx)
}

View File

@ -11,9 +11,9 @@ import (
)
// NewTestServer returns a mocked test server
func NewTestServer(ctrl *gomock.Controller) (*rpc.Server, *MockFakePublicTransactionPoolAPI) {
func NewTestServer(ctrl *gomock.Controller) (*rpc.Server, *MockPublicTransactionPoolAPI) {
srv := rpc.NewServer()
svc := NewMockFakePublicTransactionPoolAPI(ctrl)
svc := NewMockPublicTransactionPoolAPI(ctrl)
if err := srv.RegisterName("eth", svc); err != nil {
panic(err)
}

View File

@ -1,12 +1,9 @@
package transactions
import (
"strconv"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/transactions/queue"
)
const (
@ -30,10 +27,10 @@ const (
)
var txReturnCodes = map[error]int{
nil: SendTransactionNoErrorCode,
keystore.ErrDecrypt: SendTransactionPasswordErrorCode,
queue.ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode,
queue.ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode,
nil: SendTransactionNoErrorCode,
keystore.ErrDecrypt: SendTransactionPasswordErrorCode,
ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode,
ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode,
}
// SendTransactionEvent is a signal sent on a send transaction request
@ -61,17 +58,17 @@ type ReturnSendTransactionEvent struct {
Args common.SendTxArgs `json:"args"`
MessageID string `json:"message_id"`
ErrorMessage string `json:"error_message"`
ErrorCode string `json:"error_code"`
ErrorCode int `json:"error_code,string"`
}
// NotifyOnReturn returns handler that processes responses from internal tx manager
func NotifyOnReturn(queuedTx *common.QueuedTx) {
// discard notifications with empty tx
if queuedTx == nil {
func NotifyOnReturn(queuedTx *common.QueuedTx, err error) {
// we don't want to notify a user if tx was sent successfully
if err == nil {
return
}
// we don't want to notify a user if tx sent successfully
if queuedTx.Err == nil {
// discard notifications with empty tx
if queuedTx == nil {
return
}
signal.Send(signal.Envelope{
@ -80,8 +77,8 @@ func NotifyOnReturn(queuedTx *common.QueuedTx) {
ID: string(queuedTx.ID),
Args: queuedTx.Args,
MessageID: common.MessageIDFromContext(queuedTx.Context),
ErrorMessage: queuedTx.Err.Error(),
ErrorCode: strconv.Itoa(sendTransactionErrorCode(queuedTx.Err)),
ErrorMessage: err.Error(),
ErrorCode: sendTransactionErrorCode(err),
},
})
}

View File

@ -19,16 +19,12 @@ const (
)
var (
// ErrQueuedTxExist - transaction was already enqueued
ErrQueuedTxExist = errors.New("transaction already exist in queue")
//ErrQueuedTxIDNotFound - error transaction hash not found
ErrQueuedTxIDNotFound = errors.New("transaction hash not found")
//ErrQueuedTxTimedOut - error transaction sending timed out
ErrQueuedTxTimedOut = errors.New("transaction sending timed out")
//ErrQueuedTxDiscarded - error transaction discarded
ErrQueuedTxDiscarded = errors.New("transaction has been discarded")
//ErrQueuedTxInProgress - error transaction in progress
//ErrQueuedTxInProgress - error transaction is in progress
ErrQueuedTxInProgress = errors.New("transaction is in progress")
//ErrQueuedTxAlreadyProcessed - error transaction has already processed
ErrQueuedTxAlreadyProcessed = errors.New("transaction has been already processed")
//ErrInvalidCompleteTxSender - error transaction with invalid sender
ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it")
)
@ -133,15 +129,18 @@ func (q *TxQueue) Reset() {
// Enqueue enqueues incoming transaction
func (q *TxQueue) Enqueue(tx *common.QueuedTx) error {
log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID))
if (tx.Hash != gethcommon.Hash{} || tx.Err != nil) {
return ErrQueuedTxAlreadyProcessed
q.mu.RLock()
if _, ok := q.transactions[tx.ID]; ok {
q.mu.RUnlock()
return ErrQueuedTxExist
}
q.mu.RUnlock()
log.Info("before enqueueTicker")
// we can't hold a lock in this part
log.Debug("notifying eviction loop")
q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item
log.Info("before evictableIDs")
q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap
log.Info("after evictableIDs")
q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap
log.Debug("notified eviction loop")
q.mu.Lock()
q.transactions[tx.ID] = tx
@ -204,17 +203,15 @@ func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) er
func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) {
delete(q.inprogress, tx.ID)
tx.Err = err
// hash is updated only if err is nil, but transaction is not removed from a queue
if err == nil {
q.transactions[tx.ID].Result <- common.TransactionResult{Hash: hash, Error: err}
q.remove(tx.ID)
tx.Hash = hash
close(tx.Done)
return
}
if _, transient := transientErrs[err.Error()]; !transient {
q.transactions[tx.ID].Result <- common.TransactionResult{Error: err}
q.remove(tx.ID)
close(tx.Done)
}
}

View File

@ -52,15 +52,13 @@ func (s *QueueTestSuite) TestGetTransaction() {
}
}
func (s *QueueTestSuite) TestEnqueueProcessedTransaction() {
// enqueue will fail if transaction with hash will be enqueued
func (s *QueueTestSuite) TestAlreadyEnqueued() {
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{})
tx.Hash = gethcommon.Hash{1}
s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx))
s.NoError(s.queue.Enqueue(tx))
s.Equal(ErrQueuedTxExist, s.queue.Enqueue(tx))
// try to enqueue another tx to double check locking
tx = common.CreateTransaction(context.Background(), common.SendTxArgs{})
tx.Err = errors.New("error")
s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx))
s.NoError(s.queue.Enqueue(tx))
}
func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.QueuedTx {
@ -73,12 +71,12 @@ func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.Queue
func (s *QueueTestSuite) TestDoneSuccess() {
hash := gethcommon.Hash{1}
tx := s.testDone(hash, nil)
s.NoError(tx.Err)
s.Equal(hash, tx.Hash)
s.False(s.queue.Has(tx.ID))
// event is sent only if transaction was removed from a queue
select {
case <-tx.Done:
case rst := <-tx.Result:
s.NoError(rst.Error)
s.Equal(hash, rst.Hash)
s.False(s.queue.Has(tx.ID))
default:
s.Fail("No event was sent to Done channel")
}
@ -88,22 +86,22 @@ func (s *QueueTestSuite) TestDoneTransientError() {
hash := gethcommon.Hash{1}
err := keystore.ErrDecrypt
tx := s.testDone(hash, err)
s.Equal(keystore.ErrDecrypt, tx.Err)
s.Equal(gethcommon.Hash{}, tx.Hash)
s.True(s.queue.Has(tx.ID))
_, inp := s.queue.inprogress[tx.ID]
s.False(inp)
}
func (s *QueueTestSuite) TestDoneError() {
hash := gethcommon.Hash{1}
err := errors.New("test")
tx := s.testDone(hash, err)
s.Equal(err, tx.Err)
s.NotEqual(hash, tx.Hash)
s.Equal(gethcommon.Hash{}, tx.Hash)
s.False(s.queue.Has(tx.ID))
// event is sent only if transaction was removed from a queue
select {
case <-tx.Done:
case rst := <-tx.Result:
s.Equal(err, rst.Error)
s.NotEqual(hash, rst.Hash)
s.Equal(gethcommon.Hash{}, rst.Hash)
s.False(s.queue.Has(tx.ID))
default:
s.Fail("No event was sent to Done channel")
}

View File

@ -17,7 +17,7 @@ const (
// SendTxDefaultErrorCode is sent by default, when error is not nil, but type is unknown/unexpected.
SendTxDefaultErrorCode = SendTransactionDefaultErrorCode
// DefaultTxSendCompletionTimeout defines how many seconds to wait before returning result in sentTransaction().
DefaultTxSendCompletionTimeout = 300
DefaultTxSendCompletionTimeout = 300 * time.Second
defaultGas = 90000
defaultTimeout = time.Minute
@ -25,22 +25,24 @@ const (
// Manager provides means to manage internal Status Backend (injected into LES)
type Manager struct {
nodeManager common.NodeManager
accountManager common.AccountManager
txQueue *queue.TxQueue
ethTxClient EthTransactor
addrLock *AddrLocker
notify bool
nodeManager common.NodeManager
accountManager common.AccountManager
txQueue *queue.TxQueue
ethTxClient EthTransactor
addrLock *AddrLocker
notify bool
completionTimeout time.Duration
}
// NewManager returns a new Manager.
func NewManager(nodeManager common.NodeManager, accountManager common.AccountManager) *Manager {
return &Manager{
nodeManager: nodeManager,
accountManager: accountManager,
txQueue: queue.New(),
addrLock: &AddrLocker{},
notify: true,
nodeManager: nodeManager,
accountManager: accountManager,
txQueue: queue.New(),
addrLock: &AddrLocker{},
notify: true,
completionTimeout: DefaultTxSendCompletionTimeout,
}
}
@ -75,34 +77,41 @@ func (m *Manager) QueueTransaction(tx *common.QueuedTx) error {
to = tx.Args.To.Hex()
}
log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to)
err := m.txQueue.Enqueue(tx)
if err := m.txQueue.Enqueue(tx); err != nil {
return err
}
if m.notify {
NotifyOnEnqueue(tx)
}
return err
return nil
}
func (m *Manager) txDone(tx *common.QueuedTx, hash gethcommon.Hash, err error) {
m.txQueue.Done(tx.ID, hash, err) //nolint: errcheck
if err := m.txQueue.Done(tx.ID, hash, err); err == queue.ErrQueuedTxIDNotFound {
log.Warn("transaction is already removed from a queue", tx.ID)
return
}
if m.notify {
NotifyOnReturn(tx)
NotifyOnReturn(tx, err)
}
}
// WaitForTransaction adds a transaction to the queue and blocks
// until it's completed, discarded or times out.
func (m *Manager) WaitForTransaction(tx *common.QueuedTx) error {
func (m *Manager) WaitForTransaction(tx *common.QueuedTx) common.TransactionResult {
log.Info("wait for transaction", "id", tx.ID)
// now wait up until transaction is:
// - completed (via CompleteQueuedTransaction),
// - discarded (via DiscardQueuedTransaction)
// - or times out
select {
case <-tx.Done:
case <-time.After(DefaultTxSendCompletionTimeout * time.Second):
m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut)
for {
select {
case rst := <-tx.Result:
return rst
case <-time.After(m.completionTimeout):
m.txDone(tx, gethcommon.Hash{}, ErrQueuedTxTimedOut)
}
}
return tx.Err
}
// CompleteTransaction instructs backend to complete sending of a given transaction.
@ -224,11 +233,11 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount
}
// CompleteTransactions instructs backend to complete sending of multiple transactions
func (m *Manager) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult {
results := make(map[common.QueuedTxID]common.RawCompleteTransactionResult)
func (m *Manager) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.TransactionResult {
results := make(map[common.QueuedTxID]common.TransactionResult)
for _, txID := range ids {
txHash, txErr := m.CompleteTransaction(txID, password)
results[txID] = common.RawCompleteTransactionResult{
results[txID] = common.TransactionResult{
Hash: txHash,
Error: txErr,
}
@ -242,9 +251,9 @@ func (m *Manager) DiscardTransaction(id common.QueuedTxID) error {
if err != nil {
return err
}
err = m.txQueue.Done(id, gethcommon.Hash{}, queue.ErrQueuedTxDiscarded)
err = m.txQueue.Done(id, gethcommon.Hash{}, ErrQueuedTxDiscarded)
if m.notify {
NotifyOnReturn(tx)
NotifyOnReturn(tx, ErrQueuedTxDiscarded)
}
return err
}
@ -269,19 +278,16 @@ func (m *Manager) DiscardTransactions(ids []common.QueuedTxID) map[common.Queued
// It accepts one param which is a slice with a map of transaction params.
func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) {
log.Info("SendTransactionRPCHandler called")
// TODO(adam): it's a hack to parse arguments as common.RPCCall can do that.
// We should refactor parsing these params to a separate struct.
rpcCall := common.RPCCall{Params: args}
tx := common.CreateTransaction(ctx, rpcCall.ToSendTxArgs())
if err := m.QueueTransaction(tx); err != nil {
return nil, err
}
if err := m.WaitForTransaction(tx); err != nil {
return nil, err
rst := m.WaitForTransaction(tx)
if rst.Error != nil {
return nil, rst.Error
}
return tx.Hash.Hex(), nil
return rst.Hash.Hex(), nil
}

View File

@ -36,7 +36,7 @@ type TxQueueTestSuite struct {
server *gethrpc.Server
client *gethrpc.Client
txServiceMockCtrl *gomock.Controller
txServiceMock *fake.MockFakePublicTransactionPoolAPI
txServiceMock *fake.MockPublicTransactionPoolAPI
}
func (s *TxQueueTestSuite) SetupTest() {
@ -98,24 +98,26 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() {
From: common.FromAddress(TestConfig.Account1.Address),
To: common.ToAddress(TestConfig.Account2.Address),
})
err := txQueueManager.QueueTransaction(tx)
s.NoError(err)
s.NoError(txQueueManager.QueueTransaction(tx))
w := make(chan struct{})
var (
hash gethcommon.Hash
err error
)
go func() {
hash, err := txQueueManager.CompleteTransaction(tx.ID, password)
hash, err = txQueueManager.CompleteTransaction(tx.ID, password)
s.NoError(err)
s.Equal(tx.Hash, hash)
close(w)
}()
err = txQueueManager.WaitForTransaction(tx)
s.NoError(err)
rst := txQueueManager.WaitForTransaction(tx)
// Check that error is assigned to the transaction.
s.NoError(tx.Err)
s.NoError(rst.Error)
// Transaction should be already removed from the queue.
s.False(txQueueManager.TransactionQueue().Has(tx.ID))
s.NoError(WaitClosed(w, time.Second))
s.Equal(hash, rst.Hash)
}
func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() {
@ -141,8 +143,7 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() {
To: common.ToAddress(TestConfig.Account2.Address),
})
err := txQueueManager.QueueTransaction(tx)
s.NoError(err)
s.NoError(txQueueManager.QueueTransaction(tx))
var (
wg sync.WaitGroup
@ -168,10 +169,9 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() {
}()
}
err = txQueueManager.WaitForTransaction(tx)
s.NoError(err)
rst := txQueueManager.WaitForTransaction(tx)
// Check that error is assigned to the transaction.
s.NoError(tx.Err)
s.NoError(rst.Error)
// Transaction should be already removed from the queue.
s.False(txQueueManager.TransactionQueue().Has(tx.ID))
@ -197,10 +197,9 @@ func (s *TxQueueTestSuite) TestAccountMismatch() {
To: common.ToAddress(TestConfig.Account2.Address),
})
err := txQueueManager.QueueTransaction(tx)
s.NoError(err)
s.NoError(txQueueManager.QueueTransaction(tx))
_, err = txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password)
_, err := txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password)
s.Equal(err, queue.ErrInvalidCompleteTxSender)
// Transaction should stay in the queue as mismatched accounts
@ -227,10 +226,9 @@ func (s *TxQueueTestSuite) TestInvalidPassword() {
To: common.ToAddress(TestConfig.Account2.Address),
})
err := txQueueManager.QueueTransaction(tx)
s.NoError(err)
s.NoError(txQueueManager.QueueTransaction(tx))
_, err = txQueueManager.CompleteTransaction(tx.ID, password)
_, err := txQueueManager.CompleteTransaction(tx.ID, password)
s.Equal(err.Error(), keystore.ErrDecrypt.Error())
// Transaction should stay in the queue as mismatched accounts
@ -250,20 +248,34 @@ func (s *TxQueueTestSuite) TestDiscardTransaction() {
To: common.ToAddress(TestConfig.Account2.Address),
})
err := txQueueManager.QueueTransaction(tx)
s.NoError(err)
s.NoError(txQueueManager.QueueTransaction(tx))
w := make(chan struct{})
go func() {
s.NoError(txQueueManager.DiscardTransaction(tx.ID))
close(w)
}()
err = txQueueManager.WaitForTransaction(tx)
s.Equal(queue.ErrQueuedTxDiscarded, err)
// Check that error is assigned to the transaction.
s.Equal(queue.ErrQueuedTxDiscarded, tx.Err)
rst := txQueueManager.WaitForTransaction(tx)
s.Equal(ErrQueuedTxDiscarded, rst.Error)
// Transaction should be already removed from the queue.
s.False(txQueueManager.TransactionQueue().Has(tx.ID))
s.NoError(WaitClosed(w, time.Second))
}
func (s *TxQueueTestSuite) TestCompletionTimedOut() {
txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock)
txQueueManager.DisableNotificactions()
txQueueManager.completionTimeout = time.Nanosecond
txQueueManager.Start()
defer txQueueManager.Stop()
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{
From: common.FromAddress(TestConfig.Account1.Address),
To: common.ToAddress(TestConfig.Account2.Address),
})
s.NoError(txQueueManager.QueueTransaction(tx))
rst := txQueueManager.WaitForTransaction(tx)
s.Equal(ErrQueuedTxTimedOut, rst.Error)
}

View File

@ -1049,7 +1049,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo
t.Logf("transaction return event received: {id: %s}\n", event["id"].(string))
receivedErrMessage := event["error_message"].(string)
expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error()
if receivedErrMessage != expectedErrMessage {
t.Errorf("unexpected error message received: got %v", receivedErrMessage)
return
@ -1071,7 +1071,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo
To: common.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
})
if err != queue.ErrQueuedTxDiscarded {
if err != transactions.ErrQueuedTxDiscarded {
t.Errorf("expected error not thrown: %v", err)
return false
}
@ -1136,7 +1136,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl
t.Logf("transaction return event received: {id: %s}\n", event["id"].(string))
receivedErrMessage := event["error_message"].(string)
expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error()
if receivedErrMessage != expectedErrMessage {
t.Errorf("unexpected error message received: got %v", receivedErrMessage)
return
@ -1162,7 +1162,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl
To: common.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
})
if err != queue.ErrQueuedTxDiscarded {
if err != transactions.ErrQueuedTxDiscarded {
t.Errorf("expected error not thrown: %v", err)
return
}