Add a test case with multiple transactions and upstream node (#615)
* Add a test case with multiple transactions and upstream node * address review comments
This commit is contained in:
parent
13454e827b
commit
2f4be933bf
|
@ -525,98 +525,16 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactions() {
|
func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactions() {
|
||||||
s.StartTestBackend()
|
s.setupLocalNode()
|
||||||
defer s.StopTestBackend()
|
defer s.StopTestBackend()
|
||||||
|
|
||||||
EnsureNodeSync(s.Backend.NodeManager())
|
|
||||||
s.TxQueueManager().TransactionQueue().Reset()
|
s.TxQueueManager().TransactionQueue().Reset()
|
||||||
|
|
||||||
// log into account from which transactions will be sent
|
// log into account from which transactions will be sent
|
||||||
err := s.Backend.AccountManager().SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)
|
err := s.Backend.AccountManager().SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
|
|
||||||
testTxCount := 3
|
s.sendConcurrentTransactions(3)
|
||||||
txIDs := make(chan common.QueuedTxID, testTxCount)
|
|
||||||
allTestTxCompleted := make(chan struct{})
|
|
||||||
|
|
||||||
require := s.Require()
|
|
||||||
|
|
||||||
// replace transaction notification handler
|
|
||||||
signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
|
|
||||||
var envelope signal.Envelope
|
|
||||||
err := json.Unmarshal([]byte(jsonEvent), &envelope)
|
|
||||||
require.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
|
|
||||||
|
|
||||||
if envelope.Type == transactions.EventTransactionQueued {
|
|
||||||
event := envelope.Event.(map[string]interface{})
|
|
||||||
txID := common.QueuedTxID(event["id"].(string))
|
|
||||||
log.Info("transaction queued (will be completed in a single call, once aggregated)", "id", txID)
|
|
||||||
|
|
||||||
txIDs <- txID
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// this call blocks, and should return when DiscardQueuedTransaction() for a given tx id is called
|
|
||||||
sendTx := func() {
|
|
||||||
txHashCheck, err := s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{
|
|
||||||
From: common.FromAddress(TestConfig.Account1.Address),
|
|
||||||
To: common.ToAddress(TestConfig.Account2.Address),
|
|
||||||
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
|
|
||||||
})
|
|
||||||
require.NoError(err, "cannot send transaction")
|
|
||||||
require.NotEqual(gethcommon.Hash{}, txHashCheck, "transaction returned empty hash")
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for transactions, and complete them in a single call
|
|
||||||
completeTxs := func(txIDs []common.QueuedTxID) {
|
|
||||||
txIDs = append(txIDs, "invalid-tx-id")
|
|
||||||
results := s.Backend.CompleteTransactions(txIDs, TestConfig.Account1.Password)
|
|
||||||
s.Len(results, testTxCount+1)
|
|
||||||
s.EqualError(results["invalid-tx-id"].Error, "transaction hash not found")
|
|
||||||
|
|
||||||
for txID, txResult := range results {
|
|
||||||
s.False(
|
|
||||||
txResult.Error != nil && txID != "invalid-tx-id",
|
|
||||||
"invalid error for %s", txID,
|
|
||||||
)
|
|
||||||
s.False(
|
|
||||||
txResult.Hash == (gethcommon.Hash{}) && txID != "invalid-tx-id",
|
|
||||||
"invalid hash (expected non empty hash): %s", txID,
|
|
||||||
)
|
|
||||||
log.Info("transaction complete", "URL", "https://ropsten.etherscan.io/tx/"+txResult.Hash.Hex())
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(1 * time.Second) // make sure that tx complete signal propagates
|
|
||||||
|
|
||||||
for _, txID := range txIDs {
|
|
||||||
s.False(
|
|
||||||
s.Backend.TxQueueManager().TransactionQueue().Has(txID),
|
|
||||||
"txqueue should not have test tx at this point (it should be completed)",
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
go func() {
|
|
||||||
ids := make([]common.QueuedTxID, testTxCount)
|
|
||||||
for i := 0; i < testTxCount; i++ {
|
|
||||||
ids[i] = <-txIDs
|
|
||||||
}
|
|
||||||
|
|
||||||
completeTxs(ids)
|
|
||||||
close(allTestTxCompleted)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// send multiple transactions
|
|
||||||
for i := 0; i < testTxCount; i++ {
|
|
||||||
go sendTx()
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-allTestTxCompleted:
|
|
||||||
case <-time.After(30 * time.Second):
|
|
||||||
s.FailNow("test timed out")
|
|
||||||
}
|
|
||||||
|
|
||||||
s.Zero(s.TxQueueManager().TransactionQueue().Count(), "queue should be empty")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
|
func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
|
||||||
|
@ -811,3 +729,119 @@ func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() {
|
||||||
m.Unlock()
|
m.Unlock()
|
||||||
s.Zero(txQueue.Count(), "transaction count should be zero: %d", txQueue.Count())
|
s.Zero(txQueue.Count(), "transaction count should be zero: %d", txQueue.Count())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactionsUpstream() {
|
||||||
|
s.setupUpstreamNode()
|
||||||
|
defer s.StopTestBackend()
|
||||||
|
|
||||||
|
s.TxQueueManager().TransactionQueue().Reset()
|
||||||
|
|
||||||
|
// log into account from which transactions will be sent
|
||||||
|
err := s.Backend.AccountManager().SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)
|
||||||
|
s.NoError(err)
|
||||||
|
|
||||||
|
s.sendConcurrentTransactions(30)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransactionsTestSuite) setupLocalNode() {
|
||||||
|
s.StartTestBackend()
|
||||||
|
|
||||||
|
EnsureNodeSync(s.Backend.NodeManager())
|
||||||
|
|
||||||
|
backend := s.LightEthereumService().StatusBackend
|
||||||
|
s.NotNil(backend)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransactionsTestSuite) setupUpstreamNode() {
|
||||||
|
if GetNetworkID() == params.StatusChainNetworkID {
|
||||||
|
s.T().Skip()
|
||||||
|
}
|
||||||
|
|
||||||
|
addr, err := GetRemoteURL()
|
||||||
|
s.NoError(err)
|
||||||
|
s.StartTestBackend(e2e.WithUpstream(addr))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransactionsTestSuite) sendConcurrentTransactions(testTxCount int) {
|
||||||
|
txIDs := make(chan common.QueuedTxID, testTxCount)
|
||||||
|
allTestTxCompleted := make(chan struct{})
|
||||||
|
|
||||||
|
require := s.Require()
|
||||||
|
|
||||||
|
// replace transaction notification handler
|
||||||
|
signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
|
||||||
|
var envelope signal.Envelope
|
||||||
|
err := json.Unmarshal([]byte(jsonEvent), &envelope)
|
||||||
|
require.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
|
||||||
|
|
||||||
|
if envelope.Type == transactions.EventTransactionQueued {
|
||||||
|
event := envelope.Event.(map[string]interface{})
|
||||||
|
txID := common.QueuedTxID(event["id"].(string))
|
||||||
|
log.Info("transaction queued (will be completed in a single call, once aggregated)", "id", txID)
|
||||||
|
|
||||||
|
txIDs <- txID
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// this call blocks, and should return when DiscardQueuedTransaction() for a given tx id is called
|
||||||
|
sendTx := func() {
|
||||||
|
txHashCheck, err := s.Backend.SendTransaction(context.TODO(), common.SendTxArgs{
|
||||||
|
From: common.FromAddress(TestConfig.Account1.Address),
|
||||||
|
To: common.ToAddress(TestConfig.Account2.Address),
|
||||||
|
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
|
||||||
|
})
|
||||||
|
require.NoError(err, "cannot send transaction")
|
||||||
|
require.NotEqual(gethcommon.Hash{}, txHashCheck, "transaction returned empty hash")
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for transactions, and complete them in a single call
|
||||||
|
completeTxs := func(txIDs []common.QueuedTxID) {
|
||||||
|
txIDs = append(txIDs, "invalid-tx-id")
|
||||||
|
results := s.Backend.CompleteTransactions(txIDs, TestConfig.Account1.Password)
|
||||||
|
s.Len(results, testTxCount+1)
|
||||||
|
s.EqualError(results["invalid-tx-id"].Error, "transaction hash not found")
|
||||||
|
|
||||||
|
for txID, txResult := range results {
|
||||||
|
s.False(
|
||||||
|
txResult.Error != nil && txID != "invalid-tx-id",
|
||||||
|
"invalid error for %s", txID,
|
||||||
|
)
|
||||||
|
s.False(
|
||||||
|
txResult.Hash == (gethcommon.Hash{}) && txID != "invalid-tx-id",
|
||||||
|
"invalid hash (expected non empty hash): %s", txID,
|
||||||
|
)
|
||||||
|
log.Info("transaction complete", "URL", "https://ropsten.etherscan.io/tx/"+txResult.Hash.Hex())
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second) // make sure that tx complete signal propagates
|
||||||
|
|
||||||
|
for _, txID := range txIDs {
|
||||||
|
s.False(
|
||||||
|
s.Backend.TxQueueManager().TransactionQueue().Has(txID),
|
||||||
|
"txqueue should not have test tx at this point (it should be completed)",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
ids := make([]common.QueuedTxID, testTxCount)
|
||||||
|
for i := 0; i < testTxCount; i++ {
|
||||||
|
ids[i] = <-txIDs
|
||||||
|
}
|
||||||
|
|
||||||
|
completeTxs(ids)
|
||||||
|
close(allTestTxCompleted)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// send multiple transactions
|
||||||
|
for i := 0; i < testTxCount; i++ {
|
||||||
|
go sendTx()
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-allTestTxCompleted:
|
||||||
|
case <-time.After(50 * time.Second):
|
||||||
|
s.FailNow("test timed out")
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Zero(s.TxQueueManager().TransactionQueue().Count(), "queue should be empty")
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue