From ca973b4aa6aaf39c9684fa5f3113fffc4bd9a3a0 Mon Sep 17 00:00:00 2001 From: Stefan Date: Thu, 25 Jan 2024 23:31:18 -0500 Subject: [PATCH] feat(wallet) use sql activity filter for incremental updates Switch from the prototype of duplicating the SQL filter as a runtime and keeping them in sync on each event that might invalidate the current filtered entries to a simpler approach of requesting the filter again and doing the diff to detect the new changes. Also add a new reset API to model the new entries design requirements. The new approach shows less corner-case to handle and follows one source of truth concept making debugging and future maintenance easier. Other changes - Fix pending mocking to work with multiple calls - Refactor tests to account for the new changes Updates status-desktop #12120 --- services/wallet/activity/activity.go | 18 ++ services/wallet/activity/service.go | 2 +- services/wallet/activity/service_test.go | 241 +++++++++++---- services/wallet/activity/session.go | 365 +++++++++++++++-------- services/wallet/activity/session_test.go | 107 +++++++ services/wallet/api.go | 6 + services/wallet/transfer/testutils.go | 9 + transactions/pendingtxtracker_test.go | 10 +- transactions/testhelpers.go | 88 +++--- 9 files changed, 616 insertions(+), 230 deletions(-) create mode 100644 services/wallet/activity/session_test.go diff --git a/services/wallet/activity/activity.go b/services/wallet/activity/activity.go index 7d1bd6fbc..af2cae615 100644 --- a/services/wallet/activity/activity.go +++ b/services/wallet/activity/activity.go @@ -68,6 +68,8 @@ type Entry struct { chainIDIn *common.ChainID transferType *TransferType contractAddress *eth.Address + + isNew bool } // Only used for JSON marshalling @@ -91,6 +93,8 @@ type EntryData struct { TransferType *TransferType `json:"transferType,omitempty"` ContractAddress *eth.Address `json:"contractAddress,omitempty"` + IsNew *bool `json:"isNew,omitempty"` + NftName *string `json:"nftName,omitempty"` NftURL *string `json:"nftUrl,omitempty"` } @@ -121,6 +125,9 @@ func (e *Entry) MarshalJSON() ([]byte, error) { } data.PayloadType = e.payloadType + if e.isNew { + data.IsNew = &e.isNew + } return json.Marshal(data) } @@ -155,6 +162,9 @@ func (e *Entry) UnmarshalJSON(data []byte) error { e.chainIDOut = aux.ChainIDOut e.chainIDIn = aux.ChainIDIn e.transferType = aux.TransferType + + e.isNew = aux.IsNew != nil && *aux.IsNew + return nil } @@ -231,6 +241,14 @@ func (e *Entry) anyIdentity() *thirdparty.CollectibleUniqueID { return nil } +func (e *Entry) getIdentity() EntryIdentity { + return EntryIdentity{ + payloadType: e.payloadType, + id: e.id, + transaction: e.transaction, + } +} + func multiTransactionTypeToActivityType(mtType transfer.MultiTransactionType) Type { if mtType == transfer.MultiTransactionSend { return SendAT diff --git a/services/wallet/activity/service.go b/services/wallet/activity/service.go index 3639148cd..238094b5e 100644 --- a/services/wallet/activity/service.go +++ b/services/wallet/activity/service.go @@ -376,7 +376,7 @@ func sendResponseEvent(eventFeed *event.Feed, requestID *int32, eventType wallet err = resErr } - requestIDStr := "nil" + requestIDStr := nilStr if requestID != nil { requestIDStr = strconv.Itoa(int(*requestID)) } diff --git a/services/wallet/activity/service_test.go b/services/wallet/activity/service_test.go index 74829b88f..27a532274 100644 --- a/services/wallet/activity/service_test.go +++ b/services/wallet/activity/service_test.go @@ -58,26 +58,37 @@ func (m *mockTokenManager) LookupToken(chainID *uint64, tokenSymbol string) (tkn return args.Get(0).(*token.Token), args.Bool(1) } -func setupTestService(tb testing.TB) (service *Service, eventFeed *event.Feed, tokenMock *mockTokenManager, collectiblesMock *mockCollectiblesManager, close func(), pendingTracker *transactions.PendingTxTracker, chainClient *transactions.MockChainClient) { +type testState struct { + service *Service + eventFeed *event.Feed + tokenMock *mockTokenManager + collectiblesMock *mockCollectiblesManager + close func() + pendingTracker *transactions.PendingTxTracker + chainClient *transactions.MockChainClient +} + +func setupTestService(tb testing.TB) (state testState) { db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{}) require.NoError(tb, err) - eventFeed = new(event.Feed) - tokenMock = &mockTokenManager{} - collectiblesMock = &mockCollectiblesManager{} + state.eventFeed = new(event.Feed) + state.tokenMock = &mockTokenManager{} + state.collectiblesMock = &mockCollectiblesManager{} - chainClient = transactions.NewMockChainClient() + state.chainClient = transactions.NewMockChainClient() // Ensure we process pending transactions as needed, only once pendingCheckInterval := time.Second - pendingTracker = transactions.NewPendingTxTracker(db, chainClient, nil, eventFeed, pendingCheckInterval) + state.pendingTracker = transactions.NewPendingTxTracker(db, state.chainClient, nil, state.eventFeed, pendingCheckInterval) - service = NewService(db, tokenMock, collectiblesMock, eventFeed, pendingTracker) - - return service, eventFeed, tokenMock, collectiblesMock, func() { - require.NoError(tb, pendingTracker.Stop()) + state.service = NewService(db, state.tokenMock, state.collectiblesMock, state.eventFeed, state.pendingTracker) + state.close = func() { + require.NoError(tb, state.pendingTracker.Stop()) require.NoError(tb, db.Close()) - }, pendingTracker, chainClient + } + + return state } type arg struct { @@ -110,8 +121,8 @@ func insertStubTransfersWithCollectibles(t *testing.T, db *sql.DB, args []arg) ( } func TestService_UpdateCollectibleInfo(t *testing.T) { - s, e, tM, c, close, _, _ := setupTestService(t) - defer close() + state := setupTestService(t) + defer state.close() args := []arg{ {5, "0xA2838FDA19EB6EED3F8B9EFF411D4CD7D2DE0313", "0x0D", nil, nil}, @@ -119,20 +130,20 @@ func TestService_UpdateCollectibleInfo(t *testing.T) { {5, "0x3d6afaa395c31fcd391fe3d562e75fe9e8ec7e6a", "", nil, nil}, {5, "0xA2838FDA19EB6EED3F8B9EFF411D4CD7D2DE0313", "0x0F", nil, nil}, } - fromAddresses, toAddresses := insertStubTransfersWithCollectibles(t, s.db, args) + fromAddresses, toAddresses := insertStubTransfersWithCollectibles(t, state.service.db, args) ch := make(chan walletevent.Event) - sub := e.Subscribe(ch) + sub := state.eventFeed.Subscribe(ch) // Expect one call for the fungible token - tM.On("LookupTokenIdentity", uint64(5), eth.HexToAddress("0x3d6afaa395c31fcd391fe3d562e75fe9e8ec7e6a"), false).Return( + state.tokenMock.On("LookupTokenIdentity", uint64(5), eth.HexToAddress("0x3d6afaa395c31fcd391fe3d562e75fe9e8ec7e6a"), false).Return( &token.Token{ ChainID: 5, Address: eth.HexToAddress("0x3d6afaa395c31fcd391fe3d562e75fe9e8ec7e6a"), Symbol: "STT", }, false, ).Once() - c.On("FetchAssetsByCollectibleUniqueID", []thirdparty.CollectibleUniqueID{ + state.collectiblesMock.On("FetchAssetsByCollectibleUniqueID", []thirdparty.CollectibleUniqueID{ { ContractID: thirdparty.ContractID{ ChainID: args[3].chainID, @@ -158,7 +169,7 @@ func TestService_UpdateCollectibleInfo(t *testing.T) { }, }, nil).Once() - s.FilterActivityAsync(0, append(fromAddresses, toAddresses...), true, allNetworksFilter(), Filter{}, 0, 3) + state.service.FilterActivityAsync(0, append(fromAddresses, toAddresses...), true, allNetworksFilter(), Filter{}, 0, 3) filterResponseCount := 0 var updates []EntryData @@ -194,8 +205,8 @@ func TestService_UpdateCollectibleInfo(t *testing.T) { } func TestService_UpdateCollectibleInfo_Error(t *testing.T) { - s, e, _, c, close, _, _ := setupTestService(t) - defer close() + state := setupTestService(t) + defer state.close() args := []arg{ {5, "0xA2838FDA19EB6EED3F8B9EFF411D4CD7D2DE0313", "0x762AD3E4934E687F8701F24C7274E5209213FD6208FF952ACEB325D028866949", nil, nil}, @@ -203,13 +214,13 @@ func TestService_UpdateCollectibleInfo_Error(t *testing.T) { } ch := make(chan walletevent.Event, 4) - sub := e.Subscribe(ch) + sub := state.eventFeed.Subscribe(ch) - fromAddresses, toAddresses := insertStubTransfersWithCollectibles(t, s.db, args) + fromAddresses, toAddresses := insertStubTransfersWithCollectibles(t, state.service.db, args) - c.On("FetchAssetsByCollectibleUniqueID", mock.Anything).Return(nil, thirdparty.ErrChainIDNotSupported).Once() + state.collectiblesMock.On("FetchAssetsByCollectibleUniqueID", mock.Anything).Return(nil, thirdparty.ErrChainIDNotSupported).Once() - s.FilterActivityAsync(0, append(fromAddresses, toAddresses...), true, allNetworksFilter(), Filter{}, 0, 5) + state.service.FilterActivityAsync(0, append(fromAddresses, toAddresses...), true, allNetworksFilter(), Filter{}, 0, 5) filterResponseCount := 0 updatesCount := 0 @@ -239,35 +250,65 @@ func TestService_UpdateCollectibleInfo_Error(t *testing.T) { sub.Unsubscribe() } -func TestService_IncrementalFilterUpdate(t *testing.T) { - s, e, tM, _, close, pTx, chainClient := setupTestService(t) - defer close() +func setupTransactions(t *testing.T, state testState, txCount int, testTxs []transactions.TestTxSummary) (allAddresses []eth.Address, pendings []transactions.PendingTransaction, ch chan walletevent.Event, cleanup func()) { + ch = make(chan walletevent.Event, 4) + sub := state.eventFeed.Subscribe(ch) - ch := make(chan walletevent.Event, 4) - sub := e.Subscribe(ch) - defer sub.Unsubscribe() + pendings = transactions.MockTestTransactions(t, state.chainClient, testTxs) - txs, fromTrs, toTrs := transfer.GenerateTestTransfers(t, s.db, 0, 3) - transfer.InsertTestTransfer(t, s.db, txs[0].To, &txs[0]) - transfer.InsertTestTransfer(t, s.db, txs[2].To, &txs[2]) + txs, fromTrs, toTrs := transfer.GenerateTestTransfers(t, state.service.db, len(pendings), txCount) + for i := range txs { + transfer.InsertTestTransfer(t, state.service.db, txs[i].To, &txs[i]) + } - allAddresses := append(fromTrs, toTrs...) - - tM.On("LookupTokenIdentity", mock.Anything, eth.HexToAddress("0x0"), true).Return( + allAddresses = append(append(fromTrs, toTrs...), pendings[0].From, pendings[0].To) + state.tokenMock.On("LookupTokenIdentity", mock.Anything, mock.Anything, mock.Anything).Return( &token.Token{ ChainID: 5, - Address: eth.HexToAddress("0x0"), + Address: eth.Address{}, Symbol: "ETH", - }, false, - ).Times(2) + }, true, + ).Times(0) - sessionID := s.StartFilterSession(allAddresses, true, allNetworksFilter(), Filter{}, 5) - require.Greater(t, sessionID, SessionID(0)) - defer s.StopFilterSession(sessionID) + state.tokenMock.On("LookupToken", mock.Anything, mock.Anything).Return( + &token.Token{ + ChainID: 5, + Address: eth.Address{}, + Symbol: "ETH", + }, true, + ).Times(0) - var filterResponseCount int + return allAddresses, pendings, ch, func() { + sub.Unsubscribe() + } +} - for i := 0; i < 1; i++ { +func validateSessionUpdateEvent(t *testing.T, ch chan walletevent.Event, filterResponseCount *int) (pendingTransactionUpdate, sessionUpdatesCount int) { + for sessionUpdatesCount < 1 { + select { + case res := <-ch: + switch res.Type { + case transactions.EventPendingTransactionUpdate: + pendingTransactionUpdate++ + case EventActivitySessionUpdated: + var payload SessionUpdate + err := json.Unmarshal([]byte(res.Message), &payload) + require.NoError(t, err) + require.NotNil(t, payload.HasNewEntries) + require.True(t, *payload.HasNewEntries) + sessionUpdatesCount++ + case EventActivityFilteringDone: + (*filterResponseCount)++ + } + case <-time.NewTimer(1 * time.Second).C: + require.Fail(t, "timeout while waiting for EventActivitySessionUpdated") + } + } + return +} + +func validateSessionUpdateEventWithPending(t *testing.T, ch chan walletevent.Event) (filterResponseCount int) { + for filterResponseCount < 1 { select { case res := <-ch: switch res.Type { @@ -283,30 +324,49 @@ func TestService_IncrementalFilterUpdate(t *testing.T) { require.Fail(t, "timeout while waiting for EventActivityFilteringDone") } } + return +} - pendings := transactions.MockTestTransactions(t, chainClient, []transactions.TestTxSummary{{}}) +func TestService_IncrementalUpdateOnTop(t *testing.T) { + state := setupTestService(t) + defer state.close() - err := pTx.StoreAndTrackPendingTx(&pendings[0]) + allAddresses, pendings, ch, cleanup := setupTransactions(t, state, 2, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: 3}}) + defer cleanup() + + sessionID := state.service.StartFilterSession(allAddresses, true, allNetworksFilter(), Filter{}, 5) + require.Greater(t, sessionID, SessionID(0)) + defer state.service.StopFilterSession(sessionID) + + filterResponseCount := validateSessionUpdateEventWithPending(t, ch) + + exp := pendings[0] + err := state.pendingTracker.StoreAndTrackPendingTx(&exp) require.NoError(t, err) - pendingTransactionUpdate, sessionUpdatesCount := 0, 0 - // Validate the session update event - for sessionUpdatesCount < 1 { + pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount) + + err = state.service.ResetFilterSession(sessionID, 5) + require.NoError(t, err) + + // Validate the reset data + eventActivityDoneCount := 0 + for eventActivityDoneCount < 1 { select { case res := <-ch: switch res.Type { - case transactions.EventPendingTransactionUpdate: - pendingTransactionUpdate++ - case EventActivitySessionUpdated: - var payload SessionUpdate + case EventActivityFilteringDone: + var payload FilterResponse err := json.Unmarshal([]byte(res.Message), &payload) require.NoError(t, err) - require.Equal(t, 1, len(payload.NewEntries)) - tx := payload.NewEntries[0] - exp := pendings[0] - // TODO #12120: this should be a multi-transaction - // require.Equal(t, exp.MultiTransactionID, tx.id) + require.Equal(t, ErrorCodeSuccess, payload.ErrorCode) + require.Equal(t, 3, len(payload.Activities)) + require.True(t, payload.Activities[0].isNew) + require.False(t, payload.Activities[1].isNew) + require.False(t, payload.Activities[2].isNew) + + tx := payload.Activities[0] require.Equal(t, PendingTransactionPT, tx.payloadType) // We don't keep type in the DB require.Equal(t, (*int)(nil), tx.transferType) @@ -316,28 +376,79 @@ func TestService_IncrementalFilterUpdate(t *testing.T) { require.Equal(t, exp.ChainID, *tx.chainIDOut) require.Equal(t, (*common.ChainID)(nil), tx.chainIDIn) require.Equal(t, exp.Hash, tx.transaction.Hash) - require.Equal(t, exp.From, tx.transaction.Address) + // Pending doesn't have address as part of identity + require.Equal(t, eth.Address{}, tx.transaction.Address) require.Equal(t, exp.From, *tx.sender) require.Equal(t, exp.To, *tx.recipient) require.Equal(t, 0, exp.Value.Int.Cmp((*big.Int)(tx.amountOut))) require.Equal(t, exp.Timestamp, uint64(tx.timestamp)) require.Equal(t, exp.Symbol, *tx.symbolOut) require.Equal(t, (*string)(nil), tx.symbolIn) - require.Equal(t, (*Token)(nil), tx.tokenOut) + require.Equal(t, &Token{ + TokenType: Native, + ChainID: 5, + }, tx.tokenOut) require.Equal(t, (*Token)(nil), tx.tokenIn) require.Equal(t, (*eth.Address)(nil), tx.contractAddress) - - sessionUpdatesCount++ - case EventActivityFilteringDone: - filterResponseCount++ + eventActivityDoneCount++ } case <-time.NewTimer(1 * time.Second).C: require.Fail(t, "timeout while waiting for EventActivitySessionUpdated") } } - // Don't wait for deletion require.Equal(t, 1, pendingTransactionUpdate) require.Equal(t, 1, filterResponseCount) require.Equal(t, 1, sessionUpdatesCount) + require.Equal(t, 1, eventActivityDoneCount) +} + +func TestService_IncrementalUpdateFetchWindowRegression(t *testing.T) { + state := setupTestService(t) + defer state.close() + + allAddresses, pendings, ch, cleanup := setupTransactions(t, state, 3, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: 4}}) + defer cleanup() + + sessionID := state.service.StartFilterSession(allAddresses, true, allNetworksFilter(), Filter{}, 2) + require.Greater(t, sessionID, SessionID(0)) + defer state.service.StopFilterSession(sessionID) + + filterResponseCount := validateSessionUpdateEventWithPending(t, ch) + + exp := pendings[0] + err := state.pendingTracker.StoreAndTrackPendingTx(&exp) + require.NoError(t, err) + + pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount) + + err = state.service.ResetFilterSession(sessionID, 2) + require.NoError(t, err) + + // Validate the reset data + eventActivityDoneCount := 0 + for eventActivityDoneCount < 1 { + select { + case res := <-ch: + switch res.Type { + case EventActivityFilteringDone: + var payload FilterResponse + err := json.Unmarshal([]byte(res.Message), &payload) + require.NoError(t, err) + require.Equal(t, ErrorCodeSuccess, payload.ErrorCode) + require.Equal(t, 2, len(payload.Activities)) + + require.True(t, payload.Activities[0].isNew) + require.False(t, payload.Activities[1].isNew) + eventActivityDoneCount++ + } + case <-time.NewTimer(1 * time.Second).C: + require.Fail(t, "timeout while waiting for EventActivitySessionUpdated") + } + } + + require.Equal(t, 1, pendingTransactionUpdate) + require.Equal(t, 1, filterResponseCount) + require.Equal(t, 1, sessionUpdatesCount) + require.Equal(t, 1, eventActivityDoneCount) } diff --git a/services/wallet/activity/session.go b/services/wallet/activity/session.go index 6e67becba..a47fda36b 100644 --- a/services/wallet/activity/session.go +++ b/services/wallet/activity/session.go @@ -2,13 +2,12 @@ package activity import ( "context" - "encoding/json" "errors" + "strconv" "golang.org/x/exp/slices" eth "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/services/wallet/async" @@ -18,12 +17,28 @@ import ( "github.com/status-im/status-go/transactions" ) +const nilStr = "nil" + type EntryIdentity struct { payloadType PayloadType transaction *transfer.TransactionIdentity id transfer.MultiTransactionIDType } +// func (e EntryIdentity) same(a EntryIdentity) bool { +// return a.payloadType == e.payloadType && (a.transaction == e.transaction && (a.transaction == nil || (a.transaction.ChainID == e.transaction.ChainID && +// a.transaction.Hash == e.transaction.Hash && +// a.transaction.Address == e.transaction.Address))) && a.id == e.id +// } + +func (e EntryIdentity) key() string { + txID := nilStr + if e.transaction != nil { + txID = strconv.FormatUint(uint64(e.transaction.ChainID), 10) + e.transaction.Hash.Hex() + e.transaction.Address.Hex() + } + return strconv.Itoa(e.payloadType) + txID + strconv.FormatInt(int64(e.id), 16) +} + type SessionID int32 type Session struct { @@ -36,16 +51,17 @@ type Session struct { chainIDs []common.ChainID filter Filter - // model is a mirror of the data model presentation has (EventActivityFilteringDone) + // model is a mirror of the data model presentation has (sent by EventActivityFilteringDone) model []EntryIdentity + // new holds the new entries until user requests update + new []EntryIdentity } // SessionUpdate payload for EventActivitySessionUpdated type SessionUpdate struct { - // TODO #12120: add index for each entry, now all new are first entries - NewEntries []Entry `json:"newEntries,omitempty"` - Removed []EntryIdentity `json:"removed,omitempty"` - Updated []Entry `json:"updated,omitempty"` + HasNewEntries *bool `json:"hasNewEntries,omitempty"` + Removed []EntryIdentity `json:"removed,omitempty"` + Updated []Entry `json:"updated,omitempty"` } type fullFilterParams struct { @@ -99,7 +115,7 @@ func (s *Service) StartFilterSession(addresses []eth.Address, allAddresses bool, s.sessionsRWMutex.Lock() subscribeToEvents := len(s.sessions) == 0 - s.sessions[sessionID] = &Session{ + session := &Session{ id: sessionID, addresses: addresses, @@ -109,6 +125,8 @@ func (s *Service) StartFilterSession(addresses []eth.Address, allAddresses bool, model: make([]EntryIdentity, 0, firstPageCount), } + s.sessions[sessionID] = session + if subscribeToEvents { s.subscribeToEvents() } @@ -124,22 +142,57 @@ func (s *Service) StartFilterSession(addresses []eth.Address, allAddresses bool, // Mirror identities for update use s.sessionsRWMutex.Lock() defer s.sessionsRWMutex.Unlock() - session, ok := s.sessions[sessionID] - if ok { - session.model = make([]EntryIdentity, 0, len(entries)) - for _, a := range entries { - session.model = append(session.model, EntryIdentity{ - payloadType: a.payloadType, - transaction: a.transaction, - id: a.id, - }) - } + + session.model = make([]EntryIdentity, 0, len(entries)) + for _, a := range entries { + session.model = append(session.model, EntryIdentity{ + payloadType: a.payloadType, + transaction: a.transaction, + id: a.id, + }) } }) return sessionID } +func (s *Service) ResetFilterSession(id SessionID, firstPageCount int) error { + session, found := s.sessions[id] + if !found { + return errors.New("session not found") + } + + s.internalFilter(fullFilterParams{ + sessionID: id, + addresses: session.addresses, + allAddresses: session.allAddresses, + chainIDs: session.chainIDs, + filter: session.filter, + }, 0, firstPageCount, func(entries []Entry) { + s.sessionsRWMutex.Lock() + defer s.sessionsRWMutex.Unlock() + + // Mark new entries + newMap := entryIdsToMap(session.new) + for i, a := range entries { + _, isNew := newMap[a.getIdentity().key()] + entries[i].isNew = isNew + } + session.new = nil + + // Mirror client identities for checking updates + session.model = make([]EntryIdentity, 0, len(entries)) + for _, a := range entries { + session.model = append(session.model, EntryIdentity{ + payloadType: a.payloadType, + transaction: a.transaction, + id: a.id, + }) + } + }) + return nil +} + // TODO #12120: extend the session based API //func (s *Service) GetMoreForFilterSession(count int) {} @@ -150,127 +203,137 @@ func (s *Service) subscribeToEvents() { go s.processEvents() } +// func (s *Service) processEvents() { +// for event := range s.ch { +// if event.Type == transactions.EventPendingTransactionUpdate { +// var p transactions.PendingTxUpdatePayload +// err := json.Unmarshal([]byte(event.Message), &p) +// if err != nil { +// log.Error("Error unmarshalling PendingTxUpdatePayload", "error", err) +// continue +// } + +// for id := range s.sessions { +// s.sessionsRWMutex.RLock() +// pTx, pass := s.checkFilterForPending(s.sessions[id], p.TxIdentity) +// if pass { +// s.sessionsRWMutex.RUnlock() +// s.sessionsRWMutex.Lock() +// addOnTop(s.sessions[id], p.TxIdentity) +// s.sessionsRWMutex.Unlock() +// // TODO #12120: can't send events from an event handler +// go notify(s.eventFeed, id, *pTx) +// } else { +// s.sessionsRWMutex.RUnlock() +// } +// } +// } +// } +// } + +// TODO #12120: check that it exits on channel close func (s *Service) processEvents() { for event := range s.ch { + // TODO #12120: process rest of the events + // TODO #12120: debounce for 1s if event.Type == transactions.EventPendingTransactionUpdate { - var p transactions.PendingTxUpdatePayload - err := json.Unmarshal([]byte(event.Message), &p) - if err != nil { - log.Error("Error unmarshalling PendingTxUpdatePayload", "error", err) - continue - } - - for id := range s.sessions { - s.sessionsRWMutex.RLock() - pTx, pass := s.checkFilterForPending(s.sessions[id], p.TxIdentity) - if pass { - s.sessionsRWMutex.RUnlock() - s.sessionsRWMutex.Lock() - addOnTop(s.sessions[id], p.TxIdentity) - s.sessionsRWMutex.Unlock() - // TODO #12120: can't send events from an event handler - go notify(s.eventFeed, id, *pTx) - } else { - s.sessionsRWMutex.RUnlock() + for sessionID := range s.sessions { + session := s.sessions[sessionID] + activities, err := getActivityEntries(context.Background(), s.getDeps(), session.addresses, session.allAddresses, session.chainIDs, session.filter, 0, len(session.model)) + if err != nil { + log.Error("Error getting activity entries", "error", err) + continue } + + s.sessionsRWMutex.RLock() + allData := append(session.model, session.new...) + new, _ /*removed*/ := findUpdates(allData, activities) + s.sessionsRWMutex.RUnlock() + + s.sessionsRWMutex.Lock() + lastProcessed := -1 + for i, idRes := range new { + if i-lastProcessed > 1 { + // The events are not continuous, therefore these are not on top but mixed between existing entries + break + } + lastProcessed = idRes.newPos + // TODO #12120: make it more generic to follow the detection function + // TODO #12120: hold the first few and only send mixed and removed + if session.new == nil { + session.new = make([]EntryIdentity, 0, len(new)) + } + session.new = append(session.new, idRes.id) + } + + // TODO #12120: mixed + + s.sessionsRWMutex.Unlock() + + go notify(s.eventFeed, sessionID, len(session.new) > 0) } } } } -// checkFilterForPending should be called with sessionsRWMutex locked for reading -func (s *Service) checkFilterForPending(session *Session, id transactions.TxIdentity) (tr *transactions.PendingTransaction, pass bool) { - allChains := len(session.chainIDs) == 0 - if !allChains { - _, found := slices.BinarySearch(session.chainIDs, id.ChainID) - if !found { - return nil, false - } - } +// // checkFilterForPending should be called with sessionsRWMutex locked for reading +// func (s *Service) checkFilterForPending(session *Session, id transactions.TxIdentity) (tr *transactions.PendingTransaction, pass bool) { +// allChains := len(session.chainIDs) == 0 +// if !allChains { +// _, found := slices.BinarySearch(session.chainIDs, id.ChainID) +// if !found { +// return nil, false +// } +// } - tr, err := s.pendingTracker.GetPendingEntry(id.ChainID, id.Hash) - if err != nil { - log.Error("Error getting pending entry", "error", err) - return nil, false - } +// tr, err := s.pendingTracker.GetPendingEntry(id.ChainID, id.Hash) +// if err != nil { +// log.Error("Error getting pending entry", "error", err) +// return nil, false +// } - if !session.allAddresses { - _, found := slices.BinarySearchFunc(session.addresses, tr.From, func(a eth.Address, b eth.Address) int { - // TODO #12120: optimize this - if a.Hex() < b.Hex() { - return -1 - } - if a.Hex() > b.Hex() { - return 1 - } - return 0 - }) - if !found { - return nil, false - } - } +// if !session.allAddresses { +// _, found := slices.BinarySearchFunc(session.addresses, tr.From, func(a eth.Address, b eth.Address) int { +// // TODO #12120: optimize this +// if a.Hex() < b.Hex() { +// return -1 +// } +// if a.Hex() > b.Hex() { +// return 1 +// } +// return 0 +// }) +// if !found { +// return nil, false +// } +// } - fl := session.filter - if fl.Period.StartTimestamp != NoLimitTimestampForPeriod || fl.Period.EndTimestamp != NoLimitTimestampForPeriod { - ts := int64(tr.Timestamp) - if ts < fl.Period.StartTimestamp || ts > fl.Period.EndTimestamp { - return nil, false - } - } +// fl := session.filter +// if fl.Period.StartTimestamp != NoLimitTimestampForPeriod || fl.Period.EndTimestamp != NoLimitTimestampForPeriod { +// ts := int64(tr.Timestamp) +// if ts < fl.Period.StartTimestamp || ts > fl.Period.EndTimestamp { +// return nil, false +// } +// } - // TODO #12120 check filter - // Types []Type `json:"types"` - // Statuses []Status `json:"statuses"` - // CounterpartyAddresses []eth.Address `json:"counterpartyAddresses"` +// // TODO #12120 check filter +// // Types []Type `json:"types"` +// // Statuses []Status `json:"statuses"` +// // CounterpartyAddresses []eth.Address `json:"counterpartyAddresses"` - // // Tokens - // Assets []Token `json:"assets"` - // Collectibles []Token `json:"collectibles"` - // FilterOutAssets bool `json:"filterOutAssets"` - // FilterOutCollectibles bool `json:"filterOutCollectibles"` +// // // Tokens +// // Assets []Token `json:"assets"` +// // Collectibles []Token `json:"collectibles"` +// // FilterOutAssets bool `json:"filterOutAssets"` +// // FilterOutCollectibles bool `json:"filterOutCollectibles"` - return tr, true -} +// return tr, true +// } -// addOnTop should be called with sessionsRWMutex locked for writing -func addOnTop(session *Session, id transactions.TxIdentity) { - session.model = append([]EntryIdentity{{ - payloadType: PendingTransactionPT, - transaction: &transfer.TransactionIdentity{ - ChainID: id.ChainID, - Hash: id.Hash, - }, - }}, session.model...) -} - -func notify(eventFeed *event.Feed, id SessionID, tx transactions.PendingTransaction) { - payload := SessionUpdate{ - NewEntries: []Entry{ - { - payloadType: PendingTransactionPT, - transaction: &transfer.TransactionIdentity{ - ChainID: tx.ChainID, - Hash: tx.Hash, - Address: tx.From, - }, - id: transfer.NoMultiTransactionID, - timestamp: int64(tx.Timestamp), - activityType: SendAT, - activityStatus: PendingAS, - amountOut: (*hexutil.Big)(tx.Value.Int), - amountIn: nil, - tokenOut: nil, - tokenIn: nil, - symbolOut: &tx.Symbol, - symbolIn: nil, - sender: &tx.From, - recipient: &tx.To, - chainIDOut: &tx.ChainID, - chainIDIn: nil, - transferType: nil, - contractAddress: nil, - }, - }, +func notify(eventFeed *event.Feed, id SessionID, hasNewEntries bool) { + payload := SessionUpdate{} + if hasNewEntries { + payload.HasNewEntries = &hasNewEntries } sendResponseEvent(eventFeed, (*int32)(&id), EventActivitySessionUpdated, payload, nil) @@ -312,3 +375,59 @@ func (s *Service) getActivityDetailsAsync(requestID int32, entries []Entry) { } }() } + +type mixedIdentityResult struct { + newPos int + id EntryIdentity +} + +func entryIdsToMap(ids []EntryIdentity) map[string]EntryIdentity { + idsMap := make(map[string]EntryIdentity, len(ids)) + for _, id := range ids { + idsMap[id.key()] = id + } + return idsMap +} + +func entriesToMap(entries []Entry) map[string]Entry { + entryMap := make(map[string]Entry, len(entries)) + for _, entry := range entries { + updatedIdentity := entry.getIdentity() + entryMap[updatedIdentity.key()] = entry + } + return entryMap +} + +// FindUpdates returns changes in updated entries compared to the identities +// +// expects identities and entries to be sorted by timestamp +// +// the returned newer are entries that are newer than the first identity +// the returned mixed are entries that are older than the first identity (sorted by timestamp) +// the returned removed are identities that are not present in the updated entries (sorted by timestamp) +// +// implementation assumes the order of each identity doesn't change from old state (identities) and new state (updated); we have either add or removed. +func findUpdates(identities []EntryIdentity, updated []Entry) (new []mixedIdentityResult, removed []EntryIdentity) { + + idsMap := entryIdsToMap(identities) + updatedMap := entriesToMap(updated) + + for newIndex, entry := range updated { + id := entry.getIdentity() + if _, found := idsMap[id.key()]; !found { + new = append(new, mixedIdentityResult{ + newPos: newIndex, + id: id, + }) + } + } + + // Account for new entries + for i := 0; i < len(identities); i++ { + id := identities[i] + if _, found := updatedMap[id.key()]; !found { + removed = append(removed, id) + } + } + return +} diff --git a/services/wallet/activity/session_test.go b/services/wallet/activity/session_test.go new file mode 100644 index 000000000..4ddc64f70 --- /dev/null +++ b/services/wallet/activity/session_test.go @@ -0,0 +1,107 @@ +package activity + +import ( + "reflect" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/status-im/status-go/services/wallet/transfer" +) + +// TODO #12120: cover missing cases +func TestFindUpdates(t *testing.T) { + txIds := []transfer.TransactionIdentity{ + transfer.TransactionIdentity{ + ChainID: 1, + Hash: common.HexToHash("0x1234"), + Address: common.HexToAddress("0x1234"), + }, + } + + type findUpdatesResult struct { + new []mixedIdentityResult + removed []EntryIdentity + } + + tests := []struct { + name string + identities []EntryIdentity + updated []Entry + want findUpdatesResult + }{ + { + name: "Empty to single MT update", + identities: []EntryIdentity{}, + updated: []Entry{ + {payloadType: MultiTransactionPT, id: 1}, + }, + want: findUpdatesResult{ + new: []mixedIdentityResult{{0, EntryIdentity{payloadType: MultiTransactionPT, id: 1}}}, + }, + }, + { + name: "No updates", + identities: []EntryIdentity{ + EntryIdentity{ + payloadType: SimpleTransactionPT, transaction: &txIds[0], + }, + }, + updated: []Entry{ + {payloadType: SimpleTransactionPT, transaction: &txIds[0]}, + }, + want: findUpdatesResult{}, + }, + { + name: "Empty to mixed updates", + identities: []EntryIdentity{}, + updated: []Entry{ + {payloadType: MultiTransactionPT, id: 1}, + {payloadType: PendingTransactionPT, transaction: &txIds[0]}, + }, + want: findUpdatesResult{ + new: []mixedIdentityResult{{0, EntryIdentity{payloadType: MultiTransactionPT, id: 1}}, + {1, EntryIdentity{payloadType: PendingTransactionPT, transaction: &txIds[0]}}, + }, + }, + }, + { + name: "Add one on top of one", + identities: []EntryIdentity{ + EntryIdentity{ + payloadType: MultiTransactionPT, id: 1, + }, + }, + updated: []Entry{ + {payloadType: PendingTransactionPT, transaction: &txIds[0]}, + {payloadType: MultiTransactionPT, id: 1}, + }, + want: findUpdatesResult{ + new: []mixedIdentityResult{{0, EntryIdentity{payloadType: PendingTransactionPT, transaction: &txIds[0]}}}, + }, + }, + { + name: "Add one on top keep window", + identities: []EntryIdentity{ + EntryIdentity{payloadType: MultiTransactionPT, id: 1}, + EntryIdentity{payloadType: PendingTransactionPT, transaction: &txIds[0]}, + }, + updated: []Entry{ + {payloadType: MultiTransactionPT, id: 2}, + {payloadType: MultiTransactionPT, id: 1}, + }, + want: findUpdatesResult{ + new: []mixedIdentityResult{{0, EntryIdentity{payloadType: MultiTransactionPT, id: 2}}}, + removed: []EntryIdentity{EntryIdentity{payloadType: PendingTransactionPT, transaction: &txIds[0]}}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotNew, gotRemoved := findUpdates(tt.identities, tt.updated) + if !reflect.DeepEqual(gotNew, tt.want.new) || !reflect.DeepEqual(gotRemoved, tt.want.removed) { + t.Errorf("findUpdates() = %v, %v, want %v, %v", gotNew, gotRemoved, tt.want.new, tt.want.removed) + } + }) + } +} diff --git a/services/wallet/api.go b/services/wallet/api.go index 9df26a271..9ed04515a 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -602,6 +602,12 @@ func (api *API) StartActivityFilterSession(addresses []common.Address, allAddres return api.s.activity.StartFilterSession(addresses, allAddresses, chainIDs, filter, firstPageCount), nil } +func (api *API) ResetFilterSession(id activity.SessionID, firstPageCount int) error { + log.Debug("wallet.api.ResetFilterSession", "id", id, "firstPageCount", firstPageCount) + + return api.s.activity.ResetFilterSession(id, firstPageCount) +} + func (api *API) StopActivityFilterSession(id activity.SessionID) { log.Debug("wallet.api.StopActivityFilterSession", "id", id) diff --git a/services/wallet/transfer/testutils.go b/services/wallet/transfer/testutils.go index c592b524a..a6f5819e0 100644 --- a/services/wallet/transfer/testutils.go +++ b/services/wallet/transfer/testutils.go @@ -241,6 +241,15 @@ var TestTokens = []*token.Token{ &EthMainnet, &EthGoerli, &EthOptimism, &UsdcMainnet, &UsdcGoerli, &UsdcOptimism, &SntMainnet, &DaiMainnet, &DaiGoerli, } +func LookupTokenIdentity(chainID uint64, address eth_common.Address, native bool) *token.Token { + for _, token := range TestTokens { + if token.ChainID == chainID && token.Address == address && token.IsNative() == native { + return token + } + } + return nil +} + var NativeTokenIndices = []int{0, 1, 2} func InsertTestTransfer(tb testing.TB, db *sql.DB, address eth_common.Address, tr *TestTransfer) { diff --git a/transactions/pendingtxtracker_test.go b/transactions/pendingtxtracker_test.go index aecaf8baa..84386eb1f 100644 --- a/transactions/pendingtxtracker_test.go +++ b/transactions/pendingtxtracker_test.go @@ -138,7 +138,7 @@ func TestPendingTxTracker_InterruptWatching(t *testing.T) { m, stop, chainClient, eventFeed := setupTestTransactionDB(t, nil) defer stop() - txs := GenerateTestPendingTransactions(2) + txs := GenerateTestPendingTransactions(0, 2) // Mock the first call to getTransactionByHash chainClient.SetAvailableClients([]common.ChainID{txs[0].ChainID}) @@ -259,7 +259,7 @@ func TestPendingTxTracker_MultipleClients(t *testing.T) { m, stop, chainClient, eventFeed := setupTestTransactionDB(t, nil) defer stop() - txs := GenerateTestPendingTransactions(2) + txs := GenerateTestPendingTransactions(0, 2) txs[1].ChainID++ // Mock the both clients to be available @@ -344,7 +344,7 @@ func TestPendingTxTracker_Watch(t *testing.T) { m, stop, chainClient, eventFeed := setupTestTransactionDB(t, nil) defer stop() - txs := GenerateTestPendingTransactions(2) + txs := GenerateTestPendingTransactions(0, 2) // Make the second already confirmed *txs[0].Status = Success @@ -428,7 +428,7 @@ func TestPendingTxTracker_Watch_StatusChangeIncrementally(t *testing.T) { m, stop, chainClient, eventFeed := setupTestTransactionDB(t, common.NewAndSet(1*time.Nanosecond)) defer stop() - txs := GenerateTestPendingTransactions(2) + txs := GenerateTestPendingTransactions(0, 2) var firsDoneWG sync.WaitGroup firsDoneWG.Add(1) @@ -544,7 +544,7 @@ func TestPendingTransactions(t *testing.T) { manager, stop, _, _ := setupTestTransactionDB(t, nil) defer stop() - tx := GenerateTestPendingTransactions(1)[0] + tx := GenerateTestPendingTransactions(0, 1)[0] rst, err := manager.GetAllPending() require.NoError(t, err) diff --git a/transactions/testhelpers.go b/transactions/testhelpers.go index bcc668644..f4481f95f 100644 --- a/transactions/testhelpers.go +++ b/transactions/testhelpers.go @@ -54,27 +54,27 @@ func (m *MockChainClient) AbstractEthClient(chainID common.ChainID) (chain.Batch return m.Clients[chainID], nil } -func GenerateTestPendingTransactions(count int) []PendingTransaction { +func GenerateTestPendingTransactions(start int, count int) []PendingTransaction { if count > 127 { panic("can't generate more than 127 distinct transactions") } txs := make([]PendingTransaction, count) - for i := 0; i < count; i++ { - // Avoid generating zero values hash and addresses - seed := i + 1 + for i := start; i < count; i++ { txs[i] = PendingTransaction{ - Hash: eth.Hash{byte(seed)}, - From: eth.Address{byte(seed)}, - To: eth.Address{byte(seed * 2)}, + Hash: eth.HexToHash(fmt.Sprintf("0x1%d", i)), + From: eth.HexToAddress(fmt.Sprintf("0x2%d", i)), + To: eth.HexToAddress(fmt.Sprintf("0x3%d", i)), Type: RegisterENS, AdditionalData: "someuser.stateofus.eth", - Value: bigint.BigInt{Int: big.NewInt(int64(seed))}, + Value: bigint.BigInt{Int: big.NewInt(int64(i))}, GasLimit: bigint.BigInt{Int: big.NewInt(21000)}, - GasPrice: bigint.BigInt{Int: big.NewInt(int64(seed))}, + GasPrice: bigint.BigInt{Int: big.NewInt(int64(i))}, ChainID: 777, Status: new(TxStatus), AutoDelete: new(bool), + Symbol: "ETH", + Timestamp: uint64(i), } *txs[i].Status = Pending // set to pending by default *txs[i].AutoDelete = true // set to true by default @@ -84,40 +84,56 @@ func GenerateTestPendingTransactions(count int) []PendingTransaction { type TestTxSummary struct { failStatus bool - dontConfirm bool + DontConfirm bool + // Timestamp will be used to mock the Timestamp if greater than 0 + Timestamp int } func MockTestTransactions(t *testing.T, chainClient *MockChainClient, testTxs []TestTxSummary) []PendingTransaction { - txs := GenerateTestPendingTransactions(len(testTxs)) + txs := GenerateTestPendingTransactions(0, len(testTxs)) - // Mock the first call to getTransactionByHash - chainClient.SetAvailableClients([]common.ChainID{txs[0].ChainID}) - cl := chainClient.Clients[txs[0].ChainID] - cl.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool { - ok := len(b) == len(testTxs) - for i := range b { - ok = ok && b[i].Method == GetTransactionReceiptRPCName && b[i].Args[0] == txs[0].Hash + for txIdx := range txs { + tx := &txs[txIdx] + if testTxs[txIdx].Timestamp > 0 { + tx.Timestamp = uint64(testTxs[txIdx].Timestamp) } - return ok - })).Return(nil).Once().Run(func(args mock.Arguments) { - elems := args.Get(1).([]rpc.BatchElem) - for i := range elems { - receiptWrapper, ok := elems[i].Result.(*nullableReceipt) - require.True(t, ok) - require.NotNil(t, receiptWrapper) - // Simulate parsing of eth_getTransactionReceipt response - if !testTxs[i].dontConfirm { - status := types.ReceiptStatusSuccessful - if testTxs[i].failStatus { - status = types.ReceiptStatusFailed - } - receiptWrapper.Receipt = &types.Receipt{ - BlockNumber: new(big.Int).SetUint64(1), - Status: status, + // Mock the first call to getTransactionByHash + chainClient.SetAvailableClients([]common.ChainID{tx.ChainID}) + cl := chainClient.Clients[tx.ChainID] + call := cl.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool { + ok := len(b) == len(testTxs) + for i := range b { + ok = ok && b[i].Method == GetTransactionReceiptRPCName && b[i].Args[0] == tx.Hash + } + return ok + })).Return(nil) + if testTxs[txIdx].DontConfirm { + call = call.Times(0) + } else { + call = call.Once() + } + + call.Run(func(args mock.Arguments) { + elems := args.Get(1).([]rpc.BatchElem) + for i := range elems { + receiptWrapper, ok := elems[i].Result.(*nullableReceipt) + require.True(t, ok) + require.NotNil(t, receiptWrapper) + // Simulate parsing of eth_getTransactionReceipt response + if !testTxs[i].DontConfirm { + status := types.ReceiptStatusSuccessful + if testTxs[i].failStatus { + status = types.ReceiptStatusFailed + } + + receiptWrapper.Receipt = &types.Receipt{ + BlockNumber: new(big.Int).SetUint64(1), + Status: status, + } } } - } - }) + }) + } return txs }