Refactoring of TxQueue and Manager (#530)

This commit is contained in:
Dmitry Shulyak 2018-01-26 07:59:21 +02:00 committed by Adam Babik
parent 6ed5997ff4
commit 680d0513b7
18 changed files with 489 additions and 569 deletions

View File

@ -111,7 +111,7 @@ mock: ##@other Regenerate mocks
mockgen -source=geth/mailservice/mailservice.go -destination=geth/mailservice/mailservice_mock.go -package=mailservice mockgen -source=geth/mailservice/mailservice.go -destination=geth/mailservice/mailservice_mock.go -package=mailservice
mockgen -source=geth/common/notification.go -destination=geth/common/notification_mock.go -package=common -imports fcm=github.com/NaySoftware/go-fcm mockgen -source=geth/common/notification.go -destination=geth/common/notification_mock.go -package=common -imports fcm=github.com/NaySoftware/go-fcm
mockgen -source=geth/notification/fcm/client.go -destination=geth/notification/fcm/client_mock.go -package=fcm -imports fcm=github.com/NaySoftware/go-fcm mockgen -source=geth/notification/fcm/client.go -destination=geth/notification/fcm/client_mock.go -package=fcm -imports fcm=github.com/NaySoftware/go-fcm
mockgen -source=geth/txqueue/fake/txservice.go -destination=geth/txqueue/fake/mock.go -package=fake mockgen -source=geth/transactions/fake/txservice.go -destination=geth/transactions/fake/mock.go -package=fake
test: test-unit-coverage ##@tests Run basic, short tests during development test: test-unit-coverage ##@tests Run basic, short tests during development

View File

@ -14,7 +14,7 @@ import (
"github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal" "github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/txqueue" "github.com/status-im/status-go/geth/transactions"
. "github.com/status-im/status-go/testing" . "github.com/status-im/status-go/testing"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
) )
@ -126,7 +126,7 @@ func (s *JailRPCTestSuite) TestContractDeployment() {
unmarshalErr := json.Unmarshal([]byte(jsonEvent), &envelope) unmarshalErr := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(unmarshalErr, "cannot unmarshal JSON: %s", jsonEvent) s.NoError(unmarshalErr, "cannot unmarshal JSON: %s", jsonEvent)
if envelope.Type == txqueue.EventTransactionQueued { if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
s.T().Logf("transaction queued and will be completed shortly, id: %v", event["id"]) s.T().Logf("transaction queued and will be completed shortly, id: %v", event["id"])
@ -284,7 +284,7 @@ func (s *JailRPCTestSuite) TestJailVMPersistence() {
s.T().Errorf("cannot unmarshal event's JSON: %s", jsonEvent) s.T().Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
return return
} }
if envelope.Type == txqueue.EventTransactionQueued { if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
s.T().Logf("Transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string)) s.T().Logf("Transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string))

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"math/big" "math/big"
"reflect" "reflect"
"sync"
"testing" "testing"
"time" "time"
@ -18,7 +19,8 @@ import (
"github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal" "github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/txqueue" "github.com/status-im/status-go/geth/transactions"
"github.com/status-im/status-go/geth/transactions/queue"
. "github.com/status-im/status-go/testing" . "github.com/status-im/status-go/testing"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
) )
@ -48,7 +50,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransaction() {
err := json.Unmarshal([]byte(rawSignal), &sg) err := json.Unmarshal([]byte(rawSignal), &sg)
s.NoError(err) s.NoError(err)
if sg.Type == txqueue.EventTransactionQueued { if sg.Type == transactions.EventTransactionQueued {
event := sg.Event.(map[string]interface{}) event := sg.Event.(map[string]interface{})
txID := event["id"].(string) txID := event["id"].(string)
txHash, err = s.Backend.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password) txHash, err = s.Backend.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password)
@ -100,7 +102,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransactionUpstream() {
err := json.Unmarshal([]byte(rawSignal), &signalEnvelope) err := json.Unmarshal([]byte(rawSignal), &signalEnvelope)
s.NoError(err) s.NoError(err)
if signalEnvelope.Type == txqueue.EventTransactionQueued { if signalEnvelope.Type == transactions.EventTransactionQueued {
event := signalEnvelope.Event.(map[string]interface{}) event := signalEnvelope.Event.(map[string]interface{})
txID := event["id"].(string) txID := event["id"].(string)
@ -156,7 +158,7 @@ func (s *TransactionsTestSuite) TestSendContractTx() {
err = json.Unmarshal([]byte(jsonEvent), &envelope) err = json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
if envelope.Type == txqueue.EventTransactionQueued { if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string)) log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))
@ -182,7 +184,7 @@ func (s *TransactionsTestSuite) TestSendContractTx() {
) )
s.EqualError( s.EqualError(
err, err,
txqueue.ErrInvalidCompleteTxSender.Error(), queue.ErrInvalidCompleteTxSender.Error(),
fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]), fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]),
) )
@ -247,7 +249,7 @@ func (s *TransactionsTestSuite) TestSendEther() {
err = json.Unmarshal([]byte(jsonEvent), &envelope) err = json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
if envelope.Type == txqueue.EventTransactionQueued { if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string)) log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))
@ -271,7 +273,7 @@ func (s *TransactionsTestSuite) TestSendEther() {
common.QueuedTxID(event["id"].(string)), TestConfig.Account1.Password) common.QueuedTxID(event["id"].(string)), TestConfig.Account1.Password)
s.EqualError( s.EqualError(
err, err,
txqueue.ErrInvalidCompleteTxSender.Error(), queue.ErrInvalidCompleteTxSender.Error(),
fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]), fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]),
) )
@ -330,7 +332,7 @@ func (s *TransactionsTestSuite) TestSendEtherTxUpstream() {
err = json.Unmarshal([]byte(jsonEvent), &envelope) err = json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, "cannot unmarshal JSON: %s", jsonEvent) s.NoError(err, "cannot unmarshal JSON: %s", jsonEvent)
if envelope.Type == txqueue.EventTransactionQueued { if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string)) log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))
@ -387,7 +389,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() {
err := json.Unmarshal([]byte(jsonEvent), &envelope) err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
if envelope.Type == txqueue.EventTransactionQueued { if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
txID := common.QueuedTxID(event["id"].(string)) txID := common.QueuedTxID(event["id"].(string))
log.Info("transaction queued (will be failed and completed on the second call)", "id", txID) log.Info("transaction queued (will be failed and completed on the second call)", "id", txID)
@ -407,7 +409,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() {
close(completeQueuedTransaction) close(completeQueuedTransaction)
} }
if envelope.Type == txqueue.EventTransactionFailed { if envelope.Type == transactions.EventTransactionFailed {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
log.Info("transaction return event received", "id", event["id"].(string)) log.Info("transaction return event received", "id", event["id"].(string))
@ -466,7 +468,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
err := json.Unmarshal([]byte(jsonEvent), &envelope) err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
if envelope.Type == txqueue.EventTransactionQueued { if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
txID := common.QueuedTxID(event["id"].(string)) txID := common.QueuedTxID(event["id"].(string))
log.Info("transaction queued (will be discarded soon)", "id", txID) log.Info("transaction queued (will be discarded soon)", "id", txID)
@ -488,12 +490,12 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
close(completeQueuedTransaction) close(completeQueuedTransaction)
} }
if envelope.Type == txqueue.EventTransactionFailed { if envelope.Type == transactions.EventTransactionFailed {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
log.Info("transaction return event received", "id", event["id"].(string)) log.Info("transaction return event received", "id", event["id"].(string))
receivedErrMessage := event["error_message"].(string) receivedErrMessage := event["error_message"].(string)
expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error() expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
s.Equal(receivedErrMessage, expectedErrMessage) s.Equal(receivedErrMessage, expectedErrMessage)
receivedErrCode := event["error_code"].(string) receivedErrCode := event["error_code"].(string)
@ -509,7 +511,7 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
To: common.ToAddress(TestConfig.Account2.Address), To: common.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)), Value: (*hexutil.Big)(big.NewInt(1000000000000)),
}) })
s.EqualError(err, txqueue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded") s.EqualError(err, queue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")
select { select {
case <-completeQueuedTransaction: case <-completeQueuedTransaction:
@ -543,7 +545,7 @@ func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactions() {
err := json.Unmarshal([]byte(jsonEvent), &envelope) err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
if envelope.Type == txqueue.EventTransactionQueued { if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
txID := common.QueuedTxID(event["id"].(string)) txID := common.QueuedTxID(event["id"].(string))
log.Info("transaction queued (will be completed in a single call, once aggregated)", "id", txID) log.Info("transaction queued (will be completed in a single call, once aggregated)", "id", txID)
@ -640,7 +642,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
var envelope signal.Envelope var envelope signal.Envelope
err := json.Unmarshal([]byte(jsonEvent), &envelope) err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err) s.NoError(err)
if envelope.Type == txqueue.EventTransactionQueued { if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
txID := common.QueuedTxID(event["id"].(string)) txID := common.QueuedTxID(event["id"].(string))
log.Info("transaction queued (will be discarded soon)", "id", txID) log.Info("transaction queued (will be discarded soon)", "id", txID)
@ -650,12 +652,12 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
txIDs <- txID txIDs <- txID
} }
if envelope.Type == txqueue.EventTransactionFailed { if envelope.Type == transactions.EventTransactionFailed {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
log.Info("transaction return event received", "id", event["id"].(string)) log.Info("transaction return event received", "id", event["id"].(string))
receivedErrMessage := event["error_message"].(string) receivedErrMessage := event["error_message"].(string)
expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error() expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
s.Equal(receivedErrMessage, expectedErrMessage) s.Equal(receivedErrMessage, expectedErrMessage)
receivedErrCode := event["error_code"].(string) receivedErrCode := event["error_code"].(string)
@ -675,7 +677,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
To: common.ToAddress(TestConfig.Account2.Address), To: common.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)), Value: (*hexutil.Big)(big.NewInt(1000000000000)),
}) })
s.EqualError(err, txqueue.ErrQueuedTxDiscarded.Error()) s.EqualError(err, queue.ErrQueuedTxDiscarded.Error())
s.True(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction returned hash, while it shouldn't") s.True(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction returned hash, while it shouldn't")
} }
@ -747,7 +749,7 @@ func (s *TransactionsTestSuite) TestNonExistentQueuedTransactions() {
// try completing non-existing transaction // try completing non-existing transaction
_, err := s.Backend.CompleteTransaction("some-bad-transaction-id", TestConfig.Account1.Password) _, err := s.Backend.CompleteTransaction("some-bad-transaction-id", TestConfig.Account1.Password)
s.Error(err, "error expected and not received") s.Error(err, "error expected and not received")
s.EqualError(err, txqueue.ErrQueuedTxIDNotFound.Error()) s.EqualError(err, queue.ErrQueuedTxIDNotFound.Error())
} }
func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() { func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() {
@ -756,6 +758,24 @@ func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() {
backend := s.LightEthereumService().StatusBackend backend := s.LightEthereumService().StatusBackend
s.NotNil(backend) s.NotNil(backend)
var m sync.Mutex
txCount := 0
txIDs := [queue.DefaultTxQueueCap + 5 + 10]common.QueuedTxID{}
signal.SetDefaultNodeNotificationHandler(func(rawSignal string) {
var sg signal.Envelope
err := json.Unmarshal([]byte(rawSignal), &sg)
s.NoError(err)
if sg.Type == transactions.EventTransactionQueued {
event := sg.Event.(map[string]interface{})
txID := event["id"].(string)
m.Lock()
txIDs[txCount] = common.QueuedTxID(txID)
txCount++
m.Unlock()
}
})
// reset queue // reset queue
s.Backend.TxQueueManager().TransactionQueue().Reset() s.Backend.TxQueueManager().TransactionQueue().Reset()
@ -764,36 +784,23 @@ func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() {
s.NoError(s.Backend.AccountManager().SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)) s.NoError(s.Backend.AccountManager().SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password))
txQueue := s.Backend.TxQueueManager().TransactionQueue() txQueue := s.Backend.TxQueueManager().TransactionQueue()
var i = 0
txIDs := [txqueue.DefaultTxQueueCap + 5 + 10]common.QueuedTxID{}
s.Backend.TxQueueManager().SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) {
log.Info("tx enqueued", "i", i+1, "queue size", txQueue.Count(), "id", queuedTx.ID)
txIDs[i] = queuedTx.ID
i++
})
s.Zero(txQueue.Count(), "transaction count should be zero") s.Zero(txQueue.Count(), "transaction count should be zero")
for j := 0; j < 10; j++ { for j := 0; j < 10; j++ {
go s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{}) // nolint: errcheck go s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{}) // nolint: errcheck
} }
time.Sleep(2 * time.Second) // FIXME(tiabc): more reliable synchronization to ensure all transactions are enqueued time.Sleep(2 * time.Second)
log.Info(fmt.Sprintf("Number of transactions queued: %d. Queue size (shouldn't be more than %d): %d",
i, txqueue.DefaultTxQueueCap, txQueue.Count()))
s.Equal(10, txQueue.Count(), "transaction count should be 10") s.Equal(10, txQueue.Count(), "transaction count should be 10")
for i := 0; i < txqueue.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines for i := 0; i < queue.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines
go s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{}) // nolint: errcheck go s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{}) // nolint: errcheck
} }
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
s.True(txQueue.Count() <= txqueue.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", txqueue.DefaultTxQueueCap, txqueue.DefaultTxQueueCap-1, txQueue.Count()) s.True(txQueue.Count() <= queue.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", queue.DefaultTxQueueCap, queue.DefaultTxQueueCap-1, txQueue.Count())
for _, txID := range txIDs { for _, txID := range txIDs {
txQueue.Remove(txID) txQueue.Remove(txID)
} }
s.Zero(txQueue.Count(), "transaction count should be zero: %d", txQueue.Count()) s.Zero(txQueue.Count(), "transaction count should be zero: %d", txQueue.Count())
} }

View File

@ -13,7 +13,7 @@ import (
"github.com/status-im/status-go/geth/notification/fcm" "github.com/status-im/status-go/geth/notification/fcm"
"github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal" "github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/txqueue" "github.com/status-im/status-go/geth/transactions"
) )
const ( const (
@ -38,7 +38,7 @@ func NewStatusBackend() *StatusBackend {
nodeManager := node.NewNodeManager() nodeManager := node.NewNodeManager()
accountManager := account.NewManager(nodeManager) accountManager := account.NewManager(nodeManager)
txQueueManager := txqueue.NewManager(nodeManager, accountManager) txQueueManager := transactions.NewManager(nodeManager, accountManager)
jailManager := jail.New(nodeManager) jailManager := jail.New(nodeManager)
notificationManager := fcm.NewNotification(fcmServerKey) notificationManager := fcm.NewNotification(fcmServerKey)
@ -205,7 +205,7 @@ func (m *StatusBackend) SendTransaction(ctx context.Context, args common.SendTxA
ctx = context.Background() ctx = context.Background()
} }
tx := m.txQueueManager.CreateTransaction(ctx, args) tx := common.CreateTransaction(ctx, args)
if err := m.txQueueManager.QueueTransaction(tx); err != nil { if err := m.txQueueManager.QueueTransaction(tx); err != nil {
return gethcommon.Hash{}, err return gethcommon.Hash{}, err
@ -247,11 +247,5 @@ func (m *StatusBackend) registerHandlers() error {
rpcClient.RegisterHandler("eth_accounts", m.accountManager.AccountsRPCHandler()) rpcClient.RegisterHandler("eth_accounts", m.accountManager.AccountsRPCHandler())
rpcClient.RegisterHandler("eth_sendTransaction", m.txQueueManager.SendTransactionRPCHandler) rpcClient.RegisterHandler("eth_sendTransaction", m.txQueueManager.SendTransactionRPCHandler)
m.txQueueManager.SetTransactionQueueHandler(m.txQueueManager.TransactionQueueHandler())
log.Info("Registered handler", "fn", "TransactionQueueHandler")
m.txQueueManager.SetTransactionReturnHandler(m.txQueueManager.TransactionReturnHandler())
log.Info("Registered handler", "fn", "TransactionReturnHandler")
return nil return nil
} }

View File

@ -157,14 +157,12 @@ type QueuedTxID string
// QueuedTx holds enough information to complete the queued transaction. // QueuedTx holds enough information to complete the queued transaction.
type QueuedTx struct { type QueuedTx struct {
ID QueuedTxID ID QueuedTxID
Hash common.Hash Hash common.Hash
Context context.Context Context context.Context
Args SendTxArgs Args SendTxArgs
InProgress bool // true if transaction is being sent Done chan struct{}
Done chan struct{} Err error
Discard chan struct{}
Err error
} }
// SendTxArgs represents the arguments to submit a new transaction into the transaction pool. // SendTxArgs represents the arguments to submit a new transaction into the transaction pool.
@ -178,12 +176,6 @@ type SendTxArgs struct {
Nonce *hexutil.Uint64 `json:"nonce"` Nonce *hexutil.Uint64 `json:"nonce"`
} }
// EnqueuedTxHandler is a function that receives queued/pending transactions, when they get queued
type EnqueuedTxHandler func(*QueuedTx)
// EnqueuedTxReturnHandler is a function that receives response when tx is complete (both on success and error)
type EnqueuedTxReturnHandler func(*QueuedTx, error)
// TxQueue is a queue of transactions. // TxQueue is a queue of transactions.
type TxQueue interface { type TxQueue interface {
// Remove removes a transaction from the queue. // Remove removes a transaction from the queue.
@ -210,32 +202,14 @@ type TxQueueManager interface {
// TransactionQueue returns a transaction queue. // TransactionQueue returns a transaction queue.
TransactionQueue() TxQueue TransactionQueue() TxQueue
// CreateTransactoin creates a new transaction.
CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx
// QueueTransaction adds a new transaction to the queue. // QueueTransaction adds a new transaction to the queue.
QueueTransaction(tx *QueuedTx) error QueueTransaction(tx *QueuedTx) error
// WaitForTransactions blocks until transaction is completed, discarded or timed out. // WaitForTransactions blocks until transaction is completed, discarded or timed out.
WaitForTransaction(tx *QueuedTx) error WaitForTransaction(tx *QueuedTx) error
// NotifyOnQueuedTxReturn notifies a handler when a transaction returns.
NotifyOnQueuedTxReturn(queuedTx *QueuedTx, err error)
// TransactionQueueHandler returns handler that processes incoming tx queue requests
TransactionQueueHandler() func(queuedTx *QueuedTx)
// TODO(adam): might be not needed
SetTransactionQueueHandler(fn EnqueuedTxHandler)
// TODO(adam): might be not needed
SetTransactionReturnHandler(fn EnqueuedTxReturnHandler)
SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error)
// TransactionReturnHandler returns handler that processes responses from internal tx manager
TransactionReturnHandler() func(queuedTx *QueuedTx, err error)
// CompleteTransaction instructs backend to complete sending of a given transaction // CompleteTransaction instructs backend to complete sending of a given transaction
CompleteTransaction(id QueuedTxID, password string) (common.Hash, error) CompleteTransaction(id QueuedTxID, password string) (common.Hash, error)

View File

@ -521,18 +521,6 @@ func (mr *MockTxQueueManagerMockRecorder) TransactionQueue() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionQueue", reflect.TypeOf((*MockTxQueueManager)(nil).TransactionQueue)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionQueue", reflect.TypeOf((*MockTxQueueManager)(nil).TransactionQueue))
} }
// CreateTransaction mocks base method
func (m *MockTxQueueManager) CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx {
ret := m.ctrl.Call(m, "CreateTransaction", ctx, args)
ret0, _ := ret[0].(*QueuedTx)
return ret0
}
// CreateTransaction indicates an expected call of CreateTransaction
func (mr *MockTxQueueManagerMockRecorder) CreateTransaction(ctx, args interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTransaction", reflect.TypeOf((*MockTxQueueManager)(nil).CreateTransaction), ctx, args)
}
// QueueTransaction mocks base method // QueueTransaction mocks base method
func (m *MockTxQueueManager) QueueTransaction(tx *QueuedTx) error { func (m *MockTxQueueManager) QueueTransaction(tx *QueuedTx) error {
ret := m.ctrl.Call(m, "QueueTransaction", tx) ret := m.ctrl.Call(m, "QueueTransaction", tx)
@ -557,48 +545,6 @@ func (mr *MockTxQueueManagerMockRecorder) WaitForTransaction(tx interface{}) *go
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForTransaction", reflect.TypeOf((*MockTxQueueManager)(nil).WaitForTransaction), tx) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForTransaction", reflect.TypeOf((*MockTxQueueManager)(nil).WaitForTransaction), tx)
} }
// NotifyOnQueuedTxReturn mocks base method
func (m *MockTxQueueManager) NotifyOnQueuedTxReturn(queuedTx *QueuedTx, err error) {
m.ctrl.Call(m, "NotifyOnQueuedTxReturn", queuedTx, err)
}
// NotifyOnQueuedTxReturn indicates an expected call of NotifyOnQueuedTxReturn
func (mr *MockTxQueueManagerMockRecorder) NotifyOnQueuedTxReturn(queuedTx, err interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyOnQueuedTxReturn", reflect.TypeOf((*MockTxQueueManager)(nil).NotifyOnQueuedTxReturn), queuedTx, err)
}
// TransactionQueueHandler mocks base method
func (m *MockTxQueueManager) TransactionQueueHandler() func(*QueuedTx) {
ret := m.ctrl.Call(m, "TransactionQueueHandler")
ret0, _ := ret[0].(func(*QueuedTx))
return ret0
}
// TransactionQueueHandler indicates an expected call of TransactionQueueHandler
func (mr *MockTxQueueManagerMockRecorder) TransactionQueueHandler() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionQueueHandler", reflect.TypeOf((*MockTxQueueManager)(nil).TransactionQueueHandler))
}
// SetTransactionQueueHandler mocks base method
func (m *MockTxQueueManager) SetTransactionQueueHandler(fn EnqueuedTxHandler) {
m.ctrl.Call(m, "SetTransactionQueueHandler", fn)
}
// SetTransactionQueueHandler indicates an expected call of SetTransactionQueueHandler
func (mr *MockTxQueueManagerMockRecorder) SetTransactionQueueHandler(fn interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTransactionQueueHandler", reflect.TypeOf((*MockTxQueueManager)(nil).SetTransactionQueueHandler), fn)
}
// SetTransactionReturnHandler mocks base method
func (m *MockTxQueueManager) SetTransactionReturnHandler(fn EnqueuedTxReturnHandler) {
m.ctrl.Call(m, "SetTransactionReturnHandler", fn)
}
// SetTransactionReturnHandler indicates an expected call of SetTransactionReturnHandler
func (mr *MockTxQueueManagerMockRecorder) SetTransactionReturnHandler(fn interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTransactionReturnHandler", reflect.TypeOf((*MockTxQueueManager)(nil).SetTransactionReturnHandler), fn)
}
// SendTransactionRPCHandler mocks base method // SendTransactionRPCHandler mocks base method
func (m *MockTxQueueManager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) { func (m *MockTxQueueManager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) {
varargs := []interface{}{ctx} varargs := []interface{}{ctx}
@ -617,18 +563,6 @@ func (mr *MockTxQueueManagerMockRecorder) SendTransactionRPCHandler(ctx interfac
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendTransactionRPCHandler", reflect.TypeOf((*MockTxQueueManager)(nil).SendTransactionRPCHandler), varargs...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendTransactionRPCHandler", reflect.TypeOf((*MockTxQueueManager)(nil).SendTransactionRPCHandler), varargs...)
} }
// TransactionReturnHandler mocks base method
func (m *MockTxQueueManager) TransactionReturnHandler() func(*QueuedTx, error) {
ret := m.ctrl.Call(m, "TransactionReturnHandler")
ret0, _ := ret[0].(func(*QueuedTx, error))
return ret0
}
// TransactionReturnHandler indicates an expected call of TransactionReturnHandler
func (mr *MockTxQueueManagerMockRecorder) TransactionReturnHandler() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionReturnHandler", reflect.TypeOf((*MockTxQueueManager)(nil).TransactionReturnHandler))
}
// CompleteTransaction mocks base method // CompleteTransaction mocks base method
func (m *MockTxQueueManager) CompleteTransaction(id QueuedTxID, password string) (common.Hash, error) { func (m *MockTxQueueManager) CompleteTransaction(id QueuedTxID, password string) (common.Hash, error) {
ret := m.ctrl.Call(m, "CompleteTransaction", id, password) ret := m.ctrl.Call(m, "CompleteTransaction", id, password)
@ -678,6 +612,16 @@ func (mr *MockTxQueueManagerMockRecorder) DiscardTransactions(ids interface{}) *
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DiscardTransactions", reflect.TypeOf((*MockTxQueueManager)(nil).DiscardTransactions), ids) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DiscardTransactions", reflect.TypeOf((*MockTxQueueManager)(nil).DiscardTransactions), ids)
} }
// DisableNotificactions mocks base method
func (m *MockTxQueueManager) DisableNotificactions() {
m.ctrl.Call(m, "DisableNotificactions")
}
// DisableNotificactions indicates an expected call of DisableNotificactions
func (mr *MockTxQueueManagerMockRecorder) DisableNotificactions() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DisableNotificactions", reflect.TypeOf((*MockTxQueueManager)(nil).DisableNotificactions))
}
// MockJailCell is a mock of JailCell interface // MockJailCell is a mock of JailCell interface
type MockJailCell struct { type MockJailCell struct {
ctrl *gomock.Controller ctrl *gomock.Controller

View File

@ -16,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/pborman/uuid"
"github.com/status-im/status-go/geth/log" "github.com/status-im/status-go/geth/log"
"github.com/status-im/status-go/static" "github.com/status-im/status-go/static"
) )
@ -151,3 +152,14 @@ func Fatalf(reason interface{}, args ...interface{}) {
os.Exit(1) os.Exit(1)
} }
// CreateTransaction returns a transaction object.
func CreateTransaction(ctx context.Context, args SendTxArgs) *QueuedTx {
return &QueuedTx{
ID: QueuedTxID(uuid.New()),
Hash: common.Hash{},
Context: ctx,
Args: args,
Done: make(chan struct{}),
}
}

View File

@ -1,6 +1,6 @@
// copy of go-ethereum/internal/ethapi/addrlock.go // copy of go-ethereum/internal/ethapi/addrlock.go
package txqueue package transactions
import ( import (
"sync" "sync"

View File

@ -1,4 +1,4 @@
package txqueue package transactions
import ( import (
"context" "context"

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT. // Code generated by MockGen. DO NOT EDIT.
// Source: geth/txqueue/fake/txservice.go // Source: geth/transactions/fake/txservice.go
// Package fake is a generated GoMock package. // Package fake is a generated GoMock package.
package fake package fake

View File

@ -0,0 +1,94 @@
package transactions
import (
"strconv"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/transactions/queue"
)
const (
// EventTransactionQueued is triggered when send transaction request is queued
EventTransactionQueued = "transaction.queued"
// EventTransactionFailed is triggered when send transaction request fails
EventTransactionFailed = "transaction.failed"
)
const (
// SendTransactionNoErrorCode is sent when no error occurred.
SendTransactionNoErrorCode = iota
// SendTransactionDefaultErrorCode is every case when there is no special tx return code.
SendTransactionDefaultErrorCode
// SendTransactionPasswordErrorCode is sent when account failed verification.
SendTransactionPasswordErrorCode
// SendTransactionTimeoutErrorCode is sent when tx is timed out.
SendTransactionTimeoutErrorCode
// SendTransactionDiscardedErrorCode is sent when tx was discarded.
SendTransactionDiscardedErrorCode
)
var txReturnCodes = map[error]int{
nil: SendTransactionNoErrorCode,
keystore.ErrDecrypt: SendTransactionPasswordErrorCode,
queue.ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode,
queue.ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode,
}
// SendTransactionEvent is a signal sent on a send transaction request
type SendTransactionEvent struct {
ID string `json:"id"`
Args common.SendTxArgs `json:"args"`
MessageID string `json:"message_id"`
}
// NotifyOnEnqueue returns handler that processes incoming tx queue requests
func NotifyOnEnqueue(queuedTx *common.QueuedTx) {
signal.Send(signal.Envelope{
Type: EventTransactionQueued,
Event: SendTransactionEvent{
ID: string(queuedTx.ID),
Args: queuedTx.Args,
MessageID: common.MessageIDFromContext(queuedTx.Context),
},
})
}
// ReturnSendTransactionEvent is a JSON returned whenever transaction send is returned
type ReturnSendTransactionEvent struct {
ID string `json:"id"`
Args common.SendTxArgs `json:"args"`
MessageID string `json:"message_id"`
ErrorMessage string `json:"error_message"`
ErrorCode string `json:"error_code"`
}
// NotifyOnReturn returns handler that processes responses from internal tx manager
func NotifyOnReturn(queuedTx *common.QueuedTx) {
// discard notifications with empty tx
if queuedTx == nil {
return
}
// we don't want to notify a user if tx sent successfully
if queuedTx.Err == nil {
return
}
signal.Send(signal.Envelope{
Type: EventTransactionFailed,
Event: ReturnSendTransactionEvent{
ID: string(queuedTx.ID),
Args: queuedTx.Args,
MessageID: common.MessageIDFromContext(queuedTx.Context),
ErrorMessage: queuedTx.Err.Error(),
ErrorCode: strconv.Itoa(sendTransactionErrorCode(queuedTx.Err)),
},
})
}
func sendTransactionErrorCode(err error) int {
if code, ok := txReturnCodes[err]; ok {
return code
}
return SendTxDefaultErrorCode
}

View File

@ -1,4 +1,4 @@
package txqueue package queue
import ( import (
"errors" "errors"
@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/accounts/keystore"
gethcommon "github.com/ethereum/go-ethereum/common" gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/geth/account"
"github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/log" "github.com/status-im/status-go/geth/log"
) )
@ -15,10 +16,6 @@ import (
const ( const (
// DefaultTxQueueCap defines how many items can be queued. // DefaultTxQueueCap defines how many items can be queued.
DefaultTxQueueCap = int(35) DefaultTxQueueCap = int(35)
// DefaultTxSendQueueCap defines how many items can be passed to sendTransaction() w/o blocking.
DefaultTxSendQueueCap = int(70)
// DefaultTxSendCompletionTimeout defines how many seconds to wait before returning result in sentTransaction().
DefaultTxSendCompletionTimeout = 300
) )
var ( var (
@ -36,33 +33,39 @@ var (
ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it") ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it")
) )
// remove from queue on any error (except for transient ones) and propagate
// defined as map[string]bool because errors from ethclient returned wrapped as jsonError
var transientErrs = map[string]bool{
keystore.ErrDecrypt.Error(): true, // wrong password
ErrInvalidCompleteTxSender.Error(): true, // completing tx create from another account
account.ErrNoAccountSelected.Error(): true, // account not selected
}
type empty struct{}
// TxQueue is capped container that holds pending transactions // TxQueue is capped container that holds pending transactions
type TxQueue struct { type TxQueue struct {
transactions map[common.QueuedTxID]*common.QueuedTx mu sync.RWMutex // to guard transactions map
mu sync.RWMutex // to guard transactions map transactions map[common.QueuedTxID]*common.QueuedTx
inprogress map[common.QueuedTxID]empty
// TODO(dshulyak) research why eviction is done in separate goroutine
evictableIDs chan common.QueuedTxID evictableIDs chan common.QueuedTxID
enqueueTicker chan struct{} enqueueTicker chan struct{}
incomingPool chan *common.QueuedTx
// when this channel is closed, all queue channels processing must cease (incoming queue, processing queued items etc) // when this channel is closed, all queue channels processing must cease (incoming queue, processing queued items etc)
stopped chan struct{} stopped chan struct{}
stoppedGroup sync.WaitGroup // to make sure that all routines are stopped stoppedGroup sync.WaitGroup // to make sure that all routines are stopped
// when items are enqueued notify subscriber
txEnqueueHandler common.EnqueuedTxHandler
// when tx is returned (either successfully or with error) notify subscriber
txReturnHandler common.EnqueuedTxReturnHandler
} }
// NewTransactionQueue make new transaction queue // New creates a transaction queue.
func NewTransactionQueue() *TxQueue { func New() *TxQueue {
log.Info("initializing transaction queue") log.Info("initializing transaction queue")
return &TxQueue{ return &TxQueue{
transactions: make(map[common.QueuedTxID]*common.QueuedTx), transactions: make(map[common.QueuedTxID]*common.QueuedTx),
inprogress: make(map[common.QueuedTxID]empty),
evictableIDs: make(chan common.QueuedTxID, DefaultTxQueueCap), // will be used to evict in FIFO evictableIDs: make(chan common.QueuedTxID, DefaultTxQueueCap), // will be used to evict in FIFO
enqueueTicker: make(chan struct{}), enqueueTicker: make(chan struct{}),
incomingPool: make(chan *common.QueuedTx, DefaultTxSendQueueCap),
} }
} }
@ -75,10 +78,8 @@ func (q *TxQueue) Start() {
} }
q.stopped = make(chan struct{}) q.stopped = make(chan struct{})
q.stoppedGroup.Add(2) q.stoppedGroup.Add(1)
go q.evictionLoop() go q.evictionLoop()
go q.enqueueLoop()
} }
// Stop stops transaction enqueue and eviction loops // Stop stops transaction enqueue and eviction loops
@ -100,7 +101,7 @@ func (q *TxQueue) Stop() {
func (q *TxQueue) evictionLoop() { func (q *TxQueue) evictionLoop() {
defer HaltOnPanic() defer HaltOnPanic()
evict := func() { evict := func() {
if len(q.transactions) >= DefaultTxQueueCap { // eviction is required to accommodate another/last item if q.Count() >= DefaultTxQueueCap { // eviction is required to accommodate another/last item
q.Remove(<-q.evictableIDs) q.Remove(<-q.evictableIDs)
} }
} }
@ -119,26 +120,6 @@ func (q *TxQueue) evictionLoop() {
} }
} }
// enqueueLoop process incoming enqueue requests
func (q *TxQueue) enqueueLoop() {
defer HaltOnPanic()
// enqueue incoming transactions
for {
select {
case queuedTx := <-q.incomingPool:
log.Info("transaction enqueued requested", "tx", queuedTx.ID)
err := q.Enqueue(queuedTx)
log.Warn("transaction enqueued error", "tx", err)
log.Info("transaction enqueued", "tx", queuedTx.ID)
case <-q.stopped:
log.Info("transaction queue's enqueue loop stopped")
q.stoppedGroup.Done()
return
}
}
}
// Reset is to be used in tests only, as it simply creates new transaction map, w/o any cleanup of the previous one // Reset is to be used in tests only, as it simply creates new transaction map, w/o any cleanup of the previous one
func (q *TxQueue) Reset() { func (q *TxQueue) Reset() {
q.mu.Lock() q.mu.Lock()
@ -146,22 +127,14 @@ func (q *TxQueue) Reset() {
q.transactions = make(map[common.QueuedTxID]*common.QueuedTx) q.transactions = make(map[common.QueuedTxID]*common.QueuedTx)
q.evictableIDs = make(chan common.QueuedTxID, DefaultTxQueueCap) q.evictableIDs = make(chan common.QueuedTxID, DefaultTxQueueCap)
} q.inprogress = make(map[common.QueuedTxID]empty)
// EnqueueAsync enqueues incoming transaction in async manner, returns as soon as possible
func (q *TxQueue) EnqueueAsync(tx *common.QueuedTx) error {
q.incomingPool <- tx
return nil
} }
// Enqueue enqueues incoming transaction // Enqueue enqueues incoming transaction
func (q *TxQueue) Enqueue(tx *common.QueuedTx) error { func (q *TxQueue) Enqueue(tx *common.QueuedTx) error {
log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID)) log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID))
if (tx.Hash != gethcommon.Hash{} || tx.Err != nil) {
if q.txEnqueueHandler == nil { //discard, until handler is provided return ErrQueuedTxAlreadyProcessed
log.Info("there is no txEnqueueHandler")
return nil
} }
log.Info("before enqueueTicker") log.Info("before enqueueTicker")
@ -176,8 +149,6 @@ func (q *TxQueue) Enqueue(tx *common.QueuedTx) error {
// notify handler // notify handler
log.Info("calling txEnqueueHandler") log.Info("calling txEnqueueHandler")
q.txEnqueueHandler(tx)
return nil return nil
} }
@ -189,50 +160,68 @@ func (q *TxQueue) Get(id common.QueuedTxID) (*common.QueuedTx, error) {
if tx, ok := q.transactions[id]; ok { if tx, ok := q.transactions[id]; ok {
return tx, nil return tx, nil
} }
return nil, ErrQueuedTxIDNotFound return nil, ErrQueuedTxIDNotFound
} }
// LockInprogress returns error if transaction is already inprogress.
func (q *TxQueue) LockInprogress(id common.QueuedTxID) error {
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.transactions[id]; ok {
if _, inprogress := q.inprogress[id]; inprogress {
return ErrQueuedTxInProgress
}
q.inprogress[id] = empty{}
return nil
}
return ErrQueuedTxIDNotFound
}
// Remove removes transaction by transaction identifier // Remove removes transaction by transaction identifier
func (q *TxQueue) Remove(id common.QueuedTxID) { func (q *TxQueue) Remove(id common.QueuedTxID) {
q.mu.Lock() q.mu.Lock()
defer q.mu.Unlock() defer q.mu.Unlock()
q.remove(id)
delete(q.transactions, id)
} }
// StartProcessing marks a transaction as in progress. It's thread-safe and func (q *TxQueue) remove(id common.QueuedTxID) {
// prevents from processing the same transaction multiple times. delete(q.transactions, id)
func (q *TxQueue) StartProcessing(tx *common.QueuedTx) error { delete(q.inprogress, id)
}
// Done removes transaction from queue if no error or error is not transient
// and notify subscribers
func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) error {
q.mu.Lock() q.mu.Lock()
defer q.mu.Unlock() defer q.mu.Unlock()
tx, ok := q.transactions[id]
if tx.Hash != (gethcommon.Hash{}) || tx.Err != nil { if !ok {
return ErrQueuedTxAlreadyProcessed return ErrQueuedTxIDNotFound
} }
q.done(tx, hash, err)
if tx.InProgress {
return ErrQueuedTxInProgress
}
tx.InProgress = true
return nil return nil
} }
// StopProcessing removes the "InProgress" flag from the transaction. func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) {
func (q *TxQueue) StopProcessing(tx *common.QueuedTx) { delete(q.inprogress, tx.ID)
q.mu.Lock() tx.Err = err
defer q.mu.Unlock() // hash is updated only if err is nil, but transaction is not removed from a queue
if err == nil {
tx.InProgress = false q.remove(tx.ID)
tx.Hash = hash
close(tx.Done)
return
}
if _, transient := transientErrs[err.Error()]; !transient {
q.remove(tx.ID)
close(tx.Done)
}
} }
// Count returns number of currently queued transactions // Count returns number of currently queued transactions
func (q *TxQueue) Count() int { func (q *TxQueue) Count() int {
q.mu.RLock() q.mu.RLock()
defer q.mu.RUnlock() defer q.mu.RUnlock()
return len(q.transactions) return len(q.transactions)
} }
@ -240,54 +229,6 @@ func (q *TxQueue) Count() int {
func (q *TxQueue) Has(id common.QueuedTxID) bool { func (q *TxQueue) Has(id common.QueuedTxID) bool {
q.mu.RLock() q.mu.RLock()
defer q.mu.RUnlock() defer q.mu.RUnlock()
_, ok := q.transactions[id] _, ok := q.transactions[id]
return ok return ok
} }
// SetEnqueueHandler sets callback handler, that is triggered on enqueue operation
func (q *TxQueue) SetEnqueueHandler(fn common.EnqueuedTxHandler) {
q.txEnqueueHandler = fn
}
// SetTxReturnHandler sets callback handler, that is triggered when transaction is finished executing
func (q *TxQueue) SetTxReturnHandler(fn common.EnqueuedTxReturnHandler) {
q.txReturnHandler = fn
}
// NotifyOnQueuedTxReturn is invoked when transaction is ready to return
// Transaction can be in error state, or executed successfully at this point.
func (q *TxQueue) NotifyOnQueuedTxReturn(queuedTx *common.QueuedTx, err error) {
if q == nil {
return
}
// discard, if transaction is not found
if queuedTx == nil {
return
}
// on success, remove item from the queue and stop propagating
if err == nil {
q.Remove(queuedTx.ID)
return
}
// error occurred, send upward notification
if q.txReturnHandler == nil { // discard, until handler is provided
return
}
// remove from queue on any error (except for transient ones) and propagate
transientErrs := map[error]bool{
keystore.ErrDecrypt: true, // wrong password
ErrInvalidCompleteTxSender: true, // completing tx create from another account
}
if !transientErrs[err] { // remove only on unrecoverable errors
q.Remove(queuedTx.ID)
}
// notify handler
q.txReturnHandler(queuedTx, err)
}

View File

@ -0,0 +1,134 @@
package queue
import (
"context"
"errors"
"testing"
"github.com/ethereum/go-ethereum/accounts/keystore"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/geth/common"
"github.com/stretchr/testify/suite"
)
func TestQueueTestSuite(t *testing.T) {
suite.Run(t, new(QueueTestSuite))
}
type QueueTestSuite struct {
suite.Suite
queue *TxQueue
}
func (s *QueueTestSuite) SetupTest() {
s.queue = New()
s.queue.Start()
}
func (s *QueueTestSuite) TearDownTest() {
s.queue.Stop()
}
func (s *QueueTestSuite) TestLockInprogressTransaction() {
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{})
s.NoError(s.queue.Enqueue(tx))
enquedTx, err := s.queue.Get(tx.ID)
s.NoError(err)
s.NoError(s.queue.LockInprogress(tx.ID))
s.Equal(tx, enquedTx)
// verify that tx was marked as being inprogress
s.Equal(ErrQueuedTxInProgress, s.queue.LockInprogress(tx.ID))
}
func (s *QueueTestSuite) TestGetTransaction() {
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{})
s.NoError(s.queue.Enqueue(tx))
for i := 2; i > 0; i-- {
enquedTx, err := s.queue.Get(tx.ID)
s.NoError(err)
s.Equal(tx, enquedTx)
}
}
func (s *QueueTestSuite) TestEnqueueProcessedTransaction() {
// enqueue will fail if transaction with hash will be enqueued
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{})
tx.Hash = gethcommon.Hash{1}
s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx))
tx = common.CreateTransaction(context.Background(), common.SendTxArgs{})
tx.Err = errors.New("error")
s.Equal(ErrQueuedTxAlreadyProcessed, s.queue.Enqueue(tx))
}
func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *common.QueuedTx {
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{})
s.NoError(s.queue.Enqueue(tx))
s.NoError(s.queue.Done(tx.ID, hash, err))
return tx
}
func (s *QueueTestSuite) TestDoneSuccess() {
hash := gethcommon.Hash{1}
tx := s.testDone(hash, nil)
s.NoError(tx.Err)
s.Equal(hash, tx.Hash)
s.False(s.queue.Has(tx.ID))
// event is sent only if transaction was removed from a queue
select {
case <-tx.Done:
default:
s.Fail("No event was sent to Done channel")
}
}
func (s *QueueTestSuite) TestDoneTransientError() {
hash := gethcommon.Hash{1}
err := keystore.ErrDecrypt
tx := s.testDone(hash, err)
s.Equal(keystore.ErrDecrypt, tx.Err)
s.Equal(gethcommon.Hash{}, tx.Hash)
s.True(s.queue.Has(tx.ID))
}
func (s *QueueTestSuite) TestDoneError() {
hash := gethcommon.Hash{1}
err := errors.New("test")
tx := s.testDone(hash, err)
s.Equal(err, tx.Err)
s.NotEqual(hash, tx.Hash)
s.Equal(gethcommon.Hash{}, tx.Hash)
s.False(s.queue.Has(tx.ID))
// event is sent only if transaction was removed from a queue
select {
case <-tx.Done:
default:
s.Fail("No event was sent to Done channel")
}
}
func (s QueueTestSuite) TestMultipleDone() {
hash := gethcommon.Hash{1}
err := keystore.ErrDecrypt
tx := s.testDone(hash, err)
s.NoError(s.queue.Done(tx.ID, hash, nil))
s.Equal(ErrQueuedTxIDNotFound, s.queue.Done(tx.ID, hash, errors.New("timeout")))
}
func (s *QueueTestSuite) TestEviction() {
var first *common.QueuedTx
for i := 0; i < DefaultTxQueueCap; i++ {
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{})
if first == nil {
first = tx
}
s.NoError(s.queue.Enqueue(tx))
}
s.Equal(DefaultTxQueueCap, s.queue.Count())
tx := common.CreateTransaction(context.Background(), common.SendTxArgs{})
s.NoError(s.queue.Enqueue(tx))
s.Equal(DefaultTxQueueCap, s.queue.Count())
s.False(s.queue.Has(first.ID))
}

View File

@ -1,4 +1,4 @@
package txqueue package queue
import ( import (
"errors" "errors"

View File

@ -1,4 +1,4 @@
package txqueue package transactions
import ( import (
"context" "context"
@ -6,53 +6,31 @@ import (
"time" "time"
ethereum "github.com/ethereum/go-ethereum" ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/keystore"
gethcommon "github.com/ethereum/go-ethereum/common" gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/pborman/uuid"
"github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/log" "github.com/status-im/status-go/geth/log"
"github.com/status-im/status-go/geth/signal" "github.com/status-im/status-go/geth/transactions/queue"
) )
const ( const (
// EventTransactionQueued is triggered when send transaction request is queued
EventTransactionQueued = "transaction.queued"
// EventTransactionFailed is triggered when send transaction request fails
EventTransactionFailed = "transaction.failed"
// SendTxDefaultErrorCode is sent by default, when error is not nil, but type is unknown/unexpected. // SendTxDefaultErrorCode is sent by default, when error is not nil, but type is unknown/unexpected.
SendTxDefaultErrorCode = SendTransactionDefaultErrorCode SendTxDefaultErrorCode = SendTransactionDefaultErrorCode
// DefaultTxSendCompletionTimeout defines how many seconds to wait before returning result in sentTransaction().
DefaultTxSendCompletionTimeout = 300
defaultGas = 90000 defaultGas = 90000
defaultTimeout = time.Minute defaultTimeout = time.Minute
) )
// Send transaction response codes
const (
SendTransactionNoErrorCode = "0"
SendTransactionDefaultErrorCode = "1"
SendTransactionPasswordErrorCode = "2"
SendTransactionTimeoutErrorCode = "3"
SendTransactionDiscardedErrorCode = "4"
)
var txReturnCodes = map[error]string{ // deliberately strings, in case more meaningful codes are to be returned
nil: SendTransactionNoErrorCode,
keystore.ErrDecrypt: SendTransactionPasswordErrorCode,
ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode,
ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode,
}
// Manager provides means to manage internal Status Backend (injected into LES) // Manager provides means to manage internal Status Backend (injected into LES)
type Manager struct { type Manager struct {
nodeManager common.NodeManager nodeManager common.NodeManager
accountManager common.AccountManager accountManager common.AccountManager
txQueue *TxQueue txQueue *queue.TxQueue
ethTxClient EthTransactor ethTxClient EthTransactor
addrLock *AddrLocker addrLock *AddrLocker
notify bool
} }
// NewManager returns a new Manager. // NewManager returns a new Manager.
@ -60,11 +38,18 @@ func NewManager(nodeManager common.NodeManager, accountManager common.AccountMan
return &Manager{ return &Manager{
nodeManager: nodeManager, nodeManager: nodeManager,
accountManager: accountManager, accountManager: accountManager,
txQueue: NewTransactionQueue(), txQueue: queue.New(),
addrLock: &AddrLocker{}, addrLock: &AddrLocker{},
notify: true,
} }
} }
// DisableNotificactions turns off notifications on enqueue and return of tx.
// It is not thread safe and must be called only before manager is started.
func (m *Manager) DisableNotificactions() {
m.notify = false
}
// Start starts accepting new transactions into the queue. // Start starts accepting new transactions into the queue.
func (m *Manager) Start() { func (m *Manager) Start() {
log.Info("start Manager") log.Info("start Manager")
@ -83,18 +68,6 @@ func (m *Manager) TransactionQueue() common.TxQueue {
return m.txQueue return m.txQueue
} }
// CreateTransaction returns a transaction object.
func (m *Manager) CreateTransaction(ctx context.Context, args common.SendTxArgs) *common.QueuedTx {
return &common.QueuedTx{
ID: common.QueuedTxID(uuid.New()),
Hash: gethcommon.Hash{},
Context: ctx,
Args: args,
Done: make(chan struct{}, 1),
Discard: make(chan struct{}, 1),
}
}
// QueueTransaction puts a transaction into the queue. // QueueTransaction puts a transaction into the queue.
func (m *Manager) QueueTransaction(tx *common.QueuedTx) error { func (m *Manager) QueueTransaction(tx *common.QueuedTx) error {
to := "<nil>" to := "<nil>"
@ -102,108 +75,93 @@ func (m *Manager) QueueTransaction(tx *common.QueuedTx) error {
to = tx.Args.To.Hex() to = tx.Args.To.Hex()
} }
log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to) log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to)
err := m.txQueue.Enqueue(tx)
if m.notify {
NotifyOnEnqueue(tx)
}
return err
}
return m.txQueue.Enqueue(tx) func (m *Manager) txDone(tx *common.QueuedTx, hash gethcommon.Hash, err error) {
m.txQueue.Done(tx.ID, hash, err) //nolint: errcheck
if m.notify {
NotifyOnReturn(tx)
}
} }
// WaitForTransaction adds a transaction to the queue and blocks // WaitForTransaction adds a transaction to the queue and blocks
// until it's completed, discarded or times out. // until it's completed, discarded or times out.
func (m *Manager) WaitForTransaction(tx *common.QueuedTx) error { func (m *Manager) WaitForTransaction(tx *common.QueuedTx) error {
log.Info("wait for transaction", "id", tx.ID) log.Info("wait for transaction", "id", tx.ID)
// now wait up until transaction is: // now wait up until transaction is:
// - completed (via CompleteQueuedTransaction), // - completed (via CompleteQueuedTransaction),
// - discarded (via DiscardQueuedTransaction) // - discarded (via DiscardQueuedTransaction)
// - or times out // - or times out
select { select {
case <-tx.Done: case <-tx.Done:
m.NotifyOnQueuedTxReturn(tx, tx.Err)
return tx.Err
case <-tx.Discard:
m.NotifyOnQueuedTxReturn(tx, ErrQueuedTxDiscarded)
return ErrQueuedTxDiscarded
case <-time.After(DefaultTxSendCompletionTimeout * time.Second): case <-time.After(DefaultTxSendCompletionTimeout * time.Second):
m.NotifyOnQueuedTxReturn(tx, ErrQueuedTxTimedOut) m.txDone(tx, gethcommon.Hash{}, queue.ErrQueuedTxTimedOut)
return ErrQueuedTxTimedOut
} }
} return tx.Err
// NotifyOnQueuedTxReturn calls a handler when a transaction resolves.
func (m *Manager) NotifyOnQueuedTxReturn(queuedTx *common.QueuedTx, err error) {
m.txQueue.NotifyOnQueuedTxReturn(queuedTx, err)
} }
// CompleteTransaction instructs backend to complete sending of a given transaction. // CompleteTransaction instructs backend to complete sending of a given transaction.
// TODO(adam): investigate a possible bug that calling this method multiple times with the same Transaction ID func (m *Manager) CompleteTransaction(id common.QueuedTxID, password string) (hash gethcommon.Hash, err error) {
// results in sending multiple transactions.
func (m *Manager) CompleteTransaction(id common.QueuedTxID, password string) (gethcommon.Hash, error) {
log.Info("complete transaction", "id", id) log.Info("complete transaction", "id", id)
tx, err := m.txQueue.Get(id)
queuedTx, err := m.txQueue.Get(id)
if err != nil { if err != nil {
log.Warn("could not get a queued transaction", "err", err) log.Warn("error getting a queued transaction", "err", err)
return gethcommon.Hash{}, err
}
err = m.txQueue.StartProcessing(queuedTx)
if err != nil {
return gethcommon.Hash{}, err
}
defer m.txQueue.StopProcessing(queuedTx)
selectedAccount, err := m.accountManager.SelectedAccount()
if err != nil {
log.Warn("failed to get a selected account", "err", err)
return gethcommon.Hash{}, err
}
// make sure that only account which created the tx can complete it
if queuedTx.Args.From.Hex() != selectedAccount.Address.Hex() {
log.Warn("queued transaction does not belong to the selected account", "err", ErrInvalidCompleteTxSender)
m.NotifyOnQueuedTxReturn(queuedTx, ErrInvalidCompleteTxSender)
return gethcommon.Hash{}, ErrInvalidCompleteTxSender
}
// Send the transaction finally.
hash, err := m.completeTransaction(queuedTx, selectedAccount, password)
// when incorrect sender tries to complete the account,
// notify and keep tx in queue (so that correct sender can complete)
if err == keystore.ErrDecrypt {
log.Warn("failed to complete transaction", "err", err)
m.NotifyOnQueuedTxReturn(queuedTx, err)
return hash, err return hash, err
} }
if err := m.txQueue.LockInprogress(id); err != nil {
log.Info("finally completed transaction", "id", queuedTx.ID, "hash", hash, "err", err) log.Warn("can't process transaction", "err", err)
return hash, err
queuedTx.Hash = hash }
queuedTx.Err = err account, err := m.validateAccount(tx)
queuedTx.Done <- struct{}{} if err != nil {
m.txDone(tx, hash, err)
return hash, err
}
// Send the transaction finally.
hash, err = m.completeTransaction(tx, account, password)
log.Info("finally completed transaction", "id", tx.ID, "hash", hash, "err", err)
m.txDone(tx, hash, err)
return hash, err return hash, err
} }
func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount *common.SelectedExtKey, password string) (gethcommon.Hash, error) { func (m *Manager) validateAccount(tx *common.QueuedTx) (*common.SelectedExtKey, error) {
selectedAccount, err := m.accountManager.SelectedAccount()
if err != nil {
log.Warn("failed to get a selected account", "err", err)
return nil, err
}
// make sure that only account which created the tx can complete it
if tx.Args.From.Hex() != selectedAccount.Address.Hex() {
log.Warn("queued transaction does not belong to the selected account", "err", queue.ErrInvalidCompleteTxSender)
return nil, queue.ErrInvalidCompleteTxSender
}
return selectedAccount, nil
}
func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount *common.SelectedExtKey, password string) (hash gethcommon.Hash, err error) {
log.Info("complete transaction", "id", queuedTx.ID) log.Info("complete transaction", "id", queuedTx.ID)
var emptyHash gethcommon.Hash log.Info("verifying account password for transaction", "id", queuedTx.ID)
config, err := m.nodeManager.NodeConfig() config, err := m.nodeManager.NodeConfig()
if err != nil { if err != nil {
return emptyHash, err return hash, err
} }
_, err = m.accountManager.VerifyAccountPassword(config.KeyStoreDir, selectedAccount.Address.String(), password) _, err = m.accountManager.VerifyAccountPassword(config.KeyStoreDir, selectedAccount.Address.String(), password)
if err != nil { if err != nil {
log.Warn("failed to verify account", "account", selectedAccount.Address.String(), "error", err.Error()) log.Warn("failed to verify account", "account", selectedAccount.Address.String(), "error", err.Error())
return emptyHash, err return hash, err
} }
// update transaction with nonce, gas price and gas estimates
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
m.addrLock.LockAddr(queuedTx.Args.From) m.addrLock.LockAddr(queuedTx.Args.From)
defer m.addrLock.UnlockAddr(queuedTx.Args.From) defer m.addrLock.UnlockAddr(queuedTx.Args.From)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
nonce, err := m.ethTxClient.PendingNonceAt(ctx, queuedTx.Args.From) nonce, err := m.ethTxClient.PendingNonceAt(ctx, queuedTx.Args.From)
if err != nil { if err != nil {
return emptyHash, err return hash, err
} }
args := queuedTx.Args args := queuedTx.Args
gasPrice := (*big.Int)(args.GasPrice) gasPrice := (*big.Int)(args.GasPrice)
@ -212,7 +170,7 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount
defer cancel() defer cancel()
gasPrice, err = m.ethTxClient.SuggestGasPrice(ctx) gasPrice, err = m.ethTxClient.SuggestGasPrice(ctx)
if err != nil { if err != nil {
return emptyHash, err return hash, err
} }
} }
@ -223,6 +181,7 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount
if args.To != nil { if args.To != nil {
toAddr = *args.To toAddr = *args.To
} }
gas := (*big.Int)(args.Gas) gas := (*big.Int)(args.Gas)
if args.Gas == nil { if args.Gas == nil {
ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout) ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout)
@ -235,7 +194,7 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount
Data: data, Data: data,
}) })
if err != nil { if err != nil {
return emptyHash, err return hash, err
} }
if gas.Cmp(big.NewInt(defaultGas)) == -1 { if gas.Cmp(big.NewInt(defaultGas)) == -1 {
log.Info("default gas will be used. estimated gas", gas, "is lower than", defaultGas) log.Info("default gas will be used. estimated gas", gas, "is lower than", defaultGas)
@ -254,12 +213,12 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount
tx := types.NewTransaction(nonce, toAddr, value, gas, gasPrice, data) tx := types.NewTransaction(nonce, toAddr, value, gas, gasPrice, data)
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), selectedAccount.AccountKey.PrivateKey) signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), selectedAccount.AccountKey.PrivateKey)
if err != nil { if err != nil {
return emptyHash, err return hash, err
} }
ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout) ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout)
defer cancel() defer cancel()
if err := m.ethTxClient.SendTransaction(ctx, signedTx); err != nil { if err := m.ethTxClient.SendTransaction(ctx, signedTx); err != nil {
return emptyHash, err return hash, err
} }
return signedTx.Hash(), nil return signedTx.Hash(), nil
} }
@ -267,7 +226,6 @@ func (m *Manager) completeTransaction(queuedTx *common.QueuedTx, selectedAccount
// CompleteTransactions instructs backend to complete sending of multiple transactions // CompleteTransactions instructs backend to complete sending of multiple transactions
func (m *Manager) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult { func (m *Manager) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult {
results := make(map[common.QueuedTxID]common.RawCompleteTransactionResult) results := make(map[common.QueuedTxID]common.RawCompleteTransactionResult)
for _, txID := range ids { for _, txID := range ids {
txHash, txErr := m.CompleteTransaction(txID, password) txHash, txErr := m.CompleteTransaction(txID, password)
results[txID] = common.RawCompleteTransactionResult{ results[txID] = common.RawCompleteTransactionResult{
@ -275,25 +233,20 @@ func (m *Manager) CompleteTransactions(ids []common.QueuedTxID, password string)
Error: txErr, Error: txErr,
} }
} }
return results return results
} }
// DiscardTransaction discards a given transaction from transaction queue // DiscardTransaction discards a given transaction from transaction queue
func (m *Manager) DiscardTransaction(id common.QueuedTxID) error { func (m *Manager) DiscardTransaction(id common.QueuedTxID) error {
queuedTx, err := m.txQueue.Get(id) tx, err := m.txQueue.Get(id)
if err != nil { if err != nil {
return err return err
} }
err = m.txQueue.Done(id, gethcommon.Hash{}, queue.ErrQueuedTxDiscarded)
// remove from queue, before notifying SendTransaction if m.notify {
m.txQueue.Remove(queuedTx.ID) NotifyOnReturn(tx)
}
// allow SendTransaction to return return err
queuedTx.Err = ErrQueuedTxDiscarded
queuedTx.Discard <- struct{}{} // sendTransaction() waits on this, notify so that it can return
return nil
} }
// DiscardTransactions discards given multiple transactions from transaction queue // DiscardTransactions discards given multiple transactions from transaction queue
@ -312,84 +265,6 @@ func (m *Manager) DiscardTransactions(ids []common.QueuedTxID) map[common.Queued
return results return results
} }
// SendTransactionEvent is a signal sent on a send transaction request
type SendTransactionEvent struct {
ID string `json:"id"`
Args common.SendTxArgs `json:"args"`
MessageID string `json:"message_id"`
}
// TransactionQueueHandler returns handler that processes incoming tx queue requests
func (m *Manager) TransactionQueueHandler() func(queuedTx *common.QueuedTx) {
return func(queuedTx *common.QueuedTx) {
log.Info("calling TransactionQueueHandler")
signal.Send(signal.Envelope{
Type: EventTransactionQueued,
Event: SendTransactionEvent{
ID: string(queuedTx.ID),
Args: queuedTx.Args,
MessageID: common.MessageIDFromContext(queuedTx.Context),
},
})
}
}
// SetTransactionQueueHandler sets a handler that will be called
// when a new transaction is enqueued.
func (m *Manager) SetTransactionQueueHandler(fn common.EnqueuedTxHandler) {
m.txQueue.SetEnqueueHandler(fn)
}
// ReturnSendTransactionEvent is a JSON returned whenever transaction send is returned
type ReturnSendTransactionEvent struct {
ID string `json:"id"`
Args common.SendTxArgs `json:"args"`
MessageID string `json:"message_id"`
ErrorMessage string `json:"error_message"`
ErrorCode string `json:"error_code"`
}
// TransactionReturnHandler returns handler that processes responses from internal tx manager
func (m *Manager) TransactionReturnHandler() func(queuedTx *common.QueuedTx, err error) {
return func(queuedTx *common.QueuedTx, err error) {
if err == nil {
return
}
// discard notifications with empty tx
if queuedTx == nil {
return
}
// error occurred, signal up to application
signal.Send(signal.Envelope{
Type: EventTransactionFailed,
Event: ReturnSendTransactionEvent{
ID: string(queuedTx.ID),
Args: queuedTx.Args,
MessageID: common.MessageIDFromContext(queuedTx.Context),
ErrorMessage: err.Error(),
ErrorCode: m.sendTransactionErrorCode(err),
},
})
}
}
func (m *Manager) sendTransactionErrorCode(err error) string {
if code, ok := txReturnCodes[err]; ok {
return code
}
return SendTxDefaultErrorCode
}
// SetTransactionReturnHandler sets a handler that will be called
// when a transaction is about to return or when a recoverable error occurred.
// Recoverable error is, for instance, wrong password.
func (m *Manager) SetTransactionReturnHandler(fn common.EnqueuedTxReturnHandler) {
m.txQueue.SetTxReturnHandler(fn)
}
// SendTransactionRPCHandler is a handler for eth_sendTransaction method. // SendTransactionRPCHandler is a handler for eth_sendTransaction method.
// It accepts one param which is a slice with a map of transaction params. // It accepts one param which is a slice with a map of transaction params.
func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) { func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) {
@ -398,8 +273,7 @@ func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interfa
// TODO(adam): it's a hack to parse arguments as common.RPCCall can do that. // TODO(adam): it's a hack to parse arguments as common.RPCCall can do that.
// We should refactor parsing these params to a separate struct. // We should refactor parsing these params to a separate struct.
rpcCall := common.RPCCall{Params: args} rpcCall := common.RPCCall{Params: args}
tx := common.CreateTransaction(ctx, rpcCall.ToSendTxArgs())
tx := m.CreateTransaction(ctx, rpcCall.ToSendTxArgs())
if err := m.QueueTransaction(tx); err != nil { if err := m.QueueTransaction(tx); err != nil {
return nil, err return nil, err

View File

@ -1,4 +1,4 @@
package txqueue package transactions
import ( import (
"context" "context"
@ -18,7 +18,8 @@ import (
"github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/rpc" "github.com/status-im/status-go/geth/rpc"
"github.com/status-im/status-go/geth/txqueue/fake" "github.com/status-im/status-go/geth/transactions/fake"
"github.com/status-im/status-go/geth/transactions/queue"
. "github.com/status-im/status-go/testing" . "github.com/status-im/status-go/testing"
) )
@ -93,21 +94,10 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() {
txQueueManager.Start() txQueueManager.Start()
defer txQueueManager.Stop() defer txQueueManager.Stop()
tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{ tx := common.CreateTransaction(context.Background(), common.SendTxArgs{
From: common.FromAddress(TestConfig.Account1.Address), From: common.FromAddress(TestConfig.Account1.Address),
To: common.ToAddress(TestConfig.Account2.Address), To: common.ToAddress(TestConfig.Account2.Address),
}) })
// TransactionQueueHandler is required to enqueue a transaction.
txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) {
s.Equal(tx.ID, queuedTx.ID)
})
txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) {
s.Equal(tx.ID, queuedTx.ID)
s.NoError(err)
})
err := txQueueManager.QueueTransaction(tx) err := txQueueManager.QueueTransaction(tx)
s.NoError(err) s.NoError(err)
@ -142,25 +132,15 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() {
s.setupTransactionPoolAPI(account, nonce, gas, nil) s.setupTransactionPoolAPI(account, nonce, gas, nil)
txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock)
txQueueManager.DisableNotificactions()
txQueueManager.Start() txQueueManager.Start()
defer txQueueManager.Stop() defer txQueueManager.Stop()
tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{ tx := common.CreateTransaction(context.Background(), common.SendTxArgs{
From: common.FromAddress(TestConfig.Account1.Address), From: common.FromAddress(TestConfig.Account1.Address),
To: common.ToAddress(TestConfig.Account2.Address), To: common.ToAddress(TestConfig.Account2.Address),
}) })
// TransactionQueueHandler is required to enqueue a transaction.
txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) {
s.Equal(tx.ID, queuedTx.ID)
})
txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) {
s.Equal(tx.ID, queuedTx.ID)
s.NoError(err)
})
err := txQueueManager.QueueTransaction(tx) err := txQueueManager.QueueTransaction(tx)
s.NoError(err) s.NoError(err)
@ -179,7 +159,7 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() {
mu.Lock() mu.Lock()
if err == nil { if err == nil {
completedTx++ completedTx++
} else if err == ErrQueuedTxInProgress { } else if err == queue.ErrQueuedTxInProgress {
inprogressTx++ inprogressTx++
} else { } else {
s.Fail("tx failed with unexpected error: ", err.Error()) s.Fail("tx failed with unexpected error: ", err.Error())
@ -207,33 +187,21 @@ func (s *TxQueueTestSuite) TestAccountMismatch() {
}, nil) }, nil)
txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock)
txQueueManager.DisableNotificactions()
txQueueManager.Start() txQueueManager.Start()
defer txQueueManager.Stop() defer txQueueManager.Stop()
tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{ tx := common.CreateTransaction(context.Background(), common.SendTxArgs{
From: common.FromAddress(TestConfig.Account1.Address), From: common.FromAddress(TestConfig.Account1.Address),
To: common.ToAddress(TestConfig.Account2.Address), To: common.ToAddress(TestConfig.Account2.Address),
}) })
// TransactionQueueHandler is required to enqueue a transaction.
txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) {
s.Equal(tx.ID, queuedTx.ID)
})
// Missmatched address is a recoverable error, that's why
// the return handler is called.
txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) {
s.Equal(tx.ID, queuedTx.ID)
s.Equal(ErrInvalidCompleteTxSender, err)
s.Nil(tx.Err)
})
err := txQueueManager.QueueTransaction(tx) err := txQueueManager.QueueTransaction(tx)
s.NoError(err) s.NoError(err)
_, err = txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password) _, err = txQueueManager.CompleteTransaction(tx.ID, TestConfig.Account1.Password)
s.Equal(err, ErrInvalidCompleteTxSender) s.Equal(err, queue.ErrInvalidCompleteTxSender)
// Transaction should stay in the queue as mismatched accounts // Transaction should stay in the queue as mismatched accounts
// is a recoverable error. // is a recoverable error.
@ -250,28 +218,15 @@ func (s *TxQueueTestSuite) TestInvalidPassword() {
s.setupStatusBackend(account, password, keystore.ErrDecrypt) s.setupStatusBackend(account, password, keystore.ErrDecrypt)
txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock)
txQueueManager.DisableNotificactions()
txQueueManager.Start() txQueueManager.Start()
defer txQueueManager.Stop() defer txQueueManager.Stop()
tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{ tx := common.CreateTransaction(context.Background(), common.SendTxArgs{
From: common.FromAddress(TestConfig.Account1.Address), From: common.FromAddress(TestConfig.Account1.Address),
To: common.ToAddress(TestConfig.Account2.Address), To: common.ToAddress(TestConfig.Account2.Address),
}) })
// TransactionQueueHandler is required to enqueue a transaction.
txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) {
s.Equal(tx.ID, queuedTx.ID)
})
// Missmatched address is a revocable error, that's why
// the return handler is called.
txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) {
s.Equal(tx.ID, queuedTx.ID)
s.Equal(keystore.ErrDecrypt, err)
s.Nil(tx.Err)
})
err := txQueueManager.QueueTransaction(tx) err := txQueueManager.QueueTransaction(tx)
s.NoError(err) s.NoError(err)
@ -285,39 +240,29 @@ func (s *TxQueueTestSuite) TestInvalidPassword() {
func (s *TxQueueTestSuite) TestDiscardTransaction() { func (s *TxQueueTestSuite) TestDiscardTransaction() {
txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock) txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock)
txQueueManager.DisableNotificactions()
txQueueManager.Start() txQueueManager.Start()
defer txQueueManager.Stop() defer txQueueManager.Stop()
tx := txQueueManager.CreateTransaction(context.Background(), common.SendTxArgs{ tx := common.CreateTransaction(context.Background(), common.SendTxArgs{
From: common.FromAddress(TestConfig.Account1.Address), From: common.FromAddress(TestConfig.Account1.Address),
To: common.ToAddress(TestConfig.Account2.Address), To: common.ToAddress(TestConfig.Account2.Address),
}) })
// TransactionQueueHandler is required to enqueue a transaction.
txQueueManager.SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) {
s.Equal(tx.ID, queuedTx.ID)
})
txQueueManager.SetTransactionReturnHandler(func(queuedTx *common.QueuedTx, err error) {
s.Equal(tx.ID, queuedTx.ID)
s.Equal(ErrQueuedTxDiscarded, err)
})
err := txQueueManager.QueueTransaction(tx) err := txQueueManager.QueueTransaction(tx)
s.NoError(err) s.NoError(err)
w := make(chan struct{}) w := make(chan struct{})
go func() { go func() {
err := txQueueManager.DiscardTransaction(tx.ID) s.NoError(txQueueManager.DiscardTransaction(tx.ID))
s.NoError(err)
close(w) close(w)
}() }()
err = txQueueManager.WaitForTransaction(tx) err = txQueueManager.WaitForTransaction(tx)
s.Equal(ErrQueuedTxDiscarded, err) s.Equal(queue.ErrQueuedTxDiscarded, err)
// Check that error is assigned to the transaction. // Check that error is assigned to the transaction.
s.Equal(ErrQueuedTxDiscarded, tx.Err) s.Equal(queue.ErrQueuedTxDiscarded, tx.Err)
// Transaction should be already removed from the queue. // Transaction should be already removed from the queue.
s.False(txQueueManager.TransactionQueue().Has(tx.ID)) s.False(txQueueManager.TransactionQueue().Has(tx.ID))
s.NoError(WaitClosed(w, time.Second)) s.NoError(WaitClosed(w, time.Second))

View File

@ -34,7 +34,8 @@ import (
"github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal" "github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/txqueue" "github.com/status-im/status-go/geth/transactions"
"github.com/status-im/status-go/geth/transactions/queue"
"github.com/status-im/status-go/static" "github.com/status-im/status-go/static"
. "github.com/status-im/status-go/testing" //nolint: golint . "github.com/status-im/status-go/testing" //nolint: golint
) )
@ -793,7 +794,7 @@ func testCompleteTransaction(t *testing.T) bool {
t.Errorf("cannot unmarshal event's JSON: %s. Error %q", jsonEvent, err) t.Errorf("cannot unmarshal event's JSON: %s. Error %q", jsonEvent, err)
return return
} }
if envelope.Type == txqueue.EventTransactionQueued { if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
t.Logf("transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string)) t.Logf("transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string))
@ -871,7 +872,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
return return
} }
if envelope.Type == txqueue.EventTransactionQueued { if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
txID = event["id"].(string) txID = event["id"].(string)
t.Logf("transaction queued (will be completed in a single call, once aggregated): {id: %s}\n", txID) t.Logf("transaction queued (will be completed in a single call, once aggregated): {id: %s}\n", txID)
@ -918,7 +919,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc
} }
results := resultsStruct.Results results := resultsStruct.Results
if len(results) != (testTxCount+1) || results["invalid-tx-id"].Error != txqueue.ErrQueuedTxIDNotFound.Error() { if len(results) != (testTxCount+1) || results["invalid-tx-id"].Error != queue.ErrQueuedTxIDNotFound.Error() {
t.Errorf("cannot complete txs: %v", results) t.Errorf("cannot complete txs: %v", results)
return return
} }
@ -1004,7 +1005,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
return return
} }
if envelope.Type == txqueue.EventTransactionQueued { if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
txID = event["id"].(string) txID = event["id"].(string)
t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID) t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID)
@ -1029,7 +1030,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo
// try completing discarded transaction // try completing discarded transaction
_, err := statusAPI.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password) _, err := statusAPI.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password)
if err != txqueue.ErrQueuedTxIDNotFound { if err != queue.ErrQueuedTxIDNotFound {
t.Error("expects tx not found, but call to CompleteTransaction succeeded") t.Error("expects tx not found, but call to CompleteTransaction succeeded")
return return
} }
@ -1043,19 +1044,19 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo
completeQueuedTransaction <- struct{}{} // so that timeout is aborted completeQueuedTransaction <- struct{}{} // so that timeout is aborted
} }
if envelope.Type == txqueue.EventTransactionFailed { if envelope.Type == transactions.EventTransactionFailed {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
t.Logf("transaction return event received: {id: %s}\n", event["id"].(string)) t.Logf("transaction return event received: {id: %s}\n", event["id"].(string))
receivedErrMessage := event["error_message"].(string) receivedErrMessage := event["error_message"].(string)
expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error() expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
if receivedErrMessage != expectedErrMessage { if receivedErrMessage != expectedErrMessage {
t.Errorf("unexpected error message received: got %v", receivedErrMessage) t.Errorf("unexpected error message received: got %v", receivedErrMessage)
return return
} }
receivedErrCode := event["error_code"].(string) receivedErrCode := event["error_code"].(string)
if receivedErrCode != txqueue.SendTransactionDiscardedErrorCode { if receivedErrCode != strconv.Itoa(transactions.SendTransactionDiscardedErrorCode) {
t.Errorf("unexpected error code received: got %v", receivedErrCode) t.Errorf("unexpected error code received: got %v", receivedErrCode)
return return
} }
@ -1070,7 +1071,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo
To: common.ToAddress(TestConfig.Account2.Address), To: common.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)), Value: (*hexutil.Big)(big.NewInt(1000000000000)),
}) })
if err != txqueue.ErrQueuedTxDiscarded { if err != queue.ErrQueuedTxDiscarded {
t.Errorf("expected error not thrown: %v", err) t.Errorf("expected error not thrown: %v", err)
return false return false
} }
@ -1117,7 +1118,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
return return
} }
if envelope.Type == txqueue.EventTransactionQueued { if envelope.Type == transactions.EventTransactionQueued {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
txID = event["id"].(string) txID = event["id"].(string)
t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID) t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID)
@ -1130,19 +1131,19 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl
txIDs <- txID txIDs <- txID
} }
if envelope.Type == txqueue.EventTransactionFailed { if envelope.Type == transactions.EventTransactionFailed {
event := envelope.Event.(map[string]interface{}) event := envelope.Event.(map[string]interface{})
t.Logf("transaction return event received: {id: %s}\n", event["id"].(string)) t.Logf("transaction return event received: {id: %s}\n", event["id"].(string))
receivedErrMessage := event["error_message"].(string) receivedErrMessage := event["error_message"].(string)
expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error() expectedErrMessage := queue.ErrQueuedTxDiscarded.Error()
if receivedErrMessage != expectedErrMessage { if receivedErrMessage != expectedErrMessage {
t.Errorf("unexpected error message received: got %v", receivedErrMessage) t.Errorf("unexpected error message received: got %v", receivedErrMessage)
return return
} }
receivedErrCode := event["error_code"].(string) receivedErrCode := event["error_code"].(string)
if receivedErrCode != txqueue.SendTransactionDiscardedErrorCode { if receivedErrCode != strconv.Itoa(transactions.SendTransactionDiscardedErrorCode) {
t.Errorf("unexpected error code received: got %v", receivedErrCode) t.Errorf("unexpected error code received: got %v", receivedErrCode)
return return
} }
@ -1161,7 +1162,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl
To: common.ToAddress(TestConfig.Account2.Address), To: common.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)), Value: (*hexutil.Big)(big.NewInt(1000000000000)),
}) })
if err != txqueue.ErrQueuedTxDiscarded { if err != queue.ErrQueuedTxDiscarded {
t.Errorf("expected error not thrown: %v", err) t.Errorf("expected error not thrown: %v", err)
return return
} }
@ -1192,7 +1193,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl
} }
discardResults := discardResultsStruct.Results discardResults := discardResultsStruct.Results
if len(discardResults) != 1 || discardResults["invalid-tx-id"].Error != txqueue.ErrQueuedTxIDNotFound.Error() { if len(discardResults) != 1 || discardResults["invalid-tx-id"].Error != queue.ErrQueuedTxIDNotFound.Error() {
t.Errorf("cannot discard txs: %v", discardResults) t.Errorf("cannot discard txs: %v", discardResults)
return return
} }
@ -1214,7 +1215,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl
t.Errorf("tx id not set in result: expected id is %s", txID) t.Errorf("tx id not set in result: expected id is %s", txID)
return return
} }
if txResult.Error != txqueue.ErrQueuedTxIDNotFound.Error() { if txResult.Error != queue.ErrQueuedTxIDNotFound.Error() {
t.Errorf("invalid error for %s", txResult.Hash) t.Errorf("invalid error for %s", txResult.Hash)
return return
} }
@ -1431,7 +1432,7 @@ func startTestNode(t *testing.T) <-chan struct{} {
return return
} }
if envelope.Type == txqueue.EventTransactionQueued { if envelope.Type == transactions.EventTransactionQueued {
} }
if envelope.Type == signal.EventNodeStarted { if envelope.Type == signal.EventNodeStarted {
t.Log("Node started, but we wait till it be ready") t.Log("Node started, but we wait till it be ready")