Move transaction queue code into separate package (#365)
This commit is contained in:
parent
d625ddacfd
commit
9cc5fd2112
|
@ -21,10 +21,10 @@ import (
|
|||
|
||||
"github.com/status-im/status-go/geth/account"
|
||||
"github.com/status-im/status-go/geth/common"
|
||||
"github.com/status-im/status-go/geth/node"
|
||||
"github.com/status-im/status-go/geth/params"
|
||||
"github.com/status-im/status-go/geth/signal"
|
||||
. "github.com/status-im/status-go/geth/testing"
|
||||
"github.com/status-im/status-go/geth/txqueue"
|
||||
"github.com/status-im/status-go/static"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -746,7 +746,7 @@ func testCompleteTransaction(t *testing.T) bool {
|
|||
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
|
||||
return
|
||||
}
|
||||
if envelope.Type == node.EventTransactionQueued {
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
t.Logf("transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string))
|
||||
|
||||
|
@ -824,7 +824,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool {
|
|||
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
|
||||
return
|
||||
}
|
||||
if envelope.Type == node.EventTransactionQueued {
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
txID = event["id"].(string)
|
||||
t.Logf("transaction queued (will be completed in a single call, once aggregated): {id: %s}\n", txID)
|
||||
|
@ -871,7 +871,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool {
|
|||
}
|
||||
results := resultsStruct.Results
|
||||
|
||||
if len(results) != (testTxCount+1) || results["invalid-tx-id"].Error != node.ErrQueuedTxIDNotFound.Error() {
|
||||
if len(results) != (testTxCount+1) || results["invalid-tx-id"].Error != txqueue.ErrQueuedTxIDNotFound.Error() {
|
||||
t.Errorf("cannot complete txs: %v", results)
|
||||
return
|
||||
}
|
||||
|
@ -957,7 +957,7 @@ func testDiscardTransaction(t *testing.T) bool {
|
|||
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
|
||||
return
|
||||
}
|
||||
if envelope.Type == node.EventTransactionQueued {
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
txID = event["id"].(string)
|
||||
t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID)
|
||||
|
@ -982,7 +982,7 @@ func testDiscardTransaction(t *testing.T) bool {
|
|||
|
||||
// try completing discarded transaction
|
||||
_, err := statusAPI.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password)
|
||||
if err != node.ErrQueuedTxIDNotFound {
|
||||
if err != txqueue.ErrQueuedTxIDNotFound {
|
||||
t.Error("expects tx not found, but call to CompleteTransaction succeeded")
|
||||
return
|
||||
}
|
||||
|
@ -996,19 +996,19 @@ func testDiscardTransaction(t *testing.T) bool {
|
|||
completeQueuedTransaction <- struct{}{} // so that timeout is aborted
|
||||
}
|
||||
|
||||
if envelope.Type == node.EventTransactionFailed {
|
||||
if envelope.Type == txqueue.EventTransactionFailed {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
t.Logf("transaction return event received: {id: %s}\n", event["id"].(string))
|
||||
|
||||
receivedErrMessage := event["error_message"].(string)
|
||||
expectedErrMessage := node.ErrQueuedTxDiscarded.Error()
|
||||
expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error()
|
||||
if receivedErrMessage != expectedErrMessage {
|
||||
t.Errorf("unexpected error message received: got %v", receivedErrMessage)
|
||||
return
|
||||
}
|
||||
|
||||
receivedErrCode := event["error_code"].(string)
|
||||
if receivedErrCode != node.SendTransactionDiscardedErrorCode {
|
||||
if receivedErrCode != txqueue.SendTransactionDiscardedErrorCode {
|
||||
t.Errorf("unexpected error code received: got %v", receivedErrCode)
|
||||
return
|
||||
}
|
||||
|
@ -1023,7 +1023,7 @@ func testDiscardTransaction(t *testing.T) bool {
|
|||
To: common.ToAddress(TestConfig.Account2.Address),
|
||||
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
|
||||
})
|
||||
if err != node.ErrQueuedTxDiscarded {
|
||||
if err != txqueue.ErrQueuedTxDiscarded {
|
||||
t.Errorf("expected error not thrown: %v", err)
|
||||
return false
|
||||
}
|
||||
|
@ -1070,7 +1070,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool {
|
|||
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
|
||||
return
|
||||
}
|
||||
if envelope.Type == node.EventTransactionQueued {
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
txID = event["id"].(string)
|
||||
t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID)
|
||||
|
@ -1083,19 +1083,19 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool {
|
|||
txIDs <- txID
|
||||
}
|
||||
|
||||
if envelope.Type == node.EventTransactionFailed {
|
||||
if envelope.Type == txqueue.EventTransactionFailed {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
t.Logf("transaction return event received: {id: %s}\n", event["id"].(string))
|
||||
|
||||
receivedErrMessage := event["error_message"].(string)
|
||||
expectedErrMessage := node.ErrQueuedTxDiscarded.Error()
|
||||
expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error()
|
||||
if receivedErrMessage != expectedErrMessage {
|
||||
t.Errorf("unexpected error message received: got %v", receivedErrMessage)
|
||||
return
|
||||
}
|
||||
|
||||
receivedErrCode := event["error_code"].(string)
|
||||
if receivedErrCode != node.SendTransactionDiscardedErrorCode {
|
||||
if receivedErrCode != txqueue.SendTransactionDiscardedErrorCode {
|
||||
t.Errorf("unexpected error code received: got %v", receivedErrCode)
|
||||
return
|
||||
}
|
||||
|
@ -1114,7 +1114,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool {
|
|||
To: common.ToAddress(TestConfig.Account2.Address),
|
||||
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
|
||||
})
|
||||
if err != node.ErrQueuedTxDiscarded {
|
||||
if err != txqueue.ErrQueuedTxDiscarded {
|
||||
t.Errorf("expected error not thrown: %v", err)
|
||||
return
|
||||
}
|
||||
|
@ -1145,7 +1145,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool {
|
|||
}
|
||||
discardResults := discardResultsStruct.Results
|
||||
|
||||
if len(discardResults) != 1 || discardResults["invalid-tx-id"].Error != node.ErrQueuedTxIDNotFound.Error() {
|
||||
if len(discardResults) != 1 || discardResults["invalid-tx-id"].Error != txqueue.ErrQueuedTxIDNotFound.Error() {
|
||||
t.Errorf("cannot discard txs: %v", discardResults)
|
||||
return
|
||||
}
|
||||
|
@ -1167,7 +1167,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool {
|
|||
t.Errorf("tx id not set in result: expected id is %s", txID)
|
||||
return
|
||||
}
|
||||
if txResult.Error != node.ErrQueuedTxIDNotFound.Error() {
|
||||
if txResult.Error != txqueue.ErrQueuedTxIDNotFound.Error() {
|
||||
t.Errorf("invalid error for %s", txResult.Hash)
|
||||
return
|
||||
}
|
||||
|
@ -1349,7 +1349,7 @@ func startTestNode(t *testing.T) <-chan struct{} {
|
|||
return
|
||||
}
|
||||
|
||||
if envelope.Type == node.EventTransactionQueued {
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
}
|
||||
if envelope.Type == signal.EventNodeStarted {
|
||||
t.Log("Node started, but we wait till it be ready")
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/status-im/status-go/geth/node"
|
||||
"github.com/status-im/status-go/geth/params"
|
||||
"github.com/status-im/status-go/geth/signal"
|
||||
"github.com/status-im/status-go/geth/txqueue"
|
||||
)
|
||||
|
||||
// StatusBackend implements Status.im service
|
||||
|
@ -31,7 +32,7 @@ func NewStatusBackend() *StatusBackend {
|
|||
|
||||
nodeManager := node.NewNodeManager()
|
||||
accountManager := account.NewManager(nodeManager)
|
||||
txQueueManager := node.NewTxQueueManager(nodeManager, accountManager)
|
||||
txQueueManager := txqueue.NewManager(nodeManager, accountManager)
|
||||
|
||||
return &StatusBackend{
|
||||
nodeManager: nodeManager,
|
||||
|
|
|
@ -14,10 +14,10 @@ import (
|
|||
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
|
||||
"github.com/status-im/status-go/geth/common"
|
||||
"github.com/status-im/status-go/geth/log"
|
||||
"github.com/status-im/status-go/geth/node"
|
||||
"github.com/status-im/status-go/geth/params"
|
||||
"github.com/status-im/status-go/geth/signal"
|
||||
. "github.com/status-im/status-go/geth/testing"
|
||||
"github.com/status-im/status-go/geth/txqueue"
|
||||
"github.com/status-im/status-go/static"
|
||||
)
|
||||
|
||||
|
@ -61,7 +61,7 @@ func (s *BackendTestSuite) TestJailSendQueuedTransaction() {
|
|||
err := json.Unmarshal([]byte(jsonEvent), &envelope)
|
||||
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
|
||||
|
||||
if envelope.Type == node.EventTransactionQueued {
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
messageId, ok := event["message_id"].(string)
|
||||
s.True(ok, "Message id is required, but not found")
|
||||
|
@ -221,7 +221,7 @@ func (s *BackendTestSuite) TestContractDeployment() {
|
|||
err = json.Unmarshal([]byte(jsonEvent), &envelope)
|
||||
require.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
|
||||
|
||||
if envelope.Type == node.EventTransactionQueued {
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
// Use s.* for assertions - require leaves the channel unclosed.
|
||||
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
|
@ -724,7 +724,7 @@ func (s *BackendTestSuite) TestJailVMPersistence() {
|
|||
s.T().Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
|
||||
return
|
||||
}
|
||||
if envelope.Type == node.EventTransactionQueued {
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
s.T().Logf("Transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string))
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/status-im/status-go/geth/params"
|
||||
"github.com/status-im/status-go/geth/signal"
|
||||
. "github.com/status-im/status-go/geth/testing"
|
||||
"github.com/status-im/status-go/geth/txqueue"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
|
@ -285,7 +286,7 @@ func (s *BackendTestSuite) TestCallRPCSendTransaction() {
|
|||
err := json.Unmarshal([]byte(rawSignal), &signal)
|
||||
s.NoError(err)
|
||||
|
||||
if signal.Type == node.EventTransactionQueued {
|
||||
if signal.Type == txqueue.EventTransactionQueued {
|
||||
event := signal.Event.(map[string]interface{})
|
||||
txID := event["id"].(string)
|
||||
txHash, err = s.backend.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password)
|
||||
|
@ -343,7 +344,7 @@ func (s *BackendTestSuite) TestCallRPCSendTransactionUpstream() {
|
|||
err := json.Unmarshal([]byte(rawSignal), &signal)
|
||||
s.NoError(err)
|
||||
|
||||
if signal.Type == node.EventTransactionQueued {
|
||||
if signal.Type == txqueue.EventTransactionQueued {
|
||||
event := signal.Event.(map[string]interface{})
|
||||
txID := event["id"].(string)
|
||||
|
||||
|
|
|
@ -13,10 +13,10 @@ import (
|
|||
"github.com/status-im/status-go/geth/account"
|
||||
"github.com/status-im/status-go/geth/common"
|
||||
"github.com/status-im/status-go/geth/log"
|
||||
"github.com/status-im/status-go/geth/node"
|
||||
"github.com/status-im/status-go/geth/params"
|
||||
"github.com/status-im/status-go/geth/signal"
|
||||
. "github.com/status-im/status-go/geth/testing"
|
||||
"github.com/status-im/status-go/geth/txqueue"
|
||||
)
|
||||
|
||||
// FIXME(tiabc): Sometimes it fails due to "no suitable peers found".
|
||||
|
@ -41,7 +41,7 @@ func (s *BackendTestSuite) TestSendContractTx() {
|
|||
err := json.Unmarshal([]byte(jsonEvent), &envelope)
|
||||
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
|
||||
|
||||
if envelope.Type == node.EventTransactionQueued {
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))
|
||||
|
||||
|
@ -67,7 +67,7 @@ func (s *BackendTestSuite) TestSendContractTx() {
|
|||
)
|
||||
s.EqualError(
|
||||
err,
|
||||
node.ErrInvalidCompleteTxSender.Error(),
|
||||
txqueue.ErrInvalidCompleteTxSender.Error(),
|
||||
fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]),
|
||||
)
|
||||
|
||||
|
@ -135,7 +135,7 @@ func (s *BackendTestSuite) TestSendEtherTx() {
|
|||
err := json.Unmarshal([]byte(jsonEvent), &envelope)
|
||||
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
|
||||
|
||||
if envelope.Type == node.EventTransactionQueued {
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))
|
||||
|
||||
|
@ -159,7 +159,7 @@ func (s *BackendTestSuite) TestSendEtherTx() {
|
|||
common.QueuedTxID(event["id"].(string)), TestConfig.Account1.Password)
|
||||
s.EqualError(
|
||||
err,
|
||||
node.ErrInvalidCompleteTxSender.Error(),
|
||||
txqueue.ErrInvalidCompleteTxSender.Error(),
|
||||
fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]),
|
||||
)
|
||||
|
||||
|
@ -215,7 +215,7 @@ func (s *BackendTestSuite) TestSendEtherTxUpstream() {
|
|||
err := json.Unmarshal([]byte(jsonEvent), &envelope)
|
||||
s.NoError(err, "cannot unmarshal JSON: %s", jsonEvent)
|
||||
|
||||
if envelope.Type == node.EventTransactionQueued {
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))
|
||||
|
||||
|
@ -275,7 +275,7 @@ func (s *BackendTestSuite) TestDoubleCompleteQueuedTransactions() {
|
|||
err := json.Unmarshal([]byte(jsonEvent), &envelope)
|
||||
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
|
||||
|
||||
if envelope.Type == node.EventTransactionQueued {
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
txID := common.QueuedTxID(event["id"].(string))
|
||||
log.Info("transaction queued (will be failed and completed on the second call)", "id", txID)
|
||||
|
@ -295,7 +295,7 @@ func (s *BackendTestSuite) TestDoubleCompleteQueuedTransactions() {
|
|||
close(completeQueuedTransaction)
|
||||
}
|
||||
|
||||
if envelope.Type == node.EventTransactionFailed {
|
||||
if envelope.Type == txqueue.EventTransactionFailed {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
log.Info("transaction return event received", "id", event["id"].(string))
|
||||
|
||||
|
@ -357,7 +357,7 @@ func (s *BackendTestSuite) TestDiscardQueuedTransaction() {
|
|||
err := json.Unmarshal([]byte(jsonEvent), &envelope)
|
||||
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
|
||||
|
||||
if envelope.Type == node.EventTransactionQueued {
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
txID := common.QueuedTxID(event["id"].(string))
|
||||
log.Info("transaction queued (will be discarded soon)", "id", txID)
|
||||
|
@ -379,12 +379,12 @@ func (s *BackendTestSuite) TestDiscardQueuedTransaction() {
|
|||
close(completeQueuedTransaction)
|
||||
}
|
||||
|
||||
if envelope.Type == node.EventTransactionFailed {
|
||||
if envelope.Type == txqueue.EventTransactionFailed {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
log.Info("transaction return event received", "id", event["id"].(string))
|
||||
|
||||
receivedErrMessage := event["error_message"].(string)
|
||||
expectedErrMessage := node.ErrQueuedTxDiscarded.Error()
|
||||
expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error()
|
||||
s.Equal(receivedErrMessage, expectedErrMessage)
|
||||
|
||||
receivedErrCode := event["error_code"].(string)
|
||||
|
@ -400,7 +400,7 @@ func (s *BackendTestSuite) TestDiscardQueuedTransaction() {
|
|||
To: common.ToAddress(TestConfig.Account2.Address),
|
||||
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
|
||||
})
|
||||
s.EqualError(err, node.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")
|
||||
s.EqualError(err, txqueue.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")
|
||||
|
||||
select {
|
||||
case <-completeQueuedTransaction:
|
||||
|
@ -439,7 +439,7 @@ func (s *BackendTestSuite) TestCompleteMultipleQueuedTransactions() {
|
|||
err := json.Unmarshal([]byte(jsonEvent), &envelope)
|
||||
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
|
||||
|
||||
if envelope.Type == node.EventTransactionQueued {
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
txID := common.QueuedTxID(event["id"].(string))
|
||||
log.Info("transaction queued (will be completed in a single call, once aggregated)", "id", txID)
|
||||
|
@ -539,7 +539,7 @@ func (s *BackendTestSuite) TestDiscardMultipleQueuedTransactions() {
|
|||
var envelope signal.Envelope
|
||||
err := json.Unmarshal([]byte(jsonEvent), &envelope)
|
||||
s.NoError(err)
|
||||
if envelope.Type == node.EventTransactionQueued {
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
txID := common.QueuedTxID(event["id"].(string))
|
||||
log.Info("transaction queued (will be discarded soon)", "id", txID)
|
||||
|
@ -549,12 +549,12 @@ func (s *BackendTestSuite) TestDiscardMultipleQueuedTransactions() {
|
|||
txIDs <- txID
|
||||
}
|
||||
|
||||
if envelope.Type == node.EventTransactionFailed {
|
||||
if envelope.Type == txqueue.EventTransactionFailed {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
log.Info("transaction return event received", "id", event["id"].(string))
|
||||
|
||||
receivedErrMessage := event["error_message"].(string)
|
||||
expectedErrMessage := node.ErrQueuedTxDiscarded.Error()
|
||||
expectedErrMessage := txqueue.ErrQueuedTxDiscarded.Error()
|
||||
s.Equal(receivedErrMessage, expectedErrMessage)
|
||||
|
||||
receivedErrCode := event["error_code"].(string)
|
||||
|
@ -574,7 +574,7 @@ func (s *BackendTestSuite) TestDiscardMultipleQueuedTransactions() {
|
|||
To: common.ToAddress(TestConfig.Account2.Address),
|
||||
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
|
||||
})
|
||||
s.EqualError(err, node.ErrQueuedTxDiscarded.Error())
|
||||
s.EqualError(err, txqueue.ErrQueuedTxDiscarded.Error())
|
||||
|
||||
s.True(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction returned hash, while it shouldn't")
|
||||
}
|
||||
|
@ -649,7 +649,7 @@ func (s *BackendTestSuite) TestNonExistentQueuedTransactions() {
|
|||
// try completing non-existing transaction
|
||||
_, err := s.backend.CompleteTransaction("some-bad-transaction-id", TestConfig.Account1.Password)
|
||||
s.Error(err, "error expected and not received")
|
||||
s.EqualError(err, node.ErrQueuedTxIDNotFound.Error())
|
||||
s.EqualError(err, txqueue.ErrQueuedTxIDNotFound.Error())
|
||||
}
|
||||
|
||||
func (s *BackendTestSuite) TestEvictionOfQueuedTransactions() {
|
||||
|
@ -670,7 +670,7 @@ func (s *BackendTestSuite) TestEvictionOfQueuedTransactions() {
|
|||
|
||||
txQueue := s.backend.TxQueueManager().TransactionQueue()
|
||||
var i = 0
|
||||
txIDs := [node.DefaultTxQueueCap + 5 + 10]common.QueuedTxID{}
|
||||
txIDs := [txqueue.DefaultTxQueueCap + 5 + 10]common.QueuedTxID{}
|
||||
s.backend.TxQueueManager().SetTransactionQueueHandler(func(queuedTx *common.QueuedTx) {
|
||||
log.Info("tx enqueued", "i", i+1, "queue size", txQueue.Count(), "id", queuedTx.ID)
|
||||
txIDs[i] = queuedTx.ID
|
||||
|
@ -685,16 +685,16 @@ func (s *BackendTestSuite) TestEvictionOfQueuedTransactions() {
|
|||
time.Sleep(2 * time.Second) // FIXME(tiabc): more reliable synchronization to ensure all transactions are enqueued
|
||||
|
||||
log.Info(fmt.Sprintf("Number of transactions queued: %d. Queue size (shouldn't be more than %d): %d",
|
||||
i, node.DefaultTxQueueCap, txQueue.Count()))
|
||||
i, txqueue.DefaultTxQueueCap, txQueue.Count()))
|
||||
|
||||
s.Equal(10, txQueue.Count(), "transaction count should be 10")
|
||||
|
||||
for i := 0; i < node.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines
|
||||
for i := 0; i < txqueue.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines
|
||||
go s.backend.SendTransaction(nil, common.SendTxArgs{}) // nolint: errcheck
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
require.True(txQueue.Count() <= node.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", node.DefaultTxQueueCap, node.DefaultTxQueueCap-1, txQueue.Count())
|
||||
require.True(txQueue.Count() <= txqueue.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", txqueue.DefaultTxQueueCap, txqueue.DefaultTxQueueCap-1, txQueue.Count())
|
||||
|
||||
for _, txID := range txIDs {
|
||||
txQueue.Remove(txID)
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/status-im/status-go/geth/params"
|
||||
"github.com/status-im/status-go/geth/signal"
|
||||
. "github.com/status-im/status-go/geth/testing"
|
||||
"github.com/status-im/status-go/geth/txqueue"
|
||||
"github.com/status-im/status-go/static"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
@ -44,7 +45,7 @@ func (s *JailTestSuite) SetupTest() {
|
|||
accountManager := account.NewManager(nodeManager)
|
||||
require.NotNil(accountManager)
|
||||
|
||||
txQueueManager := node.NewTxQueueManager(nodeManager, accountManager)
|
||||
txQueueManager := txqueue.NewManager(nodeManager, accountManager)
|
||||
|
||||
jail := jail.New(nodeManager, accountManager, txQueueManager)
|
||||
require.NotNil(jail)
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package node
|
||||
package txqueue
|
||||
|
||||
import (
|
||||
"errors"
|
|
@ -1,4 +1,4 @@
|
|||
package node
|
||||
package txqueue
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -44,16 +44,16 @@ var txReturnCodes = map[error]string{ // deliberately strings, in case more mean
|
|||
ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode,
|
||||
}
|
||||
|
||||
// TxQueueManager provides means to manage internal Status Backend (injected into LES)
|
||||
type TxQueueManager struct {
|
||||
// Manager provides means to manage internal Status Backend (injected into LES)
|
||||
type Manager struct {
|
||||
nodeManager common.NodeManager
|
||||
accountManager common.AccountManager
|
||||
txQueue *TxQueue
|
||||
}
|
||||
|
||||
// NewTxQueueManager returns a new TxQueueManager.
|
||||
func NewTxQueueManager(nodeManager common.NodeManager, accountManager common.AccountManager) *TxQueueManager {
|
||||
return &TxQueueManager{
|
||||
// NewManager returns a new Manager.
|
||||
func NewManager(nodeManager common.NodeManager, accountManager common.AccountManager) *Manager {
|
||||
return &Manager{
|
||||
nodeManager: nodeManager,
|
||||
accountManager: accountManager,
|
||||
txQueue: NewTransactionQueue(),
|
||||
|
@ -61,24 +61,24 @@ func NewTxQueueManager(nodeManager common.NodeManager, accountManager common.Acc
|
|||
}
|
||||
|
||||
// Start starts accepting new transactions into the queue.
|
||||
func (m *TxQueueManager) Start() {
|
||||
log.Info("start TxQueueManager")
|
||||
func (m *Manager) Start() {
|
||||
log.Info("start Manager")
|
||||
m.txQueue.Start()
|
||||
}
|
||||
|
||||
// Stop stops accepting new transactions into the queue.
|
||||
func (m *TxQueueManager) Stop() {
|
||||
log.Info("stop TxQueueManager")
|
||||
func (m *Manager) Stop() {
|
||||
log.Info("stop Manager")
|
||||
m.txQueue.Stop()
|
||||
}
|
||||
|
||||
// TransactionQueue returns a reference to the queue.
|
||||
func (m *TxQueueManager) TransactionQueue() common.TxQueue {
|
||||
func (m *Manager) TransactionQueue() common.TxQueue {
|
||||
return m.txQueue
|
||||
}
|
||||
|
||||
// CreateTransaction returns a transaction object.
|
||||
func (m *TxQueueManager) CreateTransaction(ctx context.Context, args common.SendTxArgs) *common.QueuedTx {
|
||||
func (m *Manager) CreateTransaction(ctx context.Context, args common.SendTxArgs) *common.QueuedTx {
|
||||
return &common.QueuedTx{
|
||||
ID: common.QueuedTxID(uuid.New()),
|
||||
Hash: gethcommon.Hash{},
|
||||
|
@ -90,7 +90,7 @@ func (m *TxQueueManager) CreateTransaction(ctx context.Context, args common.Send
|
|||
}
|
||||
|
||||
// QueueTransaction puts a transaction into the queue.
|
||||
func (m *TxQueueManager) QueueTransaction(tx *common.QueuedTx) error {
|
||||
func (m *Manager) QueueTransaction(tx *common.QueuedTx) error {
|
||||
to := "<nil>"
|
||||
if tx.Args.To != nil {
|
||||
to = tx.Args.To.Hex()
|
||||
|
@ -102,7 +102,7 @@ func (m *TxQueueManager) QueueTransaction(tx *common.QueuedTx) error {
|
|||
|
||||
// WaitForTransaction adds a transaction to the queue and blocks
|
||||
// until it's completed, discarded or times out.
|
||||
func (m *TxQueueManager) WaitForTransaction(tx *common.QueuedTx) error {
|
||||
func (m *Manager) WaitForTransaction(tx *common.QueuedTx) error {
|
||||
log.Info("wait for transaction", "id", tx.ID)
|
||||
|
||||
// now wait up until transaction is:
|
||||
|
@ -123,14 +123,14 @@ func (m *TxQueueManager) WaitForTransaction(tx *common.QueuedTx) error {
|
|||
}
|
||||
|
||||
// NotifyOnQueuedTxReturn calls a handler when a transaction resolves.
|
||||
func (m *TxQueueManager) NotifyOnQueuedTxReturn(queuedTx *common.QueuedTx, err error) {
|
||||
func (m *Manager) NotifyOnQueuedTxReturn(queuedTx *common.QueuedTx, err error) {
|
||||
m.txQueue.NotifyOnQueuedTxReturn(queuedTx, err)
|
||||
}
|
||||
|
||||
// CompleteTransaction instructs backend to complete sending of a given transaction.
|
||||
// TODO(adam): investigate a possible bug that calling this method multiple times with the same Transaction ID
|
||||
// results in sending multiple transactions.
|
||||
func (m *TxQueueManager) CompleteTransaction(id common.QueuedTxID, password string) (gethcommon.Hash, error) {
|
||||
func (m *Manager) CompleteTransaction(id common.QueuedTxID, password string) (gethcommon.Hash, error) {
|
||||
log.Info("complete transaction", "id", id)
|
||||
|
||||
queuedTx, err := m.txQueue.Get(id)
|
||||
|
@ -190,7 +190,7 @@ func (m *TxQueueManager) CompleteTransaction(id common.QueuedTxID, password stri
|
|||
return hash, txErr
|
||||
}
|
||||
|
||||
func (m *TxQueueManager) completeLocalTransaction(queuedTx *common.QueuedTx, password string) (gethcommon.Hash, error) {
|
||||
func (m *Manager) completeLocalTransaction(queuedTx *common.QueuedTx, password string) (gethcommon.Hash, error) {
|
||||
log.Info("complete transaction using local node", "id", queuedTx.ID)
|
||||
|
||||
les, err := m.nodeManager.LightEthereumService()
|
||||
|
@ -201,7 +201,7 @@ func (m *TxQueueManager) completeLocalTransaction(queuedTx *common.QueuedTx, pas
|
|||
return les.StatusBackend.SendTransaction(context.Background(), status.SendTxArgs(queuedTx.Args), password)
|
||||
}
|
||||
|
||||
func (m *TxQueueManager) completeRemoteTransaction(queuedTx *common.QueuedTx, password string) (gethcommon.Hash, error) {
|
||||
func (m *Manager) completeRemoteTransaction(queuedTx *common.QueuedTx, password string) (gethcommon.Hash, error) {
|
||||
log.Info("complete transaction using upstream node", "id", queuedTx.ID)
|
||||
|
||||
var emptyHash gethcommon.Hash
|
||||
|
@ -290,7 +290,7 @@ func (m *TxQueueManager) completeRemoteTransaction(queuedTx *common.QueuedTx, pa
|
|||
return signedTx.Hash(), nil
|
||||
}
|
||||
|
||||
func (m *TxQueueManager) estimateGas(args common.SendTxArgs) (*hexutil.Big, error) {
|
||||
func (m *Manager) estimateGas(args common.SendTxArgs) (*hexutil.Big, error) {
|
||||
if args.Gas != nil {
|
||||
return args.Gas, nil
|
||||
}
|
||||
|
@ -338,7 +338,7 @@ func (m *TxQueueManager) estimateGas(args common.SendTxArgs) (*hexutil.Big, erro
|
|||
return &estimatedGas, nil
|
||||
}
|
||||
|
||||
func (m *TxQueueManager) gasPrice() (*hexutil.Big, error) {
|
||||
func (m *Manager) gasPrice() (*hexutil.Big, error) {
|
||||
client := m.nodeManager.RPCClient()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer cancel()
|
||||
|
@ -353,7 +353,7 @@ func (m *TxQueueManager) gasPrice() (*hexutil.Big, error) {
|
|||
}
|
||||
|
||||
// CompleteTransactions instructs backend to complete sending of multiple transactions
|
||||
func (m *TxQueueManager) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult {
|
||||
func (m *Manager) CompleteTransactions(ids []common.QueuedTxID, password string) map[common.QueuedTxID]common.RawCompleteTransactionResult {
|
||||
results := make(map[common.QueuedTxID]common.RawCompleteTransactionResult)
|
||||
|
||||
for _, txID := range ids {
|
||||
|
@ -368,7 +368,7 @@ func (m *TxQueueManager) CompleteTransactions(ids []common.QueuedTxID, password
|
|||
}
|
||||
|
||||
// DiscardTransaction discards a given transaction from transaction queue
|
||||
func (m *TxQueueManager) DiscardTransaction(id common.QueuedTxID) error {
|
||||
func (m *Manager) DiscardTransaction(id common.QueuedTxID) error {
|
||||
queuedTx, err := m.txQueue.Get(id)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -385,7 +385,7 @@ func (m *TxQueueManager) DiscardTransaction(id common.QueuedTxID) error {
|
|||
}
|
||||
|
||||
// DiscardTransactions discards given multiple transactions from transaction queue
|
||||
func (m *TxQueueManager) DiscardTransactions(ids []common.QueuedTxID) map[common.QueuedTxID]common.RawDiscardTransactionResult {
|
||||
func (m *Manager) DiscardTransactions(ids []common.QueuedTxID) map[common.QueuedTxID]common.RawDiscardTransactionResult {
|
||||
results := make(map[common.QueuedTxID]common.RawDiscardTransactionResult)
|
||||
|
||||
for _, txID := range ids {
|
||||
|
@ -408,7 +408,7 @@ type SendTransactionEvent struct {
|
|||
}
|
||||
|
||||
// TransactionQueueHandler returns handler that processes incoming tx queue requests
|
||||
func (m *TxQueueManager) TransactionQueueHandler() func(queuedTx *common.QueuedTx) {
|
||||
func (m *Manager) TransactionQueueHandler() func(queuedTx *common.QueuedTx) {
|
||||
return func(queuedTx *common.QueuedTx) {
|
||||
log.Info("calling TransactionQueueHandler")
|
||||
signal.Send(signal.Envelope{
|
||||
|
@ -424,7 +424,7 @@ func (m *TxQueueManager) TransactionQueueHandler() func(queuedTx *common.QueuedT
|
|||
|
||||
// SetTransactionQueueHandler sets a handler that will be called
|
||||
// when a new transaction is enqueued.
|
||||
func (m *TxQueueManager) SetTransactionQueueHandler(fn common.EnqueuedTxHandler) {
|
||||
func (m *Manager) SetTransactionQueueHandler(fn common.EnqueuedTxHandler) {
|
||||
m.txQueue.SetEnqueueHandler(fn)
|
||||
}
|
||||
|
||||
|
@ -438,7 +438,7 @@ type ReturnSendTransactionEvent struct {
|
|||
}
|
||||
|
||||
// TransactionReturnHandler returns handler that processes responses from internal tx manager
|
||||
func (m *TxQueueManager) TransactionReturnHandler() func(queuedTx *common.QueuedTx, err error) {
|
||||
func (m *Manager) TransactionReturnHandler() func(queuedTx *common.QueuedTx, err error) {
|
||||
return func(queuedTx *common.QueuedTx, err error) {
|
||||
if err == nil {
|
||||
return
|
||||
|
@ -463,7 +463,7 @@ func (m *TxQueueManager) TransactionReturnHandler() func(queuedTx *common.Queued
|
|||
}
|
||||
}
|
||||
|
||||
func (m *TxQueueManager) sendTransactionErrorCode(err error) string {
|
||||
func (m *Manager) sendTransactionErrorCode(err error) string {
|
||||
if code, ok := txReturnCodes[err]; ok {
|
||||
return code
|
||||
}
|
||||
|
@ -474,13 +474,13 @@ func (m *TxQueueManager) sendTransactionErrorCode(err error) string {
|
|||
// SetTransactionReturnHandler sets a handler that will be called
|
||||
// when a transaction is about to return or when a recoverable error occured.
|
||||
// Recoverable error is, for instance, wrong password.
|
||||
func (m *TxQueueManager) SetTransactionReturnHandler(fn common.EnqueuedTxReturnHandler) {
|
||||
func (m *Manager) SetTransactionReturnHandler(fn common.EnqueuedTxReturnHandler) {
|
||||
m.txQueue.SetTxReturnHandler(fn)
|
||||
}
|
||||
|
||||
// SendTransactionRPCHandler is a handler for eth_sendTransaction method.
|
||||
// It accepts one param which is a slice with a map of transaction params.
|
||||
func (m *TxQueueManager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) {
|
||||
func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) {
|
||||
log.Info("SendTransactionRPCHandler called")
|
||||
|
||||
// TODO(adam): it's a hack to parse arguments as common.RPCCall can do that.
|
|
@ -1,4 +1,4 @@
|
|||
package node
|
||||
package txqueue
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -57,7 +57,7 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() {
|
|||
// and treat as success.
|
||||
s.nodeManagerMock.EXPECT().LightEthereumService().Return(nil, errTxAssumedSent)
|
||||
|
||||
txQueueManager := NewTxQueueManager(s.nodeManagerMock, s.accountManagerMock)
|
||||
txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock)
|
||||
|
||||
txQueueManager.Start()
|
||||
defer txQueueManager.Stop()
|
||||
|
@ -107,7 +107,7 @@ func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() {
|
|||
// and treat as success.
|
||||
s.nodeManagerMock.EXPECT().LightEthereumService().Return(nil, errTxAssumedSent)
|
||||
|
||||
txQueueManager := NewTxQueueManager(s.nodeManagerMock, s.accountManagerMock)
|
||||
txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock)
|
||||
|
||||
txQueueManager.Start()
|
||||
defer txQueueManager.Stop()
|
||||
|
@ -161,7 +161,7 @@ func (s *TxQueueTestSuite) TestAccountMismatch() {
|
|||
Address: common.FromAddress(TestConfig.Account2.Address),
|
||||
}, nil)
|
||||
|
||||
txQueueManager := NewTxQueueManager(s.nodeManagerMock, s.accountManagerMock)
|
||||
txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock)
|
||||
|
||||
txQueueManager.Start()
|
||||
defer txQueueManager.Stop()
|
||||
|
@ -207,7 +207,7 @@ func (s *TxQueueTestSuite) TestInvalidPassword() {
|
|||
// Set ErrDecrypt error response as expected with a wrong password.
|
||||
s.nodeManagerMock.EXPECT().LightEthereumService().Return(nil, keystore.ErrDecrypt)
|
||||
|
||||
txQueueManager := NewTxQueueManager(s.nodeManagerMock, s.accountManagerMock)
|
||||
txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock)
|
||||
|
||||
txQueueManager.Start()
|
||||
defer txQueueManager.Stop()
|
||||
|
@ -242,7 +242,7 @@ func (s *TxQueueTestSuite) TestInvalidPassword() {
|
|||
}
|
||||
|
||||
func (s *TxQueueTestSuite) TestDiscardTransaction() {
|
||||
txQueueManager := NewTxQueueManager(s.nodeManagerMock, s.accountManagerMock)
|
||||
txQueueManager := NewManager(s.nodeManagerMock, s.accountManagerMock)
|
||||
|
||||
txQueueManager.Start()
|
||||
defer txQueueManager.Stop()
|
|
@ -0,0 +1,28 @@
|
|||
package txqueue
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/status-im/status-go/geth/common"
|
||||
"github.com/status-im/status-go/geth/signal"
|
||||
)
|
||||
|
||||
var ErrTxQueueRunFailure = errors.New("error running transaction queue")
|
||||
|
||||
// HaltOnPanic recovers from panic, logs issue, sends upward notification, and exits
|
||||
func HaltOnPanic() {
|
||||
if r := recover(); r != nil {
|
||||
err := fmt.Errorf("%v: %v", ErrTxQueueRunFailure, r)
|
||||
|
||||
// send signal up to native app
|
||||
signal.Send(signal.Envelope{
|
||||
Type: signal.EventNodeCrashed,
|
||||
Event: signal.NodeCrashEvent{
|
||||
Error: err.Error(),
|
||||
},
|
||||
})
|
||||
|
||||
common.Fatalf(err) // os.exit(1) is called internally
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue