diff --git a/services/wallet/activity/filter.go b/services/wallet/activity/filter.go index 3ef992036..0b7229c64 100644 --- a/services/wallet/activity/filter.go +++ b/services/wallet/activity/filter.go @@ -99,6 +99,18 @@ type Filter struct { FilterOutCollectibles bool `json:"filterOutCollectibles"` } +func (f *Filter) IsEmpty() bool { + return f.Period.StartTimestamp == NoLimitTimestampForPeriod && + f.Period.EndTimestamp == NoLimitTimestampForPeriod && + len(f.Types) == 0 && + len(f.Statuses) == 0 && + len(f.CounterpartyAddresses) == 0 && + len(f.Assets) == 0 && + len(f.Collectibles) == 0 && + !f.FilterOutAssets && + !f.FilterOutCollectibles +} + func GetRecipients(ctx context.Context, db *sql.DB, chainIDs []common.ChainID, addresses []eth.Address, offset int, limit int) (recipients []eth.Address, hasMore bool, err error) { filterAllAddresses := len(addresses) == 0 involvedAddresses := noEntriesInTmpTableSQLValues diff --git a/services/wallet/activity/session.go b/services/wallet/activity/session.go index db265f222..1eebc2fbd 100644 --- a/services/wallet/activity/session.go +++ b/services/wallet/activity/session.go @@ -42,6 +42,13 @@ func (e EntryIdentity) key() string { type SessionID int32 +// Session stores state related to a filter session +// The user happy flow is: +// 1. StartFilterSession to get a new SessionID and client be notified by the current state +// 2. GetMoreForFilterSession anytime to get more entries after the first page +// 3. UpdateFilterForSession to update the filter and get the new state or clean the filter and get the newer entries +// 4. ResetFilterSession in case client receives SessionUpdate with HasNewOnTop = true to get the latest state +// 5. StopFilterSession to stop the session when no used (user changed from activity screens or changed addresses and chains) type Session struct { id SessionID @@ -54,6 +61,8 @@ type Session struct { // model is a mirror of the data model presentation has (sent by EventActivityFilteringDone) model []EntryIdentity + // noFilterModel is a mirror of the data model presentation has when filter is empty + noFilterModel map[string]EntryIdentity // new holds the new entries until user requests update by calling ResetFilterSession new []EntryIdentity } @@ -105,11 +114,44 @@ func (s *Service) internalFilter(f fullFilterParams, offset int, count int, proc }) } +// mirrorIdentities for update use +func mirrorIdentities(entries []Entry) []EntryIdentity { + model := make([]EntryIdentity, 0, len(entries)) + for _, a := range entries { + model = append(model, EntryIdentity{ + payloadType: a.payloadType, + transaction: a.transaction, + id: a.id, + }) + } + return model +} + +func (s *Service) internalFilterForSession(session *Session, firstPageCount int) { + s.internalFilter( + fullFilterParams{ + sessionID: session.id, + addresses: session.addresses, + allAddresses: session.allAddresses, + chainIDs: session.chainIDs, + filter: session.filter, + }, + 0, + firstPageCount, + func(entries []Entry) (offset int) { + s.sessionsRWMutex.Lock() + defer s.sessionsRWMutex.Unlock() + + session.model = mirrorIdentities(entries) + + return 0 + }, + ) +} + func (s *Service) StartFilterSession(addresses []eth.Address, allAddresses bool, chainIDs []common.ChainID, filter Filter, firstPageCount int) SessionID { sessionID := s.nextSessionID() - s.sessionsRWMutex.Lock() - subscribeToEvents := len(s.sessions) == 0 session := &Session{ id: sessionID, @@ -120,6 +162,10 @@ func (s *Service) StartFilterSession(addresses []eth.Address, allAddresses bool, model: make([]EntryIdentity, 0, firstPageCount), } + + s.sessionsRWMutex.Lock() + subscribeToEvents := len(s.sessions) == 0 + s.sessions[sessionID] = session if subscribeToEvents { @@ -127,36 +173,81 @@ func (s *Service) StartFilterSession(addresses []eth.Address, allAddresses bool, } s.sessionsRWMutex.Unlock() - s.internalFilter( - fullFilterParams{ - sessionID: sessionID, - addresses: addresses, - allAddresses: allAddresses, - chainIDs: chainIDs, - filter: filter, - }, - 0, - firstPageCount, - func(entries []Entry) (offset int) { - // Mirror identities for update use - s.sessionsRWMutex.Lock() - defer s.sessionsRWMutex.Unlock() - - session.model = make([]EntryIdentity, 0, len(entries)) - for _, a := range entries { - session.model = append(session.model, EntryIdentity{ - payloadType: a.payloadType, - transaction: a.transaction, - id: a.id, - }) - } - return 0 - }, - ) + s.internalFilterForSession(session, firstPageCount) return sessionID } +// UpdateFilterForSession is to be called for updating the filter of a specific session +// After calling this method to set a filter all the incoming changes will be reported with +// Entry.isNew = true when filter is reset to empty +func (s *Service) UpdateFilterForSession(id SessionID, filter Filter, firstPageCount int) error { + s.sessionsRWMutex.RLock() + session, found := s.sessions[id] + if !found { + s.sessionsRWMutex.RUnlock() + return errors.New("session not found") + } + + prevEmpty := session.filter.IsEmpty() + newEmpty := filter.IsEmpty() + s.sessionsRWMutex.RUnlock() + + s.sessionsRWMutex.Lock() + + session.new = nil + + session.filter = filter + + if prevEmpty && !newEmpty { + // Session is moving from empty to non-empty filter + // Take a snapshot of the current model + session.noFilterModel = entryIdsToMap(session.model) + + session.model = make([]EntryIdentity, 0, firstPageCount) + + // In this case there is nothing to flag so we request the first page + s.internalFilterForSession(session, firstPageCount) + } else if !prevEmpty && newEmpty { + // Session is moving from non-empty to empty filter + // In this case we need to flag all the new entries that are not in the noFilterModel + s.internalFilter( + fullFilterParams{ + sessionID: session.id, + addresses: session.addresses, + allAddresses: session.allAddresses, + chainIDs: session.chainIDs, + filter: session.filter, + }, + 0, + firstPageCount, + func(entries []Entry) (offset int) { + s.sessionsRWMutex.Lock() + defer s.sessionsRWMutex.Unlock() + + // Mark new entries + for i, a := range entries { + _, found := session.noFilterModel[a.getIdentity().key()] + entries[i].isNew = !found + } + + // Mirror identities for update use + session.model = mirrorIdentities(entries) + session.noFilterModel = nil + return 0 + }, + ) + } else { + // Else act as a normal filter update + s.internalFilterForSession(session, firstPageCount) + } + s.sessionsRWMutex.Unlock() + + return nil +} + +// ResetFilterSession is to be called when SessionUpdate.HasNewOnTop == true to +// update client with the latest state including new on top entries func (s *Service) ResetFilterSession(id SessionID, firstPageCount int) error { session, found := s.sessions[id] if !found { @@ -186,14 +277,8 @@ func (s *Service) ResetFilterSession(id SessionID, firstPageCount int) error { session.new = nil // Mirror client identities for checking updates - session.model = make([]EntryIdentity, 0, len(entries)) - for _, a := range entries { - session.model = append(session.model, EntryIdentity{ - payloadType: a.payloadType, - transaction: a.transaction, - id: a.id, - }) - } + session.model = mirrorIdentities(entries) + return 0 }, ) diff --git a/services/wallet/api.go b/services/wallet/api.go index 1a073c232..934082d88 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -586,6 +586,7 @@ func (api *API) FetchAllCurrencyFormats() (currency.FormatPerSymbol, error) { return api.s.currency.FetchAllCurrencyFormats() } +// @deprecated replaced by session APIs; see #12120 func (api *API) FilterActivityAsync(requestID int32, addresses []common.Address, allAddresses bool, chainIDs []wcommon.ChainID, filter activity.Filter, offset int, limit int) error { log.Debug("wallet.api.FilterActivityAsync", "requestID", requestID, "addr.count", len(addresses), "allAddresses", allAddresses, "chainIDs.count", len(chainIDs), "offset", offset, "limit", limit) @@ -593,6 +594,7 @@ func (api *API) FilterActivityAsync(requestID int32, addresses []common.Address, return nil } +// @deprecated replaced by session APIs; see #12120 func (api *API) CancelActivityFilterTask(requestID int32) error { log.Debug("wallet.api.CancelActivityFilterTask", "requestID", requestID) @@ -606,6 +608,12 @@ func (api *API) StartActivityFilterSession(addresses []common.Address, allAddres return api.s.activity.StartFilterSession(addresses, allAddresses, chainIDs, filter, firstPageCount), nil } +func (api *API) UpdateActivityFilterForSession(sessionID activity.SessionID, filter activity.Filter, firstPageCount int) error { + log.Debug("wallet.api.UpdateActivityFilterForSession", "sessionID", sessionID, "firstPageCount", firstPageCount) + + return api.s.activity.UpdateFilterForSession(sessionID, filter, firstPageCount) +} + func (api *API) ResetActivityFilterSession(id activity.SessionID, firstPageCount int) error { log.Debug("wallet.api.ResetActivityFilterSession", "id", id, "firstPageCount", firstPageCount)