From 680d0513b7ddf8d6e0d521a2e956ca3ed44859cf Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 26 Jan 2018 07:59:21 +0200 Subject: [PATCH] Refactoring of TxQueue and Manager (#530) --- Makefile | 2 +- e2e/jail/jail_rpc_test.go | 6 +- e2e/transactions/transactions_test.go | 79 ++--- geth/api/backend.go | 12 +- geth/common/types.go | 38 +-- geth/common/types_mock.go | 76 +---- geth/common/utils.go | 12 + geth/{txqueue => transactions}/addrlock.go | 2 +- geth/{txqueue => transactions}/ethtxclient.go | 2 +- geth/{txqueue => transactions}/fake/mock.go | 2 +- .../fake/txservice.go | 0 geth/transactions/notifications.go | 94 ++++++ .../queue/queue.go} | 191 +++++------- geth/transactions/queue/queue_test.go | 134 +++++++++ geth/{txqueue => transactions/queue}/utils.go | 2 +- .../txqueue_manager.go | 280 +++++------------- .../txqueue_manager_test.go | 89 ++---- lib/utils.go | 37 +-- 18 files changed, 489 insertions(+), 569 deletions(-) rename geth/{txqueue => transactions}/addrlock.go (98%) rename geth/{txqueue => transactions}/ethtxclient.go (99%) rename geth/{txqueue => transactions}/fake/mock.go (98%) rename geth/{txqueue => transactions}/fake/txservice.go (100%) create mode 100644 geth/transactions/notifications.go rename geth/{txqueue/txqueue.go => transactions/queue/queue.go} (56%) create mode 100644 geth/transactions/queue/queue_test.go rename geth/{txqueue => transactions/queue}/utils.go (97%) rename geth/{txqueue => transactions}/txqueue_manager.go (51%) rename geth/{txqueue => transactions}/txqueue_manager_test.go (75%) diff --git a/Makefile b/Makefile index 43d17db4d..b848d4901 100644 --- a/Makefile +++ b/Makefile @@ -111,7 +111,7 @@ mock: ##@other Regenerate mocks mockgen -source=geth/mailservice/mailservice.go -destination=geth/mailservice/mailservice_mock.go -package=mailservice mockgen -source=geth/common/notification.go -destination=geth/common/notification_mock.go -package=common -imports fcm=github.com/NaySoftware/go-fcm mockgen -source=geth/notification/fcm/client.go -destination=geth/notification/fcm/client_mock.go -package=fcm -imports fcm=github.com/NaySoftware/go-fcm - mockgen -source=geth/txqueue/fake/txservice.go -destination=geth/txqueue/fake/mock.go -package=fake + mockgen -source=geth/transactions/fake/txservice.go -destination=geth/transactions/fake/mock.go -package=fake test: test-unit-coverage ##@tests Run basic, short tests during development diff --git a/e2e/jail/jail_rpc_test.go b/e2e/jail/jail_rpc_test.go index 2b9f99d87..96c7c4ed0 100644 --- a/e2e/jail/jail_rpc_test.go +++ b/e2e/jail/jail_rpc_test.go @@ -14,7 +14,7 @@ import ( "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" - "github.com/status-im/status-go/geth/txqueue" + "github.com/status-im/status-go/geth/transactions" . "github.com/status-im/status-go/testing" "github.com/stretchr/testify/suite" ) @@ -126,7 +126,7 @@ func (s *JailRPCTestSuite) TestContractDeployment() { unmarshalErr := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(unmarshalErr, "cannot unmarshal JSON: %s", jsonEvent) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) s.T().Logf("transaction queued and will be completed shortly, id: %v", event["id"]) @@ -284,7 +284,7 @@ func (s *JailRPCTestSuite) TestJailVMPersistence() { s.T().Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) s.T().Logf("Transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string)) diff --git a/e2e/transactions/transactions_test.go b/e2e/transactions/transactions_test.go index b3533d37d..ea56a025c 100644 --- a/e2e/transactions/transactions_test.go +++ b/e2e/transactions/transactions_test.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "reflect" + "sync" "testing" "time" @@ -18,7 +19,8 @@ import ( "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" - "github.com/status-im/status-go/geth/txqueue" + "github.com/status-im/status-go/geth/transactions" + "github.com/status-im/status-go/geth/transactions/queue" . "github.com/status-im/status-go/testing" "github.com/stretchr/testify/suite" ) @@ -48,7 +50,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransaction() { err := json.Unmarshal([]byte(rawSignal), &sg) s.NoError(err) - if sg.Type == txqueue.EventTransactionQueued { + if sg.Type == transactions.EventTransactionQueued { event := sg.Event.(map[string]interface{}) txID := event["id"].(string) txHash, err = s.Backend.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password) @@ -100,7 +102,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransactionUpstream() { err := json.Unmarshal([]byte(rawSignal), &signalEnvelope) s.NoError(err) - if signalEnvelope.Type == txqueue.EventTransactionQueued { + if signalEnvelope.Type == transactions.EventTransactionQueued { event := signalEnvelope.Event.(map[string]interface{}) txID := event["id"].(string) @@ -156,7 +158,7 @@ func (s *TransactionsTestSuite) TestSendContractTx() { err = json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string)) @@ -182,7 +184,7 @@ func (s *TransactionsTestSuite) TestSendContractTx() { ) s.EqualError( err, - txqueue.ErrInvalidCompleteTxSender.Error(), + queue.ErrInvalidCompleteTxSender.Error(), fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]), ) @@ -247,7 +249,7 @@ func (s *TransactionsTestSuite) TestSendEther() { err = json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string)) @@ -271,7 +273,7 @@ func (s *TransactionsTestSuite) TestSendEther() { common.QueuedTxID(event["id"].(string)), TestConfig.Account1.Password) s.EqualError( err, - txqueue.ErrInvalidCompleteTxSender.Error(), + queue.ErrInvalidCompleteTxSender.Error(), fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]), ) @@ -330,7 +332,7 @@ func (s *TransactionsTestSuite) TestSendEtherTxUpstream() { err = json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, "cannot unmarshal JSON: %s", jsonEvent) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string)) @@ -387,7 +389,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() { err := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := common.QueuedTxID(event["id"].(string)) log.Info("transaction queued (will be failed and completed on the second call)", "id", txID) @@ -407,7 +409,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() { close(completeQueuedTransaction) } - if envelope.Type == txqueue.EventTransactionFailed { + if envelope.Type == transactions.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) log.Info("transaction return event received", "id", event["id"].(string)) @@ -466,7 +468,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { err := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := common.QueuedTxID(event["id"].(string)) log.Info("transaction queued (will be discarded soon)", "id", txID) @@ -488,12 +490,12 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { close(completeQueuedTransaction) } - if envelope.Type == txqueue.EventTransactionFailed { + if envelope.Type == transactions.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) log.Info("transaction return event received", "id", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() s.Equal(receivedErrMessage, expectedErrMessage) receivedErrCode := event["error_code"].(string) @@ -509,7 +511,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - s.EqualError(err, txqueue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded") + s.EqualError(err, queue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded") select { case <-completeQueuedTransaction: @@ -543,7 +545,7 @@ func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactions() { err := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := common.QueuedTxID(event["id"].(string)) log.Info("transaction queued (will be completed in a single call, once aggregated)", "id", txID) @@ -640,7 +642,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { var envelope signal.Envelope err := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err) - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := common.QueuedTxID(event["id"].(string)) log.Info("transaction queued (will be discarded soon)", "id", txID) @@ -650,12 +652,12 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { txIDs <- txID } - if envelope.Type == txqueue.EventTransactionFailed { + if envelope.Type == transactions.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) log.Info("transaction return event received", "id", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() s.Equal(receivedErrMessage, expectedErrMessage) receivedErrCode := event["error_code"].(string) @@ -675,7 +677,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - s.EqualError(err, txqueue.ErrQueuedTxDiscarded.Error()) + s.EqualError(err, queue.ErrQueuedTxDiscarded.Error()) s.True(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction returned hash, while it shouldn't") } @@ -747,7 +749,7 @@ func (s *TransactionsTestSuite) TestNonExistentQueuedTransactions() { // try completing non-existing transaction _, err := s.Backend.CompleteTransaction("some-bad-transaction-id", TestConfig.Account1.Password) s.Error(err, "error expected and not received") - s.EqualError(err, txqueue.ErrQueuedTxIDNotFound.Error()) + s.EqualError(err, queue.ErrQueuedTxIDNotFound.Error()) } func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() { @@ -756,6 +758,24 @@ func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() { backend := s.LightEthereumService().StatusBackend s.NotNil(backend) + var m sync.Mutex + txCount := 0 + txIDs := [queue.DefaultTxQueueCap + 5 + 10]common.QueuedTxID{} + + signal.SetDefaultNodeNotificationHandler(func(rawSignal string) { + var sg signal.Envelope + err := json.Unmarshal([]byte(rawSignal), &sg) + s.NoError(err) + + if sg.Type == transactions.EventTransactionQueued { + event := sg.Event.(map[string]interface{}) + txID := event["id"].(string) + m.Lock() + txIDs[txCount] = common.QueuedTxID(txID) + txCount++ + m.Unlock() + } + }) // reset queue s.Backend.TxQueueManager().TransactionQueue().Reset() @@ -764,36 +784,23 @@ func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() { s.NoError(s.Backend.AccountManager().SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)) txQueue := s.Backend.TxQueueManager().TransactionQueue() - var i = 0 - txIDs := [txqueue.DefaultTxQueueCap + 5 + 10]common.QueuedTxID{} - s.Backend.TxQueueManager().SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) { - log.Info("tx enqueued", "i", i+1, "queue size", txQueue.Count(), "id", queuedTx.ID) - txIDs[i] = queuedTx.ID - i++ - }) - s.Zero(txQueue.Count(), "transaction count should be zero") for j := 0; j < 10; j++ { go s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{}) // nolint: errcheck } - time.Sleep(2 * time.Second) // FIXME(tiabc): more reliable synchronization to ensure all transactions are enqueued - - log.Info(fmt.Sprintf("Number of transactions queued: %d. Queue size (shouldn't be more than %d): %d", - i, txqueue.DefaultTxQueueCap, txQueue.Count())) - + time.Sleep(2 * time.Second) s.Equal(10, txQueue.Count(), "transaction count should be 10") - for i := 0; i < txqueue.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines + for i := 0; i < queue.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines go s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{}) // nolint: errcheck } time.Sleep(5 * time.Second) - s.True(txQueue.Count() <= txqueue.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", txqueue.DefaultTxQueueCap, txqueue.DefaultTxQueueCap-1, txQueue.Count()) + s.True(txQueue.Count() <= queue.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", queue.DefaultTxQueueCap, queue.DefaultTxQueueCap-1, txQueue.Count()) for _, txID := range txIDs { txQueue.Remove(txID) } - s.Zero(txQueue.Count(), "transaction count should be zero: %d", txQueue.Count()) } diff --git a/geth/api/backend.go b/geth/api/backend.go index fdc61930c..6b27d72a3 100644 --- a/geth/api/backend.go +++ b/geth/api/backend.go @@ -13,7 +13,7 @@ import ( "github.com/status-im/status-go/geth/notification/fcm" "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" - "github.com/status-im/status-go/geth/txqueue" + "github.com/status-im/status-go/geth/transactions" ) const ( @@ -38,7 +38,7 @@ func NewStatusBackend() *StatusBackend { nodeManager := node.NewNodeManager() accountManager := account.NewManager(nodeManager) - txQueueManager := txqueue.NewManager(nodeManager, accountManager) + txQueueManager := transactions.NewManager(nodeManager, accountManager) jailManager := jail.New(nodeManager) notificationManager := fcm.NewNotification(fcmServerKey) @@ -205,7 +205,7 @@ func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxA ctx = context.Background() } - tx := m.txQueueManager.CreateTransaction(ctx, args) + tx := common.CreateTransaction(ctx, args) if err := m.txQueueManager.QueueTransaction(tx); err != nil { return gethcommon.Hash{}, err @@ -247,11 +247,5 @@ func (m *StatusBackend) registerHandlers() error { rpcClient.RegisterHandler("eth_accounts", m.accountManager.AccountsRPCHandler()) rpcClient.RegisterHandler("eth_sendTransaction", m.txQueueManager.SendTransactionRPCHandler) - m.txQueueManager.SetTransactionQueueHandler(m.txQueueManager.TransactionQueueHandler()) - log.Info("Registered handler", "fn", "TransactionQueueHandler") - - m.txQueueManager.SetTransactionReturnHandler(m.txQueueManager.TransactionReturnHandler()) - log.Info("Registered handler", "fn", "TransactionReturnHandler") - return nil } diff --git a/geth/common/types.go b/geth/common/types.go index 03d32affb..812fc66ba 100644 --- a/geth/common/types.go +++ b/geth/common/types.go @@ -157,14 +157,12 @@ type QueuedTxID string // QueuedTx holds enough information to complete the queued transaction. type QueuedTx struct { - ID QueuedTxID - Hash common.Hash - Context context.Context - Args SendTxArgs - InProgress bool // true if transaction is being sent - Done chan struct{} - Discard chan struct{} - Err error + ID QueuedTxID + Hash common.Hash + Context context.Context + Args SendTxArgs + Done chan struct{} + Err error } // SendTxArgs represents the arguments to submit a new transaction into the transaction pool. @@ -178,12 +176,6 @@ type SendTxArgs struct { Nonce *hexutil.Uint64 `json:"nonce"` } -// EnqueuedTxHandler is a function that receives queued/pending transactions, when they get queued -type EnqueuedTxHandler func(*QueuedTx) - -// EnqueuedTxReturnHandler is a function that receives response when tx is complete (both on success and error) -type EnqueuedTxReturnHandler func(*QueuedTx, error) - // TxQueue is a queue of transactions. type TxQueue interface { // Remove removes a transaction from the queue. @@ -210,32 +202,14 @@ type TxQueueManager interface { // TransactionQueue returns a transaction queue. TransactionQueue() TxQueue - // CreateTransactoin creates a new transaction. - CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx - // QueueTransaction adds a new transaction to the queue. QueueTransaction(tx *QueuedTx) error // WaitForTransactions blocks until transaction is completed, discarded or timed out. WaitForTransaction(tx *QueuedTx) error - // NotifyOnQueuedTxReturn notifies a handler when a transaction returns. - NotifyOnQueuedTxReturn(queuedTx *QueuedTx, err error) - - // TransactionQueueHandler returns handler that processes incoming tx queue requests - TransactionQueueHandler() func(queuedTx *QueuedTx) - - // TODO(adam): might be not needed - SetTransactionQueueHandler(fn EnqueuedTxHandler) - - // TODO(adam): might be not needed - SetTransactionReturnHandler(fn EnqueuedTxReturnHandler) - SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) - // TransactionReturnHandler returns handler that processes responses from internal tx manager - TransactionReturnHandler() func(queuedTx *QueuedTx, err error) - // CompleteTransaction instructs backend to complete sending of a given transaction CompleteTransaction(id QueuedTxID, password string) (common.Hash, error) diff --git a/geth/common/types_mock.go b/geth/common/types_mock.go index 2fbd5c068..756bef6e2 100644 --- a/geth/common/types_mock.go +++ b/geth/common/types_mock.go @@ -521,18 +521,6 @@ func (mr *MockTxQueueManagerMockRecorder) TransactionQueue() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionQueue", reflect.TypeOf((*MockTxQueueManager)(nil).TransactionQueue)) } -// CreateTransaction mocks base method -func (m *MockTxQueueManager) CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx { - ret := m.ctrl.Call(m, "CreateTransaction", ctx, args) - ret0, _ := ret[0].(*QueuedTx) - return ret0 -} - -// CreateTransaction indicates an expected call of CreateTransaction -func (mr *MockTxQueueManagerMockRecorder) CreateTransaction(ctx, args interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTransaction", reflect.TypeOf((*MockTxQueueManager)(nil).CreateTransaction), ctx, args) -} - // QueueTransaction mocks base method func (m *MockTxQueueManager) QueueTransaction(tx *QueuedTx) error { ret := m.ctrl.Call(m, "QueueTransaction", tx) @@ -557,48 +545,6 @@ func (mr *MockTxQueueManagerMockRecorder) WaitForTransaction(tx interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForTransaction", reflect.TypeOf((*MockTxQueueManager)(nil).WaitForTransaction), tx) } -// NotifyOnQueuedTxReturn mocks base method -func (m *MockTxQueueManager) NotifyOnQueuedTxReturn(queuedTx *QueuedTx, err error) { - m.ctrl.Call(m, "NotifyOnQueuedTxReturn", queuedTx, err) -} - -// NotifyOnQueuedTxReturn indicates an expected call of NotifyOnQueuedTxReturn -func (mr *MockTxQueueManagerMockRecorder) NotifyOnQueuedTxReturn(queuedTx, err interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyOnQueuedTxReturn", reflect.TypeOf((*MockTxQueueManager)(nil).NotifyOnQueuedTxReturn), queuedTx, err) -} - -// TransactionQueueHandler mocks base method -func (m *MockTxQueueManager) TransactionQueueHandler() func(*QueuedTx) { - ret := m.ctrl.Call(m, "TransactionQueueHandler") - ret0, _ := ret[0].(func(*QueuedTx)) - return ret0 -} - -// TransactionQueueHandler indicates an expected call of TransactionQueueHandler -func (mr *MockTxQueueManagerMockRecorder) TransactionQueueHandler() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionQueueHandler", reflect.TypeOf((*MockTxQueueManager)(nil).TransactionQueueHandler)) -} - -// SetTransactionQueueHandler mocks base method -func (m *MockTxQueueManager) SetTransactionQueueHandler(fn EnqueuedTxHandler) { - m.ctrl.Call(m, "SetTransactionQueueHandler", fn) -} - -// SetTransactionQueueHandler indicates an expected call of SetTransactionQueueHandler -func (mr *MockTxQueueManagerMockRecorder) SetTransactionQueueHandler(fn interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTransactionQueueHandler", reflect.TypeOf((*MockTxQueueManager)(nil).SetTransactionQueueHandler), fn) -} - -// SetTransactionReturnHandler mocks base method -func (m *MockTxQueueManager) SetTransactionReturnHandler(fn EnqueuedTxReturnHandler) { - m.ctrl.Call(m, "SetTransactionReturnHandler", fn) -} - -// SetTransactionReturnHandler indicates an expected call of SetTransactionReturnHandler -func (mr *MockTxQueueManagerMockRecorder) SetTransactionReturnHandler(fn interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTransactionReturnHandler", reflect.TypeOf((*MockTxQueueManager)(nil).SetTransactionReturnHandler), fn) -} - // SendTransactionRPCHandler mocks base method func (m *MockTxQueueManager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) { varargs := []interface{}{ctx} @@ -617,18 +563,6 @@ func (mr *MockTxQueueManagerMockRecorder) SendTransactionRPCHandler(ctx interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendTransactionRPCHandler", reflect.TypeOf((*MockTxQueueManager)(nil).SendTransactionRPCHandler), varargs...) } -// TransactionReturnHandler mocks base method -func (m *MockTxQueueManager) TransactionReturnHandler() func(*QueuedTx, error) { - ret := m.ctrl.Call(m, "TransactionReturnHandler") - ret0, _ := ret[0].(func(*QueuedTx, error)) - return ret0 -} - -// TransactionReturnHandler indicates an expected call of TransactionReturnHandler -func (mr *MockTxQueueManagerMockRecorder) TransactionReturnHandler() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionReturnHandler", reflect.TypeOf((*MockTxQueueManager)(nil).TransactionReturnHandler)) -} - // CompleteTransaction mocks base method func (m *MockTxQueueManager) CompleteTransaction(id QueuedTxID, password string) (common.Hash, error) { ret := m.ctrl.Call(m, "CompleteTransaction", id, password) @@ -678,6 +612,16 @@ func (mr *MockTxQueueManagerMockRecorder) DiscardTransactions(ids interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DiscardTransactions", reflect.TypeOf((*MockTxQueueManager)(nil).DiscardTransactions), ids) } +// DisableNotificactions mocks base method +func (m *MockTxQueueManager) DisableNotificactions() { + m.ctrl.Call(m, "DisableNotificactions") +} + +// DisableNotificactions indicates an expected call of DisableNotificactions +func (mr *MockTxQueueManagerMockRecorder) DisableNotificactions() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DisableNotificactions", reflect.TypeOf((*MockTxQueueManager)(nil).DisableNotificactions)) +} + // MockJailCell is a mock of JailCell interface type MockJailCell struct { ctrl *gomock.Controller diff --git a/geth/common/utils.go b/geth/common/utils.go index 8bedfd2af..c4ad1d152 100644 --- a/geth/common/utils.go +++ b/geth/common/utils.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" + "github.com/pborman/uuid" "github.com/status-im/status-go/geth/log" "github.com/status-im/status-go/static" ) @@ -151,3 +152,14 @@ func Fatalf(reason interface{}, args ...interface{}) { os.Exit(1) } + +// CreateTransaction returns a transaction object. +func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx { + return &QueuedTx{ + ID: QueuedTxID(uuid.New()), + Hash: common.Hash{}, + Context: ctx, + Args: args, + Done: make(chan struct{}), + } +} diff --git a/geth/txqueue/addrlock.go b/geth/transactions/addrlock.go similarity index 98% rename from geth/txqueue/addrlock.go rename to geth/transactions/addrlock.go index dd08c7f43..69a7a9229 100644 --- a/geth/txqueue/addrlock.go +++ b/geth/transactions/addrlock.go @@ -1,6 +1,6 @@ // copy of go-ethereum/internal/ethapi/addrlock.go -package txqueue +package transactions import ( "sync" diff --git a/geth/txqueue/ethtxclient.go b/geth/transactions/ethtxclient.go similarity index 99% rename from geth/txqueue/ethtxclient.go rename to geth/transactions/ethtxclient.go index baf240e96..0f49e7290 100644 --- a/geth/txqueue/ethtxclient.go +++ b/geth/transactions/ethtxclient.go @@ -1,4 +1,4 @@ -package txqueue +package transactions import ( "context" diff --git a/geth/txqueue/fake/mock.go b/geth/transactions/fake/mock.go similarity index 98% rename from geth/txqueue/fake/mock.go rename to geth/transactions/fake/mock.go index 0e002858e..6b4218978 100644 --- a/geth/txqueue/fake/mock.go +++ b/geth/transactions/fake/mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: geth/txqueue/fake/txservice.go +// Source: geth/transactions/fake/txservice.go // Package fake is a generated GoMock package. package fake diff --git a/geth/txqueue/fake/txservice.go b/geth/transactions/fake/txservice.go similarity index 100% rename from geth/txqueue/fake/txservice.go rename to geth/transactions/fake/txservice.go diff --git a/geth/transactions/notifications.go b/geth/transactions/notifications.go new file mode 100644 index 000000000..d307951ff --- /dev/null +++ b/geth/transactions/notifications.go @@ -0,0 +1,94 @@ +package transactions + +import ( + "strconv" + + "github.com/ethereum/go-ethereum/accounts/keystore" + "github.com/status-im/status-go/geth/common" + "github.com/status-im/status-go/geth/signal" + "github.com/status-im/status-go/geth/transactions/queue" +) + +const ( + // EventTransactionQueued is triggered when send transaction request is queued + EventTransactionQueued = "transaction.queued" + // EventTransactionFailed is triggered when send transaction request fails + EventTransactionFailed = "transaction.failed" +) + +const ( + // SendTransactionNoErrorCode is sent when no error occurred. + SendTransactionNoErrorCode = iota + // SendTransactionDefaultErrorCode is every case when there is no special tx return code. + SendTransactionDefaultErrorCode + // SendTransactionPasswordErrorCode is sent when account failed verification. + SendTransactionPasswordErrorCode + // SendTransactionTimeoutErrorCode is sent when tx is timed out. + SendTransactionTimeoutErrorCode + // SendTransactionDiscardedErrorCode is sent when tx was discarded. + SendTransactionDiscardedErrorCode +) + +var txReturnCodes = map[error]int{ + nil: SendTransactionNoErrorCode, + keystore.ErrDecrypt: SendTransactionPasswordErrorCode, + queue.ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode, + queue.ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode, +} + +// SendTransactionEvent is a signal sent on a send transaction request +type SendTransactionEvent struct { + ID string `json:"id"` + Args common.SendTxArgs `json:"args"` + MessageID string `json:"message_id"` +} + +// NotifyOnEnqueue returns handler that processes incoming tx queue requests +func NotifyOnEnqueue(queuedTx *common.QueuedTx) { + signal.Send(signal.Envelope{ + Type: EventTransactionQueued, + Event: SendTransactionEvent{ + ID: string(queuedTx.ID), + Args: queuedTx.Args, + MessageID: common.MessageIDFromContext(queuedTx.Context), + }, + }) +} + +// ReturnSendTransactionEvent is a JSON returned whenever transaction send is returned +type ReturnSendTransactionEvent struct { + ID string `json:"id"` + Args common.SendTxArgs `json:"args"` + MessageID string `json:"message_id"` + ErrorMessage string `json:"error_message"` + ErrorCode string `json:"error_code"` +} + +// NotifyOnReturn returns handler that processes responses from internal tx manager +func NotifyOnReturn(queuedTx *common.QueuedTx) { + // discard notifications with empty tx + if queuedTx == nil { + return + } + // we don't want to notify a user if tx sent successfully + if queuedTx.Err == nil { + return + } + signal.Send(signal.Envelope{ + Type: EventTransactionFailed, + Event: ReturnSendTransactionEvent{ + ID: string(queuedTx.ID), + Args: queuedTx.Args, + MessageID: common.MessageIDFromContext(queuedTx.Context), + ErrorMessage: queuedTx.Err.Error(), + ErrorCode: strconv.Itoa(sendTransactionErrorCode(queuedTx.Err)), + }, + }) +} + +func sendTransactionErrorCode(err error) int { + if code, ok := txReturnCodes[err]; ok { + return code + } + return SendTxDefaultErrorCode +} diff --git a/geth/txqueue/txqueue.go b/geth/transactions/queue/queue.go similarity index 56% rename from geth/txqueue/txqueue.go rename to geth/transactions/queue/queue.go index 05fd4826a..eea695442 100644 --- a/geth/txqueue/txqueue.go +++ b/geth/transactions/queue/queue.go @@ -1,4 +1,4 @@ -package txqueue +package queue import ( "errors" @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/keystore" gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/status-im/status-go/geth/account" "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/log" ) @@ -15,10 +16,6 @@ import ( const ( // DefaultTxQueueCap defines how many items can be queued. DefaultTxQueueCap = int(35) - // DefaultTxSendQueueCap defines how many items can be passed to sendTransaction() w/o blocking. - DefaultTxSendQueueCap = int(70) - // DefaultTxSendCompletionTimeout defines how many seconds to wait before returning result in sentTransaction(). - DefaultTxSendCompletionTimeout = 300 ) var ( @@ -36,33 +33,39 @@ var ( ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it") ) +// remove from queue on any error (except for transient ones) and propagate +// defined as map[string]bool because errors from ethclient returned wrapped as jsonError +var transientErrs = map[string]bool{ + keystore.ErrDecrypt.Error(): true, // wrong password + ErrInvalidCompleteTxSender.Error(): true, // completing tx create from another account + account.ErrNoAccountSelected.Error(): true, // account not selected +} + +type empty struct{} + // TxQueue is capped container that holds pending transactions type TxQueue struct { - transactions map[common.QueuedTxID]*common.QueuedTx - mu sync.RWMutex // to guard transactions map + mu sync.RWMutex // to guard transactions map + transactions map[common.QueuedTxID]*common.QueuedTx + inprogress map[common.QueuedTxID]empty + + // TODO(dshulyak) research why eviction is done in separate goroutine evictableIDs chan common.QueuedTxID enqueueTicker chan struct{} - incomingPool chan *common.QueuedTx // when this channel is closed, all queue channels processing must cease (incoming queue, processing queued items etc) stopped chan struct{} stoppedGroup sync.WaitGroup // to make sure that all routines are stopped - - // when items are enqueued notify subscriber - txEnqueueHandler common.EnqueuedTxHandler - - // when tx is returned (either successfully or with error) notify subscriber - txReturnHandler common.EnqueuedTxReturnHandler } -// NewTransactionQueue make new transaction queue -func NewTransactionQueue() *TxQueue { +// New creates a transaction queue. +func New() *TxQueue { log.Info("initializing transaction queue") return &TxQueue{ transactions: make(map[common.QueuedTxID]*common.QueuedTx), + inprogress: make(map[common.QueuedTxID]empty), evictableIDs: make(chan common.QueuedTxID, DefaultTxQueueCap), // will be used to evict in FIFO enqueueTicker: make(chan struct{}), - incomingPool: make(chan *common.QueuedTx, DefaultTxSendQueueCap), } } @@ -75,10 +78,8 @@ func (q *TxQueue) Start() { } q.stopped = make(chan struct{}) - q.stoppedGroup.Add(2) - + q.stoppedGroup.Add(1) go q.evictionLoop() - go q.enqueueLoop() } // Stop stops transaction enqueue and eviction loops @@ -100,7 +101,7 @@ func (q *TxQueue) Stop() { func (q *TxQueue) evictionLoop() { defer HaltOnPanic() evict := func() { - if len(q.transactions) >= DefaultTxQueueCap { // eviction is required to accommodate another/last item + if q.Count() >= DefaultTxQueueCap { // eviction is required to accommodate another/last item q.Remove(<-q.evictableIDs) } } @@ -119,26 +120,6 @@ func (q *TxQueue) evictionLoop() { } } -// enqueueLoop process incoming enqueue requests -func (q *TxQueue) enqueueLoop() { - defer HaltOnPanic() - - // enqueue incoming transactions - for { - select { - case queuedTx := <-q.incomingPool: - log.Info("transaction enqueued requested", "tx", queuedTx.ID) - err := q.Enqueue(queuedTx) - log.Warn("transaction enqueued error", "tx", err) - log.Info("transaction enqueued", "tx", queuedTx.ID) - case <-q.stopped: - log.Info("transaction queue's enqueue loop stopped") - q.stoppedGroup.Done() - return - } - } -} - // Reset is to be used in tests only, as it simply creates new transaction map, w/o any cleanup of the previous one func (q *TxQueue) Reset() { q.mu.Lock() @@ -146,22 +127,14 @@ func (q *TxQueue) Reset() { q.transactions = make(map[common.QueuedTxID]*common.QueuedTx) q.evictableIDs = make(chan common.QueuedTxID, DefaultTxQueueCap) -} - -// EnqueueAsync enqueues incoming transaction in async manner, returns as soon as possible -func (q *TxQueue) EnqueueAsync(tx *common.QueuedTx) error { - q.incomingPool <- tx - - return nil + q.inprogress = make(map[common.QueuedTxID]empty) } // Enqueue enqueues incoming transaction func (q *TxQueue) Enqueue(tx *common.QueuedTx) error { log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID)) - - if q.txEnqueueHandler == nil { //discard, until handler is provided - log.Info("there is no txEnqueueHandler") - return nil + if (tx.Hash != gethcommon.Hash{} || tx.Err != nil) { + return ErrQueuedTxAlreadyProcessed } log.Info("before enqueueTicker") @@ -176,8 +149,6 @@ func (q *TxQueue) Enqueue(tx *common.QueuedTx) error { // notify handler log.Info("calling txEnqueueHandler") - q.txEnqueueHandler(tx) - return nil } @@ -189,50 +160,68 @@ func (q *TxQueue) Get(id common.QueuedTxID) (*common.QueuedTx, error) { if tx, ok := q.transactions[id]; ok { return tx, nil } - return nil, ErrQueuedTxIDNotFound } +// LockInprogress returns error if transaction is already inprogress. +func (q *TxQueue) LockInprogress(id common.QueuedTxID) error { + q.mu.Lock() + defer q.mu.Unlock() + if _, ok := q.transactions[id]; ok { + if _, inprogress := q.inprogress[id]; inprogress { + return ErrQueuedTxInProgress + } + q.inprogress[id] = empty{} + return nil + } + return ErrQueuedTxIDNotFound +} + // Remove removes transaction by transaction identifier func (q *TxQueue) Remove(id common.QueuedTxID) { q.mu.Lock() defer q.mu.Unlock() - - delete(q.transactions, id) + q.remove(id) } -// StartProcessing marks a transaction as in progress. It's thread-safe and -// prevents from processing the same transaction multiple times. -func (q *TxQueue) StartProcessing(tx *common.QueuedTx) error { +func (q *TxQueue) remove(id common.QueuedTxID) { + delete(q.transactions, id) + delete(q.inprogress, id) +} + +// Done removes transaction from queue if no error or error is not transient +// and notify subscribers +func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) error { q.mu.Lock() defer q.mu.Unlock() - - if tx.Hash != (gethcommon.Hash{}) || tx.Err != nil { - return ErrQueuedTxAlreadyProcessed + tx, ok := q.transactions[id] + if !ok { + return ErrQueuedTxIDNotFound } - - if tx.InProgress { - return ErrQueuedTxInProgress - } - - tx.InProgress = true - + q.done(tx, hash, err) return nil } -// StopProcessing removes the "InProgress" flag from the transaction. -func (q *TxQueue) StopProcessing(tx *common.QueuedTx) { - q.mu.Lock() - defer q.mu.Unlock() - - tx.InProgress = false +func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) { + delete(q.inprogress, tx.ID) + tx.Err = err + // hash is updated only if err is nil, but transaction is not removed from a queue + if err == nil { + q.remove(tx.ID) + tx.Hash = hash + close(tx.Done) + return + } + if _, transient := transientErrs[err.Error()]; !transient { + q.remove(tx.ID) + close(tx.Done) + } } // Count returns number of currently queued transactions func (q *TxQueue) Count() int { q.mu.RLock() defer q.mu.RUnlock() - return len(q.transactions) } @@ -240,54 +229,6 @@ func (q *TxQueue) Count() int { func (q *TxQueue) Has(id common.QueuedTxID) bool { q.mu.RLock() defer q.mu.RUnlock() - _, ok := q.transactions[id] - return ok } - -// SetEnqueueHandler sets callback handler, that is triggered on enqueue operation -func (q *TxQueue) SetEnqueueHandler(fn common.EnqueuedTxHandler) { - q.txEnqueueHandler = fn -} - -// SetTxReturnHandler sets callback handler, that is triggered when transaction is finished executing -func (q *TxQueue) SetTxReturnHandler(fn common.EnqueuedTxReturnHandler) { - q.txReturnHandler = fn -} - -// NotifyOnQueuedTxReturn is invoked when transaction is ready to return -// Transaction can be in error state, or executed successfully at this point. -func (q *TxQueue) NotifyOnQueuedTxReturn(queuedTx *common.QueuedTx, err error) { - if q == nil { - return - } - - // discard, if transaction is not found - if queuedTx == nil { - return - } - - // on success, remove item from the queue and stop propagating - if err == nil { - q.Remove(queuedTx.ID) - return - } - - // error occurred, send upward notification - if q.txReturnHandler == nil { // discard, until handler is provided - return - } - - // remove from queue on any error (except for transient ones) and propagate - transientErrs := map[error]bool{ - keystore.ErrDecrypt: true, // wrong password - ErrInvalidCompleteTxSender: true, // completing tx create from another account - } - if !transientErrs[err] { // remove only on unrecoverable errors - q.Remove(queuedTx.ID) - } - - // notify handler - q.txReturnHandler(queuedTx, err) -} diff --git a/geth/transactions/queue/queue_test.go b/geth/transactions/queue/queue_test.go new file mode 100644 index 000000000..e156e6b91 --- /dev/null +++ b/geth/transactions/queue/queue_test.go @@ -0,0 +1,134 @@ +package queue + +import ( + "context" + "errors" + "testing" + + "github.com/ethereum/go-ethereum/accounts/keystore" + gethcommon "github.com/ethereum/go-ethereum/common" + + "github.com/status-im/status-go/geth/common" + "github.com/stretchr/testify/suite" +) + +func TestQueueTestSuite(t *testing.T) { + suite.Run(t, new(QueueTestSuite)) +} + +type QueueTestSuite struct { + suite.Suite + queue *TxQueue +} + +func (s *QueueTestSuite) SetupTest() { + s.queue = New() + s.queue.Start() +} + +func (s *QueueTestSuite) TearDownTest() { + s.queue.Stop() +} + +func (s *QueueTestSuite) TestLockInprogressTransaction() { + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) + s.NoError(s.queue.Enqueue(tx)) + enquedTx, err := s.queue.Get(tx.ID) + s.NoError(err) + s.NoError(s.queue.LockInprogress(tx.ID)) + s.Equal(tx, enquedTx) + + // verify that tx was marked as being inprogress + s.Equal(ErrQueuedTxInProgress, s.queue.LockInprogress(tx.ID)) +} + +func (s *QueueTestSuite) TestGetTransaction() { + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) + s.NoError(s.queue.Enqueue(tx)) + for i := 2; i > 0; i-- { + enquedTx, err := s.queue.Get(tx.ID) + s.NoError(err) + s.Equal(tx, enquedTx) + } +} + +func (s *QueueTestSuite) TestEnqueueProcessedTransaction() { + // enqueue will fail if transaction with hash will be enqueued + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) + tx.Hash = gethcommon.Hash{1} + s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) + + tx = common.CreateTransaction(context.Background(), common.SendTxArgs{}) + tx.Err = errors.New("error") + s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx)) +} + +func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.QueuedTx { + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) + s.NoError(s.queue.Enqueue(tx)) + s.NoError(s.queue.Done(tx.ID, hash, err)) + return tx +} + +func (s *QueueTestSuite) TestDoneSuccess() { + hash := gethcommon.Hash{1} + tx := s.testDone(hash, nil) + s.NoError(tx.Err) + s.Equal(hash, tx.Hash) + s.False(s.queue.Has(tx.ID)) + // event is sent only if transaction was removed from a queue + select { + case <-tx.Done: + default: + s.Fail("No event was sent to Done channel") + } +} + +func (s *QueueTestSuite) TestDoneTransientError() { + hash := gethcommon.Hash{1} + err := keystore.ErrDecrypt + tx := s.testDone(hash, err) + s.Equal(keystore.ErrDecrypt, tx.Err) + s.Equal(gethcommon.Hash{}, tx.Hash) + s.True(s.queue.Has(tx.ID)) +} + +func (s *QueueTestSuite) TestDoneError() { + hash := gethcommon.Hash{1} + err := errors.New("test") + tx := s.testDone(hash, err) + s.Equal(err, tx.Err) + s.NotEqual(hash, tx.Hash) + s.Equal(gethcommon.Hash{}, tx.Hash) + s.False(s.queue.Has(tx.ID)) + // event is sent only if transaction was removed from a queue + select { + case <-tx.Done: + default: + s.Fail("No event was sent to Done channel") + } +} + +func (s QueueTestSuite) TestMultipleDone() { + hash := gethcommon.Hash{1} + err := keystore.ErrDecrypt + tx := s.testDone(hash, err) + s.NoError(s.queue.Done(tx.ID, hash, nil)) + s.Equal(ErrQueuedTxIDNotFound, s.queue.Done(tx.ID, hash, errors.New("timeout"))) +} + +func (s *QueueTestSuite) TestEviction() { + var first *common.QueuedTx + for i := 0; i < DefaultTxQueueCap; i++ { + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) + if first == nil { + first = tx + } + s.NoError(s.queue.Enqueue(tx)) + } + s.Equal(DefaultTxQueueCap, s.queue.Count()) + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{}) + s.NoError(s.queue.Enqueue(tx)) + s.Equal(DefaultTxQueueCap, s.queue.Count()) + s.False(s.queue.Has(first.ID)) +} diff --git a/geth/txqueue/utils.go b/geth/transactions/queue/utils.go similarity index 97% rename from geth/txqueue/utils.go rename to geth/transactions/queue/utils.go index a1b858c4f..46659e330 100644 --- a/geth/txqueue/utils.go +++ b/geth/transactions/queue/utils.go @@ -1,4 +1,4 @@ -package txqueue +package queue import ( "errors" diff --git a/geth/txqueue/txqueue_manager.go b/geth/transactions/txqueue_manager.go similarity index 51% rename from geth/txqueue/txqueue_manager.go rename to geth/transactions/txqueue_manager.go index 4e359fbec..bb1ade9c4 100644 --- a/geth/txqueue/txqueue_manager.go +++ b/geth/transactions/txqueue_manager.go @@ -1,4 +1,4 @@ -package txqueue +package transactions import ( "context" @@ -6,53 +6,31 @@ import ( "time" ethereum "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/accounts/keystore" gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/pborman/uuid" "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/log" - "github.com/status-im/status-go/geth/signal" + "github.com/status-im/status-go/geth/transactions/queue" ) const ( - // EventTransactionQueued is triggered when send transaction request is queued - EventTransactionQueued = "transaction.queued" - - // EventTransactionFailed is triggered when send transaction request fails - EventTransactionFailed = "transaction.failed" - // SendTxDefaultErrorCode is sent by default, when error is not nil, but type is unknown/unexpected. SendTxDefaultErrorCode = SendTransactionDefaultErrorCode + // DefaultTxSendCompletionTimeout defines how many seconds to wait before returning result in sentTransaction(). + DefaultTxSendCompletionTimeout = 300 - defaultGas = 90000 - + defaultGas = 90000 defaultTimeout = time.Minute ) -// Send transaction response codes -const ( - SendTransactionNoErrorCode = "0" - SendTransactionDefaultErrorCode = "1" - SendTransactionPasswordErrorCode = "2" - SendTransactionTimeoutErrorCode = "3" - SendTransactionDiscardedErrorCode = "4" -) - -var txReturnCodes = map[error]string{ // deliberately strings, in case more meaningful codes are to be returned - nil: SendTransactionNoErrorCode, - keystore.ErrDecrypt: SendTransactionPasswordErrorCode, - ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode, - ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode, -} - // Manager provides means to manage internal Status Backend (injected into LES) type Manager struct { nodeManager common.NodeManager accountManager common.AccountManager - txQueue *TxQueue + txQueue *queue.TxQueue ethTxClient EthTransactor addrLock *AddrLocker + notify bool } // NewManager returns a new Manager. @@ -60,11 +38,18 @@ func NewManager(nodeManager common.NodeManager, accountManager common.AccountMan return &Manager{ nodeManager: nodeManager, accountManager: accountManager, - txQueue: NewTransactionQueue(), + txQueue: queue.New(), addrLock: &AddrLocker{}, + notify: true, } } +// DisableNotificactions turns off notifications on enqueue and return of tx. +// It is not thread safe and must be called only before manager is started. +func (m *Manager) DisableNotificactions() { + m.notify = false +} + // Start starts accepting new transactions into the queue. func (m *Manager) Start() { log.Info("start Manager") @@ -83,18 +68,6 @@ func (m *Manager) TransactionQueue() common.TxQueue { return m.txQueue } -// CreateTransaction returns a transaction object. -func (m *Manager) CreateTransaction(ctx context.Context, args common.SendTxArgs) *common.QueuedTx { - return &common.QueuedTx{ - ID: common.QueuedTxID(uuid.New()), - Hash: gethcommon.Hash{}, - Context: ctx, - Args: args, - Done: make(chan struct{}, 1), - Discard: make(chan struct{}, 1), - } -} - // QueueTransaction puts a transaction into the queue. func (m *Manager) QueueTransaction(tx *common.QueuedTx) error { to := "" @@ -102,108 +75,93 @@ func (m *Manager) QueueTransaction(tx *common.QueuedTx) error { to = tx.Args.To.Hex() } log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to) + err := m.txQueue.Enqueue(tx) + if m.notify { + NotifyOnEnqueue(tx) + } + return err +} - return m.txQueue.Enqueue(tx) +func (m *Manager) txDone(tx *common.QueuedTx, hash gethcommon.Hash, err error) { + m.txQueue.Done(tx.ID, hash, err) //nolint: errcheck + if m.notify { + NotifyOnReturn(tx) + } } // WaitForTransaction adds a transaction to the queue and blocks // until it's completed, discarded or times out. func (m *Manager) WaitForTransaction(tx *common.QueuedTx) error { log.Info("wait for transaction", "id", tx.ID) - // now wait up until transaction is: // - completed (via CompleteQueuedTransaction), // - discarded (via DiscardQueuedTransaction) // - or times out select { case <-tx.Done: - m.NotifyOnQueuedTxReturn(tx, tx.Err) - return tx.Err - case <-tx.Discard: - m.NotifyOnQueuedTxReturn(tx, ErrQueuedTxDiscarded) - return ErrQueuedTxDiscarded case <-time.After(DefaultTxSendCompletionTimeout * time.Second): - m.NotifyOnQueuedTxReturn(tx, ErrQueuedTxTimedOut) - return ErrQueuedTxTimedOut + m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut) } -} - -// NotifyOnQueuedTxReturn calls a handler when a transaction resolves. -func (m *Manager) NotifyOnQueuedTxReturn(queuedTx *common.QueuedTx, err error) { - m.txQueue.NotifyOnQueuedTxReturn(queuedTx, err) + return tx.Err } // CompleteTransaction instructs backend to complete sending of a given transaction. -// TODO(adam): investigate a possible bug that calling this method multiple times with the same Transaction ID -// results in sending multiple transactions. -func (m *Manager) CompleteTransaction(id common.QueuedTxID, password string) (gethcommon.Hash, error) { +func (m *Manager) CompleteTransaction(id common.QueuedTxID, password string) (hash gethcommon.Hash, err error) { log.Info("complete transaction", "id", id) - - queuedTx, err := m.txQueue.Get(id) + tx, err := m.txQueue.Get(id) if err != nil { - log.Warn("could not get a queued transaction", "err", err) - return gethcommon.Hash{}, err - } - - err = m.txQueue.StartProcessing(queuedTx) - if err != nil { - return gethcommon.Hash{}, err - } - defer m.txQueue.StopProcessing(queuedTx) - - selectedAccount, err := m.accountManager.SelectedAccount() - if err != nil { - log.Warn("failed to get a selected account", "err", err) - return gethcommon.Hash{}, err - } - - // make sure that only account which created the tx can complete it - if queuedTx.Args.From.Hex() != selectedAccount.Address.Hex() { - log.Warn("queued transaction does not belong to the selected account", "err", ErrInvalidCompleteTxSender) - m.NotifyOnQueuedTxReturn(queuedTx, ErrInvalidCompleteTxSender) - return gethcommon.Hash{}, ErrInvalidCompleteTxSender - } - // Send the transaction finally. - hash, err := m.completeTransaction(queuedTx, selectedAccount, password) - - // when incorrect sender tries to complete the account, - // notify and keep tx in queue (so that correct sender can complete) - if err == keystore.ErrDecrypt { - log.Warn("failed to complete transaction", "err", err) - m.NotifyOnQueuedTxReturn(queuedTx, err) + log.Warn("error getting a queued transaction", "err", err) return hash, err } - - log.Info("finally completed transaction", "id", queuedTx.ID, "hash", hash, "err", err) - - queuedTx.Hash = hash - queuedTx.Err = err - queuedTx.Done <- struct{}{} - + if err := m.txQueue.LockInprogress(id); err != nil { + log.Warn("can't process transaction", "err", err) + return hash, err + } + account, err := m.validateAccount(tx) + if err != nil { + m.txDone(tx, hash, err) + return hash, err + } + // Send the transaction finally. + hash, err = m.completeTransaction(tx, account, password) + log.Info("finally completed transaction", "id", tx.ID, "hash", hash, "err", err) + m.txDone(tx, hash, err) return hash, err } -func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount *common.SelectedExtKey, password string) (gethcommon.Hash, error) { +func (m *Manager) validateAccount(tx *common.QueuedTx) (*common.SelectedExtKey, error) { + selectedAccount, err := m.accountManager.SelectedAccount() + if err != nil { + log.Warn("failed to get a selected account", "err", err) + return nil, err + } + // make sure that only account which created the tx can complete it + if tx.Args.From.Hex() != selectedAccount.Address.Hex() { + log.Warn("queued transaction does not belong to the selected account", "err", queue.ErrInvalidCompleteTxSender) + return nil, queue.ErrInvalidCompleteTxSender + } + return selectedAccount, nil +} + +func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount *common.SelectedExtKey, password string) (hash gethcommon.Hash, err error) { log.Info("complete transaction", "id", queuedTx.ID) - var emptyHash gethcommon.Hash + log.Info("verifying account password for transaction", "id", queuedTx.ID) config, err := m.nodeManager.NodeConfig() if err != nil { - return emptyHash, err + return hash, err } _, err = m.accountManager.VerifyAccountPassword(config.KeyStoreDir, selectedAccount.Address.String(), password) if err != nil { log.Warn("failed to verify account", "account", selectedAccount.Address.String(), "error", err.Error()) - return emptyHash, err + return hash, err } - - // update transaction with nonce, gas price and gas estimates - ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) - defer cancel() m.addrLock.LockAddr(queuedTx.Args.From) defer m.addrLock.UnlockAddr(queuedTx.Args.From) + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() nonce, err := m.ethTxClient.PendingNonceAt(ctx, queuedTx.Args.From) if err != nil { - return emptyHash, err + return hash, err } args := queuedTx.Args gasPrice := (*big.Int)(args.GasPrice) @@ -212,7 +170,7 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount defer cancel() gasPrice, err = m.ethTxClient.SuggestGasPrice(ctx) if err != nil { - return emptyHash, err + return hash, err } } @@ -223,6 +181,7 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount if args.To != nil { toAddr = *args.To } + gas := (*big.Int)(args.Gas) if args.Gas == nil { ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout) @@ -235,7 +194,7 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount Data: data, }) if err != nil { - return emptyHash, err + return hash, err } if gas.Cmp(big.NewInt(defaultGas)) == -1 { log.Info("default gas will be used. estimated gas", gas, "is lower than", defaultGas) @@ -254,12 +213,12 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount tx := types.NewTransaction(nonce, toAddr, value, gas, gasPrice, data) signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), selectedAccount.AccountKey.PrivateKey) if err != nil { - return emptyHash, err + return hash, err } ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout) defer cancel() if err := m.ethTxClient.SendTransaction(ctx, signedTx); err != nil { - return emptyHash, err + return hash, err } return signedTx.Hash(), nil } @@ -267,7 +226,6 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount // CompleteTransactions instructs backend to complete sending of multiple transactions func (m *Manager) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult { results := make(map[common.QueuedTxID]common.RawCompleteTransactionResult) - for _, txID := range ids { txHash, txErr := m.CompleteTransaction(txID, password) results[txID] = common.RawCompleteTransactionResult{ @@ -275,25 +233,20 @@ func (m *Manager) CompleteTransactions(ids []common.QueuedTxID, password string) Error: txErr, } } - return results } // DiscardTransaction discards a given transaction from transaction queue func (m *Manager) DiscardTransaction(id common.QueuedTxID) error { - queuedTx, err := m.txQueue.Get(id) + tx, err := m.txQueue.Get(id) if err != nil { return err } - - // remove from queue, before notifying SendTransaction - m.txQueue.Remove(queuedTx.ID) - - // allow SendTransaction to return - queuedTx.Err = ErrQueuedTxDiscarded - queuedTx.Discard <- struct{}{} // sendTransaction() waits on this, notify so that it can return - - return nil + err = m.txQueue.Done(id, gethcommon.Hash{}, queue.ErrQueuedTxDiscarded) + if m.notify { + NotifyOnReturn(tx) + } + return err } // DiscardTransactions discards given multiple transactions from transaction queue @@ -312,84 +265,6 @@ func (m *Manager) DiscardTransactions(ids []common.QueuedTxID) map[common.Queued return results } -// SendTransactionEvent is a signal sent on a send transaction request -type SendTransactionEvent struct { - ID string `json:"id"` - Args common.SendTxArgs `json:"args"` - MessageID string `json:"message_id"` -} - -// TransactionQueueHandler returns handler that processes incoming tx queue requests -func (m *Manager) TransactionQueueHandler() func(queuedTx *common.QueuedTx) { - return func(queuedTx *common.QueuedTx) { - log.Info("calling TransactionQueueHandler") - signal.Send(signal.Envelope{ - Type: EventTransactionQueued, - Event: SendTransactionEvent{ - ID: string(queuedTx.ID), - Args: queuedTx.Args, - MessageID: common.MessageIDFromContext(queuedTx.Context), - }, - }) - } -} - -// SetTransactionQueueHandler sets a handler that will be called -// when a new transaction is enqueued. -func (m *Manager) SetTransactionQueueHandler(fn common.EnqueuedTxHandler) { - m.txQueue.SetEnqueueHandler(fn) -} - -// ReturnSendTransactionEvent is a JSON returned whenever transaction send is returned -type ReturnSendTransactionEvent struct { - ID string `json:"id"` - Args common.SendTxArgs `json:"args"` - MessageID string `json:"message_id"` - ErrorMessage string `json:"error_message"` - ErrorCode string `json:"error_code"` -} - -// TransactionReturnHandler returns handler that processes responses from internal tx manager -func (m *Manager) TransactionReturnHandler() func(queuedTx *common.QueuedTx, err error) { - return func(queuedTx *common.QueuedTx, err error) { - if err == nil { - return - } - - // discard notifications with empty tx - if queuedTx == nil { - return - } - - // error occurred, signal up to application - signal.Send(signal.Envelope{ - Type: EventTransactionFailed, - Event: ReturnSendTransactionEvent{ - ID: string(queuedTx.ID), - Args: queuedTx.Args, - MessageID: common.MessageIDFromContext(queuedTx.Context), - ErrorMessage: err.Error(), - ErrorCode: m.sendTransactionErrorCode(err), - }, - }) - } -} - -func (m *Manager) sendTransactionErrorCode(err error) string { - if code, ok := txReturnCodes[err]; ok { - return code - } - - return SendTxDefaultErrorCode -} - -// SetTransactionReturnHandler sets a handler that will be called -// when a transaction is about to return or when a recoverable error occurred. -// Recoverable error is, for instance, wrong password. -func (m *Manager) SetTransactionReturnHandler(fn common.EnqueuedTxReturnHandler) { - m.txQueue.SetTxReturnHandler(fn) -} - // SendTransactionRPCHandler is a handler for eth_sendTransaction method. // It accepts one param which is a slice with a map of transaction params. func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) { @@ -398,8 +273,7 @@ func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interfa // TODO(adam): it's a hack to parse arguments as common.RPCCall can do that. // We should refactor parsing these params to a separate struct. rpcCall := common.RPCCall{Params: args} - - tx := m.CreateTransaction(ctx, rpcCall.ToSendTxArgs()) + tx := common.CreateTransaction(ctx, rpcCall.ToSendTxArgs()) if err := m.QueueTransaction(tx); err != nil { return nil, err diff --git a/geth/txqueue/txqueue_manager_test.go b/geth/transactions/txqueue_manager_test.go similarity index 75% rename from geth/txqueue/txqueue_manager_test.go rename to geth/transactions/txqueue_manager_test.go index c4294a76f..3fa4af314 100644 --- a/geth/txqueue/txqueue_manager_test.go +++ b/geth/transactions/txqueue_manager_test.go @@ -1,4 +1,4 @@ -package txqueue +package transactions import ( "context" @@ -18,7 +18,8 @@ import ( "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/rpc" - "github.com/status-im/status-go/geth/txqueue/fake" + "github.com/status-im/status-go/geth/transactions/fake" + "github.com/status-im/status-go/geth/transactions/queue" . "github.com/status-im/status-go/testing" ) @@ -93,21 +94,10 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() { txQueueManager.Start() defer txQueueManager.Stop() - tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{ + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ From: common.FromAddress(TestConfig.Account1.Address), To: common.ToAddress(TestConfig.Account2.Address), }) - - // TransactionQueueHandler is required to enqueue a transaction. - txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) { - s.Equal(tx.ID, queuedTx.ID) - }) - - txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) { - s.Equal(tx.ID, queuedTx.ID) - s.NoError(err) - }) - err := txQueueManager.QueueTransaction(tx) s.NoError(err) @@ -142,25 +132,15 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { s.setupTransactionPoolAPI(account, nonce, gas, nil) txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) - + txQueueManager.DisableNotificactions() txQueueManager.Start() defer txQueueManager.Stop() - tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{ + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ From: common.FromAddress(TestConfig.Account1.Address), To: common.ToAddress(TestConfig.Account2.Address), }) - // TransactionQueueHandler is required to enqueue a transaction. - txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) { - s.Equal(tx.ID, queuedTx.ID) - }) - - txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) { - s.Equal(tx.ID, queuedTx.ID) - s.NoError(err) - }) - err := txQueueManager.QueueTransaction(tx) s.NoError(err) @@ -179,7 +159,7 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { mu.Lock() if err == nil { completedTx++ - } else if err == ErrQueuedTxInProgress { + } else if err == queue.ErrQueuedTxInProgress { inprogressTx++ } else { s.Fail("tx failed with unexpected error: ", err.Error()) @@ -207,33 +187,21 @@ func (s *TxQueueTestSuite) TestAccountMismatch() { }, nil) txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) + txQueueManager.DisableNotificactions() txQueueManager.Start() defer txQueueManager.Stop() - tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{ + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ From: common.FromAddress(TestConfig.Account1.Address), To: common.ToAddress(TestConfig.Account2.Address), }) - // TransactionQueueHandler is required to enqueue a transaction. - txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) { - s.Equal(tx.ID, queuedTx.ID) - }) - - // Missmatched address is a recoverable error, that's why - // the return handler is called. - txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) { - s.Equal(tx.ID, queuedTx.ID) - s.Equal(ErrInvalidCompleteTxSender, err) - s.Nil(tx.Err) - }) - err := txQueueManager.QueueTransaction(tx) s.NoError(err) _, err = txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password) - s.Equal(err, ErrInvalidCompleteTxSender) + s.Equal(err, queue.ErrInvalidCompleteTxSender) // Transaction should stay in the queue as mismatched accounts // is a recoverable error. @@ -250,28 +218,15 @@ func (s *TxQueueTestSuite) TestInvalidPassword() { s.setupStatusBackend(account, password, keystore.ErrDecrypt) txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) - + txQueueManager.DisableNotificactions() txQueueManager.Start() defer txQueueManager.Stop() - tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{ + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ From: common.FromAddress(TestConfig.Account1.Address), To: common.ToAddress(TestConfig.Account2.Address), }) - // TransactionQueueHandler is required to enqueue a transaction. - txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) { - s.Equal(tx.ID, queuedTx.ID) - }) - - // Missmatched address is a revocable error, that's why - // the return handler is called. - txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) { - s.Equal(tx.ID, queuedTx.ID) - s.Equal(keystore.ErrDecrypt, err) - s.Nil(tx.Err) - }) - err := txQueueManager.QueueTransaction(tx) s.NoError(err) @@ -285,39 +240,29 @@ func (s *TxQueueTestSuite) TestInvalidPassword() { func (s *TxQueueTestSuite) TestDiscardTransaction() { txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) + txQueueManager.DisableNotificactions() txQueueManager.Start() defer txQueueManager.Stop() - tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{ + tx := common.CreateTransaction(context.Background(), common.SendTxArgs{ From: common.FromAddress(TestConfig.Account1.Address), To: common.ToAddress(TestConfig.Account2.Address), }) - // TransactionQueueHandler is required to enqueue a transaction. - txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) { - s.Equal(tx.ID, queuedTx.ID) - }) - - txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) { - s.Equal(tx.ID, queuedTx.ID) - s.Equal(ErrQueuedTxDiscarded, err) - }) - err := txQueueManager.QueueTransaction(tx) s.NoError(err) w := make(chan struct{}) go func() { - err := txQueueManager.DiscardTransaction(tx.ID) - s.NoError(err) + s.NoError(txQueueManager.DiscardTransaction(tx.ID)) close(w) }() err = txQueueManager.WaitForTransaction(tx) - s.Equal(ErrQueuedTxDiscarded, err) + s.Equal(queue.ErrQueuedTxDiscarded, err) // Check that error is assigned to the transaction. - s.Equal(ErrQueuedTxDiscarded, tx.Err) + s.Equal(queue.ErrQueuedTxDiscarded, tx.Err) // Transaction should be already removed from the queue. s.False(txQueueManager.TransactionQueue().Has(tx.ID)) s.NoError(WaitClosed(w, time.Second)) diff --git a/lib/utils.go b/lib/utils.go index fde588c17..216b5b0d5 100644 --- a/lib/utils.go +++ b/lib/utils.go @@ -34,7 +34,8 @@ import ( "github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" - "github.com/status-im/status-go/geth/txqueue" + "github.com/status-im/status-go/geth/transactions" + "github.com/status-im/status-go/geth/transactions/queue" "github.com/status-im/status-go/static" . "github.com/status-im/status-go/testing" //nolint: golint ) @@ -793,7 +794,7 @@ func testCompleteTransaction(t *testing.T) bool { t.Errorf("cannot unmarshal event's JSON: %s. Error %q", jsonEvent, err) return } - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) t.Logf("transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string)) @@ -871,7 +872,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID = event["id"].(string) t.Logf("transaction queued (will be completed in a single call, once aggregated): {id: %s}\n", txID) @@ -918,7 +919,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc } results := resultsStruct.Results - if len(results) != (testTxCount+1) || results["invalid-tx-id"].Error != txqueue.ErrQueuedTxIDNotFound.Error() { + if len(results) != (testTxCount+1) || results["invalid-tx-id"].Error != queue.ErrQueuedTxIDNotFound.Error() { t.Errorf("cannot complete txs: %v", results) return } @@ -1004,7 +1005,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID = event["id"].(string) t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID) @@ -1029,7 +1030,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo // try completing discarded transaction _, err := statusAPI.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password) - if err != txqueue.ErrQueuedTxIDNotFound { + if err != queue.ErrQueuedTxIDNotFound { t.Error("expects tx not found, but call to CompleteTransaction succeeded") return } @@ -1043,19 +1044,19 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo completeQueuedTransaction <- struct{}{} // so that timeout is aborted } - if envelope.Type == txqueue.EventTransactionFailed { + if envelope.Type == transactions.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) t.Logf("transaction return event received: {id: %s}\n", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() if receivedErrMessage != expectedErrMessage { t.Errorf("unexpected error message received: got %v", receivedErrMessage) return } receivedErrCode := event["error_code"].(string) - if receivedErrCode != txqueue.SendTransactionDiscardedErrorCode { + if receivedErrCode != strconv.Itoa(transactions.SendTransactionDiscardedErrorCode) { t.Errorf("unexpected error code received: got %v", receivedErrCode) return } @@ -1070,7 +1071,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - if err != txqueue.ErrQueuedTxDiscarded { + if err != queue.ErrQueuedTxDiscarded { t.Errorf("expected error not thrown: %v", err) return false } @@ -1117,7 +1118,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID = event["id"].(string) t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID) @@ -1130,19 +1131,19 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl txIDs <- txID } - if envelope.Type == txqueue.EventTransactionFailed { + if envelope.Type == transactions.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) t.Logf("transaction return event received: {id: %s}\n", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error() + expectedErrMessage := queue.ErrQueuedTxDiscarded.Error() if receivedErrMessage != expectedErrMessage { t.Errorf("unexpected error message received: got %v", receivedErrMessage) return } receivedErrCode := event["error_code"].(string) - if receivedErrCode != txqueue.SendTransactionDiscardedErrorCode { + if receivedErrCode != strconv.Itoa(transactions.SendTransactionDiscardedErrorCode) { t.Errorf("unexpected error code received: got %v", receivedErrCode) return } @@ -1161,7 +1162,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl To: common.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - if err != txqueue.ErrQueuedTxDiscarded { + if err != queue.ErrQueuedTxDiscarded { t.Errorf("expected error not thrown: %v", err) return } @@ -1192,7 +1193,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl } discardResults := discardResultsStruct.Results - if len(discardResults) != 1 || discardResults["invalid-tx-id"].Error != txqueue.ErrQueuedTxIDNotFound.Error() { + if len(discardResults) != 1 || discardResults["invalid-tx-id"].Error != queue.ErrQueuedTxIDNotFound.Error() { t.Errorf("cannot discard txs: %v", discardResults) return } @@ -1214,7 +1215,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl t.Errorf("tx id not set in result: expected id is %s", txID) return } - if txResult.Error != txqueue.ErrQueuedTxIDNotFound.Error() { + if txResult.Error != queue.ErrQueuedTxIDNotFound.Error() { t.Errorf("invalid error for %s", txResult.Hash) return } @@ -1431,7 +1432,7 @@ func startTestNode(t *testing.T) <-chan struct{} { return } - if envelope.Type == txqueue.EventTransactionQueued { + if envelope.Type == transactions.EventTransactionQueued { } if envelope.Type == signal.EventNodeStarted { t.Log("Node started, but we wait till it be ready")