test(wallet) sql integration tests for activity incremental update

Refactor tests to follow changes in session based activity API

Updates #12120
This commit is contained in:
Stefan 2024-02-08 09:47:09 -03:00 committed by Stefan Dunca
parent d970e7c3f2
commit 1e75319664
5 changed files with 277 additions and 87 deletions

View File

@ -11,6 +11,7 @@ import (
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
statusgo "github.com/status-im/status-go/mobile"
"github.com/status-im/status-go/multiaccounts"
@ -107,18 +108,31 @@ func WaitForEvent(eventQueue chan GoEvent, eventName StatusGoEventName, timeout
}
}
// WaitForWalletEvents returns payloads corresponding to the given eventNames in the order they are received for duplicate events
func WaitForWalletEvents[T any](eventQueue chan GoEvent, eventNamesOrig []walletevent.EventType, timeout time.Duration) (payloads []*T, err error) {
var event *GoEvent
func WaitForWalletEvents(eventQueue chan GoEvent, eventNames []walletevent.EventType, timeout time.Duration, condition func(walletEvent *walletevent.Event) bool) (walletEvents []*walletevent.Event, err error) {
return WaitForWalletEventsWithOptionals(eventQueue, eventNames, timeout, condition, nil)
}
payloads = make([]*T, len(eventNamesOrig))
processed := make([]bool, len(eventNamesOrig))
processedCount := 0
// WaitForWalletEvents waits for the given events to be received on the eventQueue.
// It returns the wallet events in the order they are received.
func WaitForWalletEventsWithOptionals(eventQueue chan GoEvent, eventNames []walletevent.EventType, timeout time.Duration, condition func(walletEvent *walletevent.Event) bool, optionalEventNames []walletevent.EventType) (walletEvents []*walletevent.Event, err error) {
if len(eventNames) == 0 {
return nil, errors.New("no event names provided")
}
startTime := time.Now()
expected := make([]walletevent.EventType, len(eventNames))
copy(expected, eventNames)
walletEvents = make([]*walletevent.Event, 0, len(eventNames))
infiniteLoop:
for {
event, err = WaitForEvent(eventQueue, WalletEvent, timeout)
toWait := timeout - time.Since(startTime)
if toWait <= 0 {
return nil, fmt.Errorf("timeout waiting for events %+v", expected)
}
event, err := WaitForEvent(eventQueue, WalletEvent, toWait)
if err != nil {
return nil, err
return nil, fmt.Errorf("error waiting for events %+v: %w", expected, err)
}
walletEvent, ok := event.Payload.(walletevent.Event)
@ -126,40 +140,115 @@ func WaitForWalletEvents[T any](eventQueue chan GoEvent, eventNamesOrig []wallet
return nil, errors.New("event payload is not a wallet event")
}
var newPayload T
foundIndex := -1
for i, eventName := range eventNamesOrig {
if walletEvent.Type == eventName && !processed[i] {
foundIndex = i
processed[i] = true
processedCount += 1
break
for i, event := range expected {
if walletEvent.Type == event && (condition == nil || condition(&walletEvent)) {
walletEvents = append(walletEvents, &walletEvent)
if len(expected) == 1 {
return walletEvents, nil
}
// Remove found event from the list of expected events
expected = append(expected[:i], expected[i+1:]...)
continue infiniteLoop
}
}
if foundIndex != -1 {
if walletEvent.Message != "" {
err = json.Unmarshal([]byte(walletEvent.Message), &newPayload)
if err != nil {
return nil, err
}
payloads[foundIndex] = &newPayload
} else {
payloads[foundIndex] = nil
}
if processedCount == len(eventNamesOrig) {
return payloads, nil
for _, event := range optionalEventNames {
if walletEvent.Type == event && condition != nil {
_ = condition(&walletEvent)
}
}
}
}
func WaitForWalletEvent[T any](eventQueue chan GoEvent, eventName walletevent.EventType, timeout time.Duration) (payload *T, err error) {
res, err := WaitForWalletEvents[T](eventQueue, []walletevent.EventType{eventName}, timeout)
type payloadRes struct {
eventName walletevent.EventType
data []byte
}
// WaitForWalletEventsGetPayloads returns payloads corresponding to the given eventNames in the order they are received for duplicate events
func WaitForWalletEventsGetPayloads(eventQueue chan GoEvent, eventNames []walletevent.EventType, timeoutEach time.Duration) (payloads []payloadRes, err error) {
walletEvents, err := WaitForWalletEvents(eventQueue, eventNames, timeoutEach, nil)
if err != nil {
return nil, err
}
return res[0], nil
payloads = make([]payloadRes, len(walletEvents))
for i, event := range walletEvents {
payloads[i] = payloadRes{
eventName: event.Type,
}
if event.Message != "" {
payloads[i].data = []byte(event.Message)
}
}
return payloads, nil
}
type payloadMapRes struct {
EventName walletevent.EventType
JsonData map[string]interface{}
}
// WaitForWalletEventsGetMap returns parsed JSON payloads; @see WaitForWalletEventsGetPayloads
func WaitForWalletEventsGetMap(eventQueue chan GoEvent, eventNames []walletevent.EventType, timeout time.Duration) (payloads []payloadMapRes, err error) {
bytePayloads, err := WaitForWalletEventsGetPayloads(eventQueue, eventNames, timeout)
if err != nil {
return nil, err
}
payloads = make([]payloadMapRes, len(bytePayloads))
for i, payload := range bytePayloads {
var mapPayload map[string]interface{}
if payload.data != nil {
mapPayload = make(map[string]interface{})
err = json.Unmarshal(payload.data, &mapPayload)
if err != nil {
return nil, err
}
}
payloads[i] = payloadMapRes{
EventName: payload.eventName,
JsonData: mapPayload,
}
}
return payloads, nil
}
func WaitForWalletEventGetPayload[T any](eventQueue chan GoEvent, eventName walletevent.EventType, timeout time.Duration) (payload *T, err error) {
res, err := WaitForWalletEventsGetPayloads(eventQueue, []walletevent.EventType{eventName}, timeout)
if err != nil {
return nil, err
}
if res[0].data == nil {
return nil, nil
}
newPayload := new(T)
err = json.Unmarshal(res[0].data, newPayload)
if err != nil {
return nil, err
}
return newPayload, nil
}
// WaitForTxDownloaderToFinishForAccountsCondition returns a state-full condition function that records every account that has been seen with the events until the entire list is seen
func WaitForTxDownloaderToFinishForAccountsCondition(t *testing.T, accounts []common.Address) func(walletEvent *walletevent.Event) bool {
accs := make([]common.Address, len(accounts))
copy(accs, accounts)
return func(walletEvent *walletevent.Event) bool {
eventAccountsLoop:
for _, acc := range walletEvent.Accounts {
for i, a := range accs {
if acc == a {
if len(accs) == 1 {
return true
}
accs = append(accs[:i], accs[i+1:]...)
continue eventAccountsLoop
}
}
}
return false
}
}
func loginToAccount(hashedPassword, userFolder, nodeConfigJson string) error {
@ -251,32 +340,49 @@ func CallPrivateMethodWithTimeout(method string, params []interface{}, timeout t
}
didTimeout := false
done := make(chan bool)
var responseJson string
done := make(chan string)
go func() {
responseJson = statusgo.CallPrivateRPC(string(msgJson))
responseJson := statusgo.CallPrivateRPC(string(msgJson))
if didTimeout {
log.Warn("Call to CallPrivateRPC returned after timeout", "payload", string(msgJson))
return
}
done <- true
done <- responseJson
}()
select {
case <-done:
case res := <-done:
return res, nil
case <-time.After(timeout):
didTimeout = true
return "", fmt.Errorf("timeout waiting for response to statusgo.CallPrivateRPC; payload \"%s\"", string(msgJson))
}
return responseJson, nil
}
func CallPrivateMethod(method string, params []interface{}) (string, error) {
return CallPrivateMethodWithTimeout(method, params, 60*time.Second)
}
func CallPrivateMethodAndGetT[T any](method string, params []interface{}) (*T, error) {
resJson, err := CallPrivateMethodWithTimeout(method, params, 60*time.Second)
if err != nil {
return nil, err
}
var res T
rawJson, err := GetRPCAPIResponseRaw(resJson)
if err != nil {
return nil, err
}
if err := json.Unmarshal(rawJson, &res); err != nil {
return nil, fmt.Errorf("failed to unmarshal data: %w", err)
}
return &res, nil
}
type Config struct {
HashedPassword string `json:"hashedPassword"`
NodeConfigFile string `json:"nodeConfigFile"`
@ -359,29 +465,37 @@ type jsonError struct {
}
func GetRPCAPIResponse[T any](responseJson string, res T) error {
errApiResponse := jsonrpcErrorResponse{}
err := json.Unmarshal([]byte(responseJson), &errApiResponse)
if err == nil && errApiResponse.Error.Code != 0 {
return fmt.Errorf("API error: %#v", errApiResponse.Error)
}
apiResponse := jsonrpcSuccessfulResponse{}
err = json.Unmarshal([]byte(responseJson), &apiResponse)
if err != nil {
return fmt.Errorf("failed to unmarshal jsonrpcSuccessfulResponse: %w", err)
}
typeOfT := reflect.TypeOf(res)
kindOfT := typeOfT.Kind()
// Check for valid types: pointer, slice, map
if kindOfT != reflect.Ptr && kindOfT != reflect.Slice && kindOfT != reflect.Map {
return fmt.Errorf("type T must be a pointer, slice, or map")
}
if err := json.Unmarshal(apiResponse.Result, &res); err != nil {
rawJson, err := GetRPCAPIResponseRaw(responseJson)
if err != nil {
return err
}
if err := json.Unmarshal(rawJson, &res); err != nil {
return fmt.Errorf("failed to unmarshal data: %w", err)
}
return nil
}
func GetRPCAPIResponseRaw(responseJson string) (json.RawMessage, error) {
errApiResponse := jsonrpcErrorResponse{}
err := json.Unmarshal([]byte(responseJson), &errApiResponse)
if err == nil && errApiResponse.Error.Code != 0 {
return nil, fmt.Errorf("API error: %#v", errApiResponse.Error)
}
apiResponse := jsonrpcSuccessfulResponse{}
err = json.Unmarshal([]byte(responseJson), &apiResponse)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal jsonrpcSuccessfulResponse: %w", err)
}
return apiResponse.Result, nil
}

View File

@ -4,9 +4,11 @@
package wallet
import (
"encoding/json"
"testing"
"time"
eth "github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
"github.com/status-im/status-desktop/test/status-go/integration/helpers"
@ -14,8 +16,8 @@ import (
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/services/wallet/activity"
"github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/transfer"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/transactions"
)
// TestActivityIncrementalUpdates_NoFilterNewPendingTransactions tests that a pending transaction is created, then updated and finally deleted.
@ -23,34 +25,83 @@ func TestActivityIncrementalUpdates_NoFilterNewPendingTransactions(t *testing.T)
td, close := setupAccountsAndTransactions(t)
defer close()
_, err := helpers.CallPrivateMethod("wallet_startActivityFilterSession", []interface{}{[]types.Address{td.sender.Address}, false, []common.ChainID{5}, activity.Filter{}, 3})
rawSessionID, err := helpers.CallPrivateMethodAndGetT[int32]("wallet_startActivityFilterSession", []interface{}{[]types.Address{td.sender.Address}, false, []common.ChainID{5}, activity.Filter{}, 3})
require.NoError(t, err)
require.NotNil(t, rawSessionID)
sessionID := activity.SessionID(*rawSessionID)
// Confirm async filtering results
filterRes, err := helpers.WaitForWalletEvents[activity.FilterResponse](
td.eventQueue, []walletevent.EventType{activity.EventActivityFilteringDone},
5*time.Second,
)
res, err := helpers.WaitForWalletEventGetPayload[activity.FilterResponse](td.eventQueue, activity.EventActivityFilteringDone, 5*time.Second)
require.NoError(t, err)
res := filterRes[0]
require.Equal(t, activity.ErrorCodeSuccess, res.ErrorCode)
require.Equal(t, 3, len(res.Activities))
// Trigger updating of activity results
sendTransaction(t, td)
// Wait for EventActivitySessionUpdated signal triggered by the EventPendingTransactionUpdate
update, err := helpers.WaitForWalletEvent[activity.SessionUpdate](td.eventQueue, activity.EventActivitySessionUpdated, 2*time.Second)
// Wait for EventActivitySessionUpdated signal triggered by the first EventPendingTransactionUpdate
update, err := helpers.WaitForWalletEventGetPayload[activity.SessionUpdate](td.eventQueue, activity.EventActivitySessionUpdated, 60*time.Second)
require.NoError(t, err)
require.Equal(t, 1, len(update.NewEntries))
require.NotNil(t, update.HasNewEntries)
require.True(t, *update.HasNewEntries)
// Step x: Trigger downloading of the new transaction ...
_, err = helpers.CallPrivateMethodWithTimeout("wallet_checkRecentHistoryForChainIDs", []interface{}{[]uint64{5}, []types.Address{td.sender.Address, td.recipient.Address}}, 2*time.Second)
// TODO #12120 check EventActivitySessionUpdated due to transactions.EventPendingTransactionStatusChanged
// statusPayload, err := helpers.WaitForWalletEventGetPayload[transactions.StatusChangedPayload](td.eventQueue, , 60*time.Second)
// require.NoError(t, err)
// require.Equal(t, transactions.Success, statusPayload.Status)
// Start history download to cleanup pending transactions
_, err = helpers.CallPrivateMethod("wallet_checkRecentHistoryForChainIDs", []interface{}{[]uint64{5}, []types.Address{td.sender.Address, td.recipient.Address}})
require.NoError(t, err)
// ... and wait for the new transaction download to trigger deletion from pending_transactions
updatePayload, err := helpers.WaitForWalletEvent[transactions.PendingTxUpdatePayload](
td.eventQueue, transactions.EventPendingTransactionUpdate, 120*time.Second)
downloadDoneFn := helpers.WaitForTxDownloaderToFinishForAccountsCondition(t, []eth.Address{eth.Address(td.sender.Address), eth.Address(td.recipient.Address)})
update = nil
// Wait for EventRecentHistoryReady.
// It is expected that downloading will generate a EventPendingTransactionUpdate that in turn will generate a second EventActivitySessionUpdated signal marked by the update non nil value
_, err = helpers.WaitForWalletEventsWithOptionals(
td.eventQueue,
[]walletevent.EventType{transfer.EventRecentHistoryReady},
120*time.Second,
func(e *walletevent.Event) bool {
if e.Type == activity.EventActivitySessionUpdated {
var parsedPayload activity.SessionUpdate
err := json.Unmarshal(([]byte)(e.Message), &parsedPayload)
require.NoError(t, err)
require.Equal(t, true, updatePayload.Deleted)
update = &parsedPayload
// TODO #12120 enable after implementing remove and update
// require.NotNil(t, update.HasNewEntries)
// require.True(t, *update.HasNewEntries)
// require.NotNil(t, update.Removed)
// require.True(t, *update.Removed)
return false
} else if e.Type == transfer.EventFetchingHistoryError {
require.Fail(t, "History download failed")
return false
} else if downloadDoneFn(e) {
return true
}
return false
},
[]walletevent.EventType{activity.EventActivitySessionUpdated, transfer.EventFetchingHistoryError},
)
require.NoError(t, err)
require.NotNil(t, update, "EventActivitySessionUpdated signal WASN'T triggered by the second EventPendingTransactionUpdate during history download")
require.NotNil(t, update.HasNewEntries)
require.True(t, *update.HasNewEntries)
// Start history download to cleanup pending transactions
_, err = helpers.CallPrivateMethodAndGetT[interface{}]("wallet_resetFilterSession", []interface{}{sessionID, 3})
require.NoError(t, err)
updatedRes, err := helpers.WaitForWalletEventsGetMap(td.eventQueue, []walletevent.EventType{activity.EventActivityFilteringDone}, 1*time.Second)
require.NoError(t, err)
require.Equal(t, activity.ErrorCodeSuccess, activity.ErrorCode(updatedRes[0].JsonData["errorCode"].(float64)))
activitiesList := updatedRes[0].JsonData["activities"].([]interface{})
require.Equal(t, 3, len(activitiesList))
firstActivity := activitiesList[0].(map[string]interface{})
isNew, found := firstActivity["isNew"]
require.True(t, found)
require.True(t, isNew.(bool))
}

View File

@ -7,11 +7,13 @@ import (
"testing"
"time"
eth "github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
"github.com/status-im/status-desktop/test/status-go/integration/helpers"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/services/wallet/transfer"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/transactions"
)
@ -23,21 +25,44 @@ func TestPendingTx_NotificationStatus(t *testing.T) {
sendTransaction(t, td)
// Start history download ...
_, err := helpers.CallPrivateMethod("wallet_checkRecentHistoryForChainIDs", []interface{}{[]uint64{5}, []types.Address{td.sender.Address, td.recipient.Address}})
require.NoError(t, err)
// ... and wait for the new transaction download to trigger deletion from pending_transactions
updatePayloads, err := helpers.WaitForWalletEvents[transactions.PendingTxUpdatePayload](
// Wait for transaction to be included in block
confirmationPayloads, err := helpers.WaitForWalletEventsGetMap(
td.eventQueue, []walletevent.EventType{
transactions.EventPendingTransactionUpdate,
transactions.EventPendingTransactionUpdate,
transactions.EventPendingTransactionStatusChanged,
},
60*time.Second,
)
require.NoError(t, err)
// Validate that we received both add and delete event
require.False(t, updatePayloads[0].Deleted)
require.True(t, updatePayloads[1].Deleted)
// Validate that we received update event
for _, payload := range confirmationPayloads {
if payload.EventName == transactions.EventPendingTransactionUpdate {
require.False(t, payload.JsonData["deleted"].(bool))
} else {
require.Equal(t, transactions.Success, payload.JsonData["status"].(transactions.TxStatus))
}
}
// Start history download ...
_, err = helpers.CallPrivateMethod("wallet_checkRecentHistoryForChainIDs", []interface{}{[]uint64{5}, []types.Address{td.sender.Address, td.recipient.Address}})
require.NoError(t, err)
downloadDoneFn := helpers.WaitForTxDownloaderToFinishForAccountsCondition(t, []eth.Address{eth.Address(td.sender.Address), eth.Address(td.recipient.Address)})
// ... and wait for the new transaction download to trigger deletion from pending_transactions
_, err = helpers.WaitForWalletEventsWithOptionals(
td.eventQueue,
[]walletevent.EventType{transfer.EventRecentHistoryReady},
60*time.Second,
func(e *walletevent.Event) bool {
if e.Type == transfer.EventFetchingHistoryError {
require.Fail(t, "History download failed")
return false
}
return downloadDoneFn(e)
},
[]walletevent.EventType{transfer.EventFetchingHistoryError},
)
require.NoError(t, err)
}

2
vendor/status-go vendored

@ -1 +1 @@
Subproject commit 1c42c077603dbd37cb10ebc5cccb755925fc812b
Subproject commit e9ff0fbefe11d57f03f0ef6cb7d30cb0b5ebe44e