status-go/transactions/pendingtxtracker_test.go

582 lines
17 KiB
Go
Raw Normal View History

package transactions
import (
"context"
"database/sql"
"encoding/json"
"math/big"
"sync"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
eth "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/walletdatabase"
)
// setupTestTransactionDB wires a PendingTxTracker to a database obtained from
// helpers.SetupTestMemorySQLDB, a fresh mock chain client and a fresh event feed.
//
// When checkInterval is nil the default PendingCheckInterval is used; otherwise
// the pointed-to duration is passed to the tracker. The returned function closes
// the database and fails the test if closing reports an error.
func setupTestTransactionDB(t *testing.T, checkInterval *time.Duration) (*PendingTxTracker, func(), *MockChainClient, *event.Feed) {
	db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{})
	require.NoError(t, err)

	interval := PendingCheckInterval
	if checkInterval != nil {
		interval = *checkInterval
	}

	client := NewMockChainClient()
	feed := new(event.Feed)
	tracker := NewPendingTxTracker(db, client, nil, feed, interval)

	closeDB := func() {
		require.NoError(t, db.Close())
	}
	return tracker, closeDB, client, feed
}
// waitForTaskToStop blocks until the tracker's task runner reports that it is
// no longer running, sleeping one microsecond between checks. It has no
// timeout, so it returns only once IsRunning yields false.
func waitForTaskToStop(pt *PendingTxTracker) {
	for {
		if !pt.taskRunner.IsRunning() {
			return
		}
		time.Sleep(time.Microsecond)
	}
}
// TestPendingTxTracker_ValidateConfirmedWithSuccessStatus stores and tracks a
// single mocked transaction and expects exactly three events on the feed: two
// pending-transaction updates followed by a status change reporting Success.
// Once the tracker is stopped no transaction may remain pending.
func TestPendingTxTracker_ValidateConfirmedWithSuccessStatus(t *testing.T) {
	tracker, closeDB, chainClient, feed := setupTestTransactionDB(t, nil)
	defer closeDB()

	txs := MockTestTransactions(t, chainClient, []TestTxSummary{{}})

	const expectedEvents = 3
	events := make(chan walletevent.Event, expectedEvents)
	sub := feed.Subscribe(events)

	require.NoError(t, tracker.StoreAndTrackPendingTx(&txs[0]))

	for n := 0; n < expectedEvents; n++ {
		select {
		case ev := <-events:
			switch n {
			case 0, 1:
				// The add and the delete are both reported as update events.
				require.Equal(t, EventPendingTransactionUpdate, ev.Type)
			default:
				require.Equal(t, EventPendingTransactionStatusChanged, ev.Type)
				var payload StatusChangedPayload
				require.NoError(t, json.Unmarshal([]byte(ev.Message), &payload))
				require.Equal(t, txs[0].Hash, payload.Hash)
				require.Equal(t, Success, payload.Status)
			}
		case <-time.After(time.Second):
			t.Fatal("timeout waiting for event")
		}
	}

	// Stop the tracker and let the running task finish before inspecting the DB.
	require.NoError(t, tracker.Stop())
	waitForTaskToStop(tracker)

	pending, err := tracker.GetAllPending()
	require.NoError(t, err)
	require.Equal(t, 0, len(pending))

	sub.Unsubscribe()
}
// TestPendingTxTracker_ValidateConfirmedWithFailedStatus stores and tracks a
// single mocked transaction flagged with failStatus and expects exactly three
// events on the feed: two pending-transaction updates followed by a status
// change reporting Failed. Once the tracker is stopped no transaction may
// remain pending.
func TestPendingTxTracker_ValidateConfirmedWithFailedStatus(t *testing.T) {
	tracker, closeDB, chainClient, feed := setupTestTransactionDB(t, nil)
	defer closeDB()

	txs := MockTestTransactions(t, chainClient, []TestTxSummary{{failStatus: true}})

	const expectedEvents = 3
	events := make(chan walletevent.Event, expectedEvents)
	sub := feed.Subscribe(events)

	require.NoError(t, tracker.StoreAndTrackPendingTx(&txs[0]))

	for n := 0; n < expectedEvents; n++ {
		select {
		case ev := <-events:
			switch n {
			case 0, 1:
				// The add and the delete are both reported as update events.
				require.Equal(t, EventPendingTransactionUpdate, ev.Type)
			default:
				require.Equal(t, EventPendingTransactionStatusChanged, ev.Type)
				var payload StatusChangedPayload
				require.NoError(t, json.Unmarshal([]byte(ev.Message), &payload))
				require.Equal(t, txs[0].Hash, payload.Hash)
				require.Equal(t, Failed, payload.Status)
			}
		case <-time.After(time.Second):
			t.Fatal("timeout waiting for event")
		}
	}

	// Stop the tracker and let the running task finish before inspecting the DB.
	require.NoError(t, tracker.Stop())
	waitForTaskToStop(tracker)

	pending, err := tracker.GetAllPending()
	require.NoError(t, err)
	require.Equal(t, 0, len(pending))

	sub.Unsubscribe()
}
// TestPendingTxTracker_InterruptWatching adds two pending transactions, lets
// the first receipt batch confirm only the second one, stops the tracker and
// verifies that one transaction is still pending. It then restarts the tracker
// with a second mocked batch that confirms the leftover transaction and checks
// that nothing is pending afterwards.
func TestPendingTxTracker_InterruptWatching(t *testing.T) {
	tracker, closeDB, chainClient, feed := setupTestTransactionDB(t, nil)
	defer closeDB()

	txs := GenerateTestPendingTransactions(2)

	// confirmedReceipt builds the receipt the mocked batch call writes back for
	// a transaction that was mined successfully.
	confirmedReceipt := func() *types.Receipt {
		return &types.Receipt{
			BlockNumber: new(big.Int).SetUint64(1),
			Status:      1,
		}
	}

	chainClient.SetAvailableClients([]common.ChainID{txs[0].ChainID})
	cl := chainClient.Clients[txs[0].ChainID]

	// The first batch must ask for the receipts of both transactions, in order.
	matchBoth := func(batch []rpc.BatchElem) bool {
		if len(batch) != 2 {
			return false
		}
		for i := range batch {
			if batch[i].Method != GetTransactionReceiptRPCName || batch[i].Args[0] != txs[i].Hash {
				return false
			}
		}
		return true
	}
	cl.On("BatchCallContext", mock.Anything, mock.MatchedBy(matchBoth)).Return(nil).Once().Run(func(args mock.Arguments) {
		batch := args.Get(1).([]rpc.BatchElem)
		// A nil receipt mimics the "null" answer of eth_getTransactionReceipt:
		// the first transaction is still pending.
		batch[0].Result.(*nullableReceipt).Receipt = nil
		// The second transaction gets a parsed, successful receipt.
		batch[1].Result.(*nullableReceipt).Receipt = confirmedReceipt()
	})

	events := make(chan walletevent.Event, 2)
	sub := feed.Subscribe(events)

	for i := range txs {
		require.NoError(t, tracker.addPending(&txs[i]))
	}

	// Each add is announced with an update event.
	for i := 0; i < 2; i++ {
		select {
		case ev := <-events:
			require.Equal(t, EventPendingTransactionUpdate, ev.Type)
		case <-time.After(time.Second):
			t.Fatal("timeout waiting for event")
		}
	}

	// expectUpdateThenSuccess consumes two events: an update followed by a
	// status change reporting Success for txs[idx].
	expectUpdateThenSuccess := func(idx int) {
		for i := 0; i < 2; i++ {
			select {
			case ev := <-events:
				switch i {
				case 0:
					require.Equal(t, EventPendingTransactionUpdate, ev.Type)
				default:
					require.Equal(t, EventPendingTransactionStatusChanged, ev.Type)
					var payload StatusChangedPayload
					require.NoError(t, json.Unmarshal([]byte(ev.Message), &payload))
					require.Equal(t, txs[idx].Hash, payload.Hash)
					require.Equal(t, txs[idx].ChainID, payload.ChainID)
					require.Equal(t, Success, payload.Status)
				}
			case <-time.After(time.Second):
				t.Fatal("timeout waiting for event")
			}
		}
	}

	require.NoError(t, tracker.Start())
	expectUpdateThenSuccess(1)

	// Stop before the next timed check can run.
	require.NoError(t, tracker.Stop())
	waitForTaskToStop(tracker)

	pending, err := tracker.GetAllPending()
	require.NoError(t, err)
	require.Equal(t, 1, len(pending), "should have only one pending tx")

	// Restart the tracker to process the leftover transaction; this time the
	// batch holds a single request and it is answered with a success receipt.
	matchLeftover := func(batch []rpc.BatchElem) bool {
		if len(batch) != 1 {
			return false
		}
		return batch[0].Method == GetTransactionReceiptRPCName && batch[0].Args[0] == txs[0].Hash
	}
	cl.On("BatchCallContext", mock.Anything, mock.MatchedBy(matchLeftover)).Return(nil).Once().Run(func(args mock.Arguments) {
		batch := args.Get(1).([]rpc.BatchElem)
		batch[0].Result.(*nullableReceipt).Receipt = confirmedReceipt()
	})

	require.NoError(t, tracker.Start())
	expectUpdateThenSuccess(0)

	require.NoError(t, tracker.Stop())
	waitForTaskToStop(tracker)

	pending, err = tracker.GetAllPending()
	require.NoError(t, err)
	require.Equal(t, 0, len(pending))

	sub.Unsubscribe()
}
// TestPendingTxTracker_MultipleClients tracks two transactions that live on
// different chain IDs, each served by its own mocked client. Every client
// answers a single-element receipt batch with a successful receipt. The test
// expects six events in total (four updates and two status changes reporting
// Success) and no pending transaction once the tracker is stopped.
func TestPendingTxTracker_MultipleClients(t *testing.T) {
	tracker, closeDB, chainClient, feed := setupTestTransactionDB(t, nil)
	defer closeDB()

	txs := GenerateTestPendingTransactions(2)
	txs[1].ChainID++

	// Make both clients available and register one receipt answer on each.
	chainClient.SetAvailableClients([]common.ChainID{txs[0].ChainID, txs[1].ChainID})
	for i := range txs {
		idx := i // per-iteration copy captured by the matcher below
		cl := chainClient.Clients[txs[idx].ChainID]
		matchOwnTx := func(batch []rpc.BatchElem) bool {
			if len(batch) != 1 {
				return false
			}
			return batch[0].Method == GetTransactionReceiptRPCName && batch[0].Args[0] == txs[idx].Hash
		}
		cl.On("BatchCallContext", mock.Anything, mock.MatchedBy(matchOwnTx)).Return(nil).Once().Run(func(args mock.Arguments) {
			batch := args.Get(1).([]rpc.BatchElem)
			// Simulate a parsed eth_getTransactionReceipt response.
			batch[0].Result.(*nullableReceipt).Receipt = &types.Receipt{
				BlockNumber: new(big.Int).SetUint64(1),
				Status:      1,
			}
		})
	}

	events := make(chan walletevent.Event, 6)
	sub := feed.Subscribe(events)

	for i := range txs {
		require.NoError(t, tracker.TrackPendingTransaction(txs[i].ChainID, txs[i].Hash, txs[i].From, txs[i].Type, AutoDelete))
	}

	require.NoError(t, tracker.Start())

	storeEventCount := 0
	statusEventCount := 0
	// countEvent tallies the event by type and validates status-change payloads.
	countEvent := func(ev *walletevent.Event) {
		switch ev.Type {
		case EventPendingTransactionUpdate:
			storeEventCount++
		case EventPendingTransactionStatusChanged:
			statusEventCount++
			require.Equal(t, EventPendingTransactionStatusChanged, ev.Type)
			var payload StatusChangedPayload
			require.NoError(t, json.Unmarshal([]byte(ev.Message), &payload))
			require.Equal(t, Success, payload.Status)
		}
	}

	// Two transactions, three events each.
	for i := 0; i < 2; i++ {
		for j := 0; j < 3; j++ {
			select {
			case ev := <-events:
				countEvent(&ev)
			case <-time.After(time.Second):
				t.Fatal("timeout waiting for event", i, j, storeEventCount, statusEventCount)
			}
		}
	}

	require.Equal(t, 4, storeEventCount)
	require.Equal(t, 2, statusEventCount)

	require.NoError(t, tracker.Stop())
	waitForTaskToStop(tracker)

	pending, err := tracker.GetAllPending()
	require.NoError(t, err)
	require.Equal(t, 0, len(pending))

	sub.Unsubscribe()
}
// TestPendingTxTracker_Watch tracks txs[1] with the Keep option and stores
// txs[0], whose status was preset to Success. It expects three events (two
// updates and one status change reporting Success for txs[1]), then checks
// that nothing is pending, that Watch reports a non-pending status for txs[1],
// and that deleting txs[1] emits one more update event.
func TestPendingTxTracker_Watch(t *testing.T) {
	m, stop, chainClient, eventFeed := setupTestTransactionDB(t, nil)
	defer stop()

	txs := GenerateTestPendingTransactions(2)
	// Mark the first transaction as already confirmed
	*txs[0].Status = Success

	// Mock the receipt batch request: only the second transaction is expected
	// to be queried, and it is answered with a successful receipt
	chainClient.SetAvailableClients([]common.ChainID{txs[0].ChainID})
	cl := chainClient.Clients[txs[0].ChainID]
	cl.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool {
		return len(b) == 1 && b[0].Method == GetTransactionReceiptRPCName && b[0].Args[0] == txs[1].Hash
	})).Return(nil).Once().Run(func(args mock.Arguments) {
		elems := args.Get(1).([]rpc.BatchElem)
		// Simulate parsing of eth_getTransactionReceipt response
		elems[0].Result.(*nullableReceipt).Receipt = &types.Receipt{
			BlockNumber: new(big.Int).SetUint64(1),
			Status:      1,
		}
	})

	eventChan := make(chan walletevent.Event, 3)
	sub := eventFeed.Subscribe(eventChan)

	// Track the second transaction and keep its entry after confirmation
	err := m.TrackPendingTransaction(txs[1].ChainID, txs[1].Hash, txs[1].From, txs[1].Type, Keep)
	require.NoError(t, err)

	// Store the already confirmed one
	err = m.StoreAndTrackPendingTx(&txs[0])
	require.NoError(t, err)

	storeEventCount := 0
	statusEventCount := 0
	for j := 0; j < 3; j++ {
		select {
		case we := <-eventChan:
			if EventPendingTransactionUpdate == we.Type {
				storeEventCount++
			} else if EventPendingTransactionStatusChanged == we.Type {
				statusEventCount++
				var p StatusChangedPayload
				err := json.Unmarshal([]byte(we.Message), &p)
				require.NoError(t, err)
				require.Equal(t, txs[1].ChainID, p.ChainID)
				require.Equal(t, txs[1].Hash, p.Hash)
				require.Equal(t, Success, p.Status)
			}
		case <-time.After(1 * time.Second):
			t.Fatal("timeout waiting for the status update event")
		}
	}
	require.Equal(t, 2, storeEventCount)
	require.Equal(t, 1, statusEventCount)

	// Stop the next timed call
	err = m.Stop()
	require.NoError(t, err)

	waitForTaskToStop(m)

	res, err := m.GetAllPending()
	require.NoError(t, err)
	require.Equal(t, 0, len(res), "should have no pending tx")

	status, err := m.Watch(context.Background(), txs[1].ChainID, txs[1].Hash)
	require.NoError(t, err)
	// Watch returns a pointer; comparing the pointer itself against Pending
	// could never fail, so dereference it to make the assertion meaningful.
	require.NotNil(t, status)
	require.NotEqual(t, Pending, *status)

	err = m.Delete(context.Background(), txs[1].ChainID, txs[1].Hash)
	require.NoError(t, err)

	select {
	case we := <-eventChan:
		require.Equal(t, EventPendingTransactionUpdate, we.Type)
	case <-time.After(1 * time.Second):
		t.Fatal("timeout waiting for the delete event")
	}

	sub.Unsubscribe()
}
// TestPendingTxTracker_Watch_StatusChangeIncrementally tracks two transactions
// with the Keep option and a one-nanosecond check interval. The mocked batch
// call confirms only the first element of each batch, so the transactions are
// confirmed one after the other. After each status change the test verifies
// the state through Watch and deletes the confirmed transaction.
func TestPendingTxTracker_Watch_StatusChangeIncrementally(t *testing.T) {
	m, stop, chainClient, eventFeed := setupTestTransactionDB(t, common.NewAndSet(1*time.Nanosecond))
	defer stop()

	txs := GenerateTestPendingTransactions(2)

	// firstDoneWG holds back the answer to the second batch call until the
	// first status change has been validated by the test goroutine
	var firstDoneWG sync.WaitGroup
	firstDoneWG.Add(1)

	// Mock the receipt batch requests
	chainClient.SetAvailableClients([]common.ChainID{txs[0].ChainID})
	cl := chainClient.Clients[txs[0].ChainID]
	cl.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool {
		if len(cl.Calls) == 0 {
			res := len(b) > 0 && b[0].Method == GetTransactionReceiptRPCName && b[0].Args[0] == txs[0].Hash
			// If the first processing call picked up the second validate this case also
			if len(b) == 2 {
				res = res && b[1].Method == GetTransactionReceiptRPCName && b[1].Args[0] == txs[1].Hash
			}
			return res
		}
		// Second call we expect only one left
		return len(b) == 1 && (b[0].Method == GetTransactionReceiptRPCName && b[0].Args[0] == txs[1].Hash)
	})).Return(nil).Twice().Run(func(args mock.Arguments) {
		elems := args.Get(1).([]rpc.BatchElem)
		if len(cl.Calls) == 2 {
			firstDoneWG.Wait()
		}
		// Only first item is processed, second is left pending
		// Simulate parsing of eth_getTransactionReceipt response
		elems[0].Result.(*nullableReceipt).Receipt = &types.Receipt{
			BlockNumber: new(big.Int).SetUint64(1),
			Status:      1,
		}
	})

	eventChan := make(chan walletevent.Event, 6)
	sub := eventFeed.Subscribe(eventChan)

	for i := range txs {
		// Track both transactions and keep their entries after confirmation
		err := m.TrackPendingTransaction(txs[i].ChainID, txs[i].Hash, txs[i].From, txs[i].Type, Keep)
		require.NoError(t, err)
	}

	storeEventCount := 0
	statusEventCount := 0

	validateStatusChange := func(we *walletevent.Event) {
		var p StatusChangedPayload
		err := json.Unmarshal([]byte(we.Message), &p)
		require.NoError(t, err)

		if statusEventCount == 0 {
			// First status change: txs[0] is confirmed, txs[1] is still pending
			require.Equal(t, txs[0].ChainID, p.ChainID)
			require.Equal(t, txs[0].Hash, p.Hash)
			require.Equal(t, Success, p.Status)

			status, err := m.Watch(context.Background(), txs[0].ChainID, txs[0].Hash)
			require.NoError(t, err)
			require.Equal(t, Success, *status)
			err = m.Delete(context.Background(), txs[0].ChainID, txs[0].Hash)
			require.NoError(t, err)

			status, err = m.Watch(context.Background(), txs[1].ChainID, txs[1].Hash)
			require.NoError(t, err)
			require.Equal(t, Pending, *status)
			// Release the mocked answer to the second batch call
			firstDoneWG.Done()
		} else {
			// Second status change: txs[0] was deleted, txs[1] is confirmed
			_, err := m.Watch(context.Background(), txs[0].ChainID, txs[0].Hash)
			require.ErrorIs(t, err, sql.ErrNoRows)

			status, err := m.Watch(context.Background(), txs[1].ChainID, txs[1].Hash)
			require.NoError(t, err)
			require.Equal(t, Success, *status)
			err = m.Delete(context.Background(), txs[1].ChainID, txs[1].Hash)
			require.NoError(t, err)
		}

		statusEventCount++
	}

	for j := 0; j < 6; j++ {
		select {
		case we := <-eventChan:
			if EventPendingTransactionUpdate == we.Type {
				storeEventCount++
			} else if EventPendingTransactionStatusChanged == we.Type {
				validateStatusChange(&we)
			}
		case <-time.After(1 * time.Second):
			t.Fatal("timeout waiting for the status update event")
		}
	}

	_, err := m.Watch(context.Background(), txs[1].ChainID, txs[1].Hash)
	require.ErrorIs(t, err, sql.ErrNoRows)

	// Per transaction: one update for the add and one for the delete
	require.Equal(t, 4, storeEventCount)
	require.Equal(t, 2, statusEventCount)

	err = m.Stop()
	require.NoError(t, err)

	waitForTaskToStop(m)

	res, err := m.GetAllPending()
	require.NoError(t, err)
	require.Equal(t, 0, len(res), "should have no pending tx")

	sub.Unsubscribe()
}
// TestPendingTransactions exercises the storage queries of the tracker with a
// single generated transaction: the lookups return nothing before the add,
// return the transaction after the add (only for its own address), and return
// nothing again after Delete, which is expected to report ErrStillPending.
func TestPendingTransactions(t *testing.T) {
	// chainID is the chain the address queries and the delete are scoped to.
	// NOTE(review): presumably the chain ID GenerateTestPendingTransactions
	// assigns to generated transactions; confirm against that helper.
	const chainID = 777

	manager, stop, _, _ := setupTestTransactionDB(t, nil)
	defer stop()

	tx := GenerateTestPendingTransactions(1)[0]

	// Nothing is stored yet
	rst, err := manager.GetAllPending()
	require.NoError(t, err)
	require.Nil(t, rst)

	rst, err = manager.GetPendingByAddress([]uint64{chainID}, tx.From)
	require.NoError(t, err)
	require.Nil(t, rst)

	err = manager.addPending(&tx)
	require.NoError(t, err)

	// The stored transaction is returned both by address and by the global query
	rst, err = manager.GetPendingByAddress([]uint64{chainID}, tx.From)
	require.NoError(t, err)
	require.Equal(t, 1, len(rst))
	require.Equal(t, tx, *rst[0])

	rst, err = manager.GetAllPending()
	require.NoError(t, err)
	require.Equal(t, 1, len(rst))
	require.Equal(t, tx, *rst[0])

	// A different address yields no result
	rst, err = manager.GetPendingByAddress([]uint64{chainID}, eth.Address{2})
	require.NoError(t, err)
	require.Nil(t, rst)

	// require.Error would take ErrStillPending as a failure message and only
	// check for a non-nil error; ErrorIs verifies the sentinel itself.
	err = manager.Delete(context.Background(), common.ChainID(chainID), tx.Hash)
	require.ErrorIs(t, err, ErrStillPending)

	// The entry is gone even though Delete reported an error
	rst, err = manager.GetPendingByAddress([]uint64{chainID}, tx.From)
	require.NoError(t, err)
	require.Equal(t, 0, len(rst))

	rst, err = manager.GetAllPending()
	require.NoError(t, err)
	require.Equal(t, 0, len(rst))
}