diff --git a/services/wallet/activity/TODO.md b/services/wallet/activity/TODO.md new file mode 100644 index 000000000..d1dc26ae8 --- /dev/null +++ b/services/wallet/activity/TODO.md @@ -0,0 +1,103 @@ +# Provide dynamic activity updates + +Task: https://github.com/status-im/status-desktop/issues/12120 + +## Intro + +In the current approach only static paginated filtering is possible because the filtering is done in SQL + +The updated requirements need to support dynamic updates of the current visualized filter + +## Plan + +- [ ] Required common (runtime/SQL) infrastructure + - [-] Refactor into a session based filter + - [-] Keep a mirror of identities for session + - [-] Capture events (new downloaded and pending first) + - [-] Have the simplest filter to handle new and updated and emit wallet event + - [ ] Handle update filter events in UX and alter the model (add/remove) +- [ ] Asses how the runtime filter grows in complexity/risk +- [ ] Quick prototype of SQL only filter if still make sense +- [ ] Refactor the async handling to fit the session based better (use channels and goroutine) + +## How to + +I see two ways: + +- Keep a **runtime** (go/nim) dynamic in memory filter that is in sync with the SQL filter and use the filter to process transactions updates and propagate to the current visualized model + - The filter will push changes to the in memory model based on the sorting and filtering criteria + - If the filter is completely in sync withe the SQL one, then the dynamic updates to the model should have the same content as fetched from scratch from the DB + - *Advantages* + - Less memory and performance requirements + - *Disadvantages* + - Two sources of truth for the filter + - With tests for each event this can be mitigated + - Complexity around the multi-transaction/sub-transaction relation + - If we miss doing equivalent changes in bot filters (SQL and runtime) the filter might not be in sync with the SQL one and have errors in update +- **Refresh SQL filter** on every transaction (or bulk) update to DB and compare with the current visualized filter to extract differences and push as change notifications + - This approach is more expensive in terms of memory and performance but will use only one source of truth implementation + - This way we know for sure that the updated model is in sync with a newly fetched one + - *Advantages* + - Less complexity and less risk to be out of sync with the SQL filter + - *Disadvantages* + - More memory and performance requirements + - The real improvement will be to do the postponed refactoring of the activity in DB + +## Requirements + +Expected filter states to be addressed + +- Filter is set +- No Filter +- Filter is cleared + - How about if only partially cleared? + +Expected dynamic events + +- **New transactions** + - Pending + - Downloaded (external) + - Multi-transactions? +- **Transaction changed state** + - Pending to confirmed (new transaction/removed transaction) + +Filter criteria + +- time interval: start-end +- activity type (send/receive/buy/swap/bridge/contract_deploy/mint) +- status (pending/failed/confirmed/finalized) +- addresses +- tokens +- multi-transaction filtering transaction + +## Implementation + +### SQL filter + +For new events + +- keep a mirror of identities on status-go side (optional session based) +- on update events fetch identities and check against the mirror if any is new +- for new entries send the notification with the transaction details +- keep pending changes (not added) + - remove entries that were processed for this session + +For update? + +- check if entry is in the mirror and propagate update event + +### Mirror filter + +For new events + +- keep a mirror of identities +- on update events pass them through the filter and if they pass send updates + - the filter checks criteria and available mirror interval to dismiss from mirror +- sub-transactions challenge + - TODO +- token challenges + - TODO + +For update? + +- check if entry is in the mirror and propagate update event \ No newline at end of file diff --git a/services/wallet/activity/activity.go b/services/wallet/activity/activity.go index f452ff3ef..7d1bd6fbc 100644 --- a/services/wallet/activity/activity.go +++ b/services/wallet/activity/activity.go @@ -70,6 +70,7 @@ type Entry struct { contractAddress *eth.Address } +// Only used for JSON marshalling type EntryData struct { PayloadType PayloadType `json:"payloadType"` Transaction *transfer.TransactionIdentity `json:"transaction,omitempty"` diff --git a/services/wallet/activity/service.go b/services/wallet/activity/service.go index 9bfab7b69..3639148cd 100644 --- a/services/wallet/activity/service.go +++ b/services/wallet/activity/service.go @@ -6,6 +6,8 @@ import ( "encoding/json" "errors" "strconv" + "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -18,15 +20,19 @@ import ( "github.com/status-im/status-go/services/wallet/thirdparty" "github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/services/wallet/walletevent" + "github.com/status-im/status-go/transactions" ) const ( - // FilterResponse json is sent as a message in the EventActivityFilteringDone event + // EventActivityFilteringDone contains a FilterResponse payload EventActivityFilteringDone walletevent.EventType = "wallet-activity-filtering-done" EventActivityFilteringUpdate walletevent.EventType = "wallet-activity-filtering-entries-updated" EventActivityGetRecipientsDone walletevent.EventType = "wallet-activity-get-recipients-result" EventActivityGetOldestTimestampDone walletevent.EventType = "wallet-activity-get-oldest-timestamp-result" EventActivityGetCollectibles walletevent.EventType = "wallet-activity-get-collectibles" + + // EventActivitySessionUpdated contains a SessionUpdate payload + EventActivitySessionUpdated walletevent.EventType = "wallet-activity-session-updated" ) var ( @@ -56,15 +62,33 @@ type Service struct { eventFeed *event.Feed scheduler *async.MultiClientScheduler + + sessions map[SessionID]*Session + lastSessionID atomic.Int32 + subscriptions event.Subscription + ch chan walletevent.Event + // sessionsRWMutex is used to protect all sessions related members + sessionsRWMutex sync.RWMutex + + // TODO #12120: sort out session dependencies + pendingTracker *transactions.PendingTxTracker } -func NewService(db *sql.DB, tokenManager token.ManagerInterface, collectibles collectibles.ManagerInterface, eventFeed *event.Feed) *Service { +func (s *Service) nextSessionID() SessionID { + return SessionID(s.lastSessionID.Add(1)) +} + +func NewService(db *sql.DB, tokenManager token.ManagerInterface, collectibles collectibles.ManagerInterface, eventFeed *event.Feed, pendingTracker *transactions.PendingTxTracker) *Service { return &Service{ db: db, tokenManager: tokenManager, collectibles: collectibles, eventFeed: eventFeed, scheduler: async.NewMultiClientScheduler(), + + sessions: make(map[SessionID]*Session), + + pendingTracker: pendingTracker, } } @@ -90,6 +114,7 @@ type FilterResponse struct { // and should not expect other owners to have data in one of the queried tables // // All calls will trigger an EventActivityFilteringDone event with the result of the filtering +// TODO #12120: replace with session based APIs func (s *Service) FilterActivityAsync(requestID int32, addresses []common.Address, allAddresses bool, chainIDs []w_common.ChainID, filter Filter, offset int, limit int) { s.scheduler.Enqueue(requestID, filterTask, func(ctx context.Context) (interface{}, error) { activities, err := getActivityEntries(ctx, s.getDeps(), addresses, allAddresses, chainIDs, filter, offset, limit) @@ -109,27 +134,12 @@ func (s *Service) FilterActivityAsync(requestID int32, addresses []common.Addres res.ErrorCode = ErrorCodeSuccess } - s.sendResponseEvent(&requestID, EventActivityFilteringDone, res, err) + sendResponseEvent(s.eventFeed, &requestID, EventActivityFilteringDone, res, err) s.getActivityDetailsAsync(requestID, res.Activities) }) } -func (s *Service) getActivityDetailsAsync(requestID int32, entries []Entry) { - if len(entries) == 0 { - return - } - - ctx := context.Background() - - go func() { - activityData, err := s.getActivityDetails(ctx, entries) - if len(activityData) != 0 { - s.sendResponseEvent(&requestID, EventActivityFilteringUpdate, activityData, err) - } - }() -} - type CollectibleHeader struct { ID thirdparty.CollectibleUniqueID `json:"id"` Name string `json:"name"` @@ -184,7 +194,7 @@ func (s *Service) GetActivityCollectiblesAsync(requestID int32, chainIDs []w_com res.ErrorCode = ErrorCodeSuccess } - s.sendResponseEvent(&requestID, EventActivityGetCollectibles, res, err) + sendResponseEvent(s.eventFeed, &requestID, EventActivityGetCollectibles, res, err) }) } @@ -280,7 +290,7 @@ func (s *Service) GetRecipientsAsync(requestID int32, chainIDs []w_common.ChainI res.ErrorCode = ErrorCodeFailed } - s.sendResponseEvent(&requestID, EventActivityGetRecipientsDone, result, err) + sendResponseEvent(s.eventFeed, &requestID, EventActivityGetRecipientsDone, result, err) }) } @@ -305,7 +315,7 @@ func (s *Service) GetOldestTimestampAsync(requestID int32, addresses []common.Ad res.ErrorCode = ErrorCodeSuccess } - s.sendResponseEvent(&requestID, EventActivityGetOldestTimestampDone, res, err) + sendResponseEvent(s.eventFeed, &requestID, EventActivityGetOldestTimestampDone, res, err) }) } @@ -358,7 +368,7 @@ func (s *Service) getDeps() FilterDependencies { } } -func (s *Service) sendResponseEvent(requestID *int32, eventType walletevent.EventType, payloadObj interface{}, resErr error) { +func sendResponseEvent(eventFeed *event.Feed, requestID *int32, eventType walletevent.EventType, payloadObj interface{}, resErr error) { payload, err := json.Marshal(payloadObj) if err != nil { log.Error("Error marshaling response: %v; result error: %w", err, resErr) @@ -382,5 +392,5 @@ func (s *Service) sendResponseEvent(requestID *int32, eventType walletevent.Even *event.RequestID = int(*requestID) } - s.eventFeed.Send(event) + eventFeed.Send(event) } diff --git a/services/wallet/activity/service_test.go b/services/wallet/activity/service_test.go index 1a91c3003..74829b88f 100644 --- a/services/wallet/activity/service_test.go +++ b/services/wallet/activity/service_test.go @@ -18,6 +18,7 @@ import ( "github.com/status-im/status-go/services/wallet/transfer" "github.com/status-im/status-go/services/wallet/walletevent" "github.com/status-im/status-go/t/helpers" + "github.com/status-im/status-go/transactions" "github.com/status-im/status-go/walletdatabase" "github.com/stretchr/testify/mock" @@ -57,18 +58,26 @@ func (m *mockTokenManager) LookupToken(chainID *uint64, tokenSymbol string) (tkn return args.Get(0).(*token.Token), args.Bool(1) } -func setupTestService(tb testing.TB) (service *Service, eventFeed *event.Feed, tokenMock *mockTokenManager, collectiblesMock *mockCollectiblesManager, close func()) { +func setupTestService(tb testing.TB) (service *Service, eventFeed *event.Feed, tokenMock *mockTokenManager, collectiblesMock *mockCollectiblesManager, close func(), pendingTracker *transactions.PendingTxTracker, chainClient *transactions.MockChainClient) { db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{}) require.NoError(tb, err) eventFeed = new(event.Feed) tokenMock = &mockTokenManager{} collectiblesMock = &mockCollectiblesManager{} - service = NewService(db, tokenMock, collectiblesMock, eventFeed) + + chainClient = transactions.NewMockChainClient() + + // Ensure we process pending transactions as needed, only once + pendingCheckInterval := time.Second + pendingTracker = transactions.NewPendingTxTracker(db, chainClient, nil, eventFeed, pendingCheckInterval) + + service = NewService(db, tokenMock, collectiblesMock, eventFeed, pendingTracker) return service, eventFeed, tokenMock, collectiblesMock, func() { + require.NoError(tb, pendingTracker.Stop()) require.NoError(tb, db.Close()) - } + }, pendingTracker, chainClient } type arg struct { @@ -101,7 +110,7 @@ func insertStubTransfersWithCollectibles(t *testing.T, db *sql.DB, args []arg) ( } func TestService_UpdateCollectibleInfo(t *testing.T) { - s, e, tM, c, close := setupTestService(t) + s, e, tM, c, close, _, _ := setupTestService(t) defer close() args := []arg{ @@ -185,7 +194,7 @@ func TestService_UpdateCollectibleInfo(t *testing.T) { } func TestService_UpdateCollectibleInfo_Error(t *testing.T) { - s, e, _, c, close := setupTestService(t) + s, e, _, c, close, _, _ := setupTestService(t) defer close() args := []arg{ @@ -229,3 +238,106 @@ func TestService_UpdateCollectibleInfo_Error(t *testing.T) { sub.Unsubscribe() } + +func TestService_IncrementalFilterUpdate(t *testing.T) { + s, e, tM, _, close, pTx, chainClient := setupTestService(t) + defer close() + + ch := make(chan walletevent.Event, 4) + sub := e.Subscribe(ch) + defer sub.Unsubscribe() + + txs, fromTrs, toTrs := transfer.GenerateTestTransfers(t, s.db, 0, 3) + transfer.InsertTestTransfer(t, s.db, txs[0].To, &txs[0]) + transfer.InsertTestTransfer(t, s.db, txs[2].To, &txs[2]) + + allAddresses := append(fromTrs, toTrs...) + + tM.On("LookupTokenIdentity", mock.Anything, eth.HexToAddress("0x0"), true).Return( + &token.Token{ + ChainID: 5, + Address: eth.HexToAddress("0x0"), + Symbol: "ETH", + }, false, + ).Times(2) + + sessionID := s.StartFilterSession(allAddresses, true, allNetworksFilter(), Filter{}, 5) + require.Greater(t, sessionID, SessionID(0)) + defer s.StopFilterSession(sessionID) + + var filterResponseCount int + + for i := 0; i < 1; i++ { + select { + case res := <-ch: + switch res.Type { + case EventActivityFilteringDone: + var payload FilterResponse + err := json.Unmarshal([]byte(res.Message), &payload) + require.NoError(t, err) + require.Equal(t, ErrorCodeSuccess, payload.ErrorCode) + require.Equal(t, 2, len(payload.Activities)) + filterResponseCount++ + } + case <-time.NewTimer(1 * time.Second).C: + require.Fail(t, "timeout while waiting for EventActivityFilteringDone") + } + } + + pendings := transactions.MockTestTransactions(t, chainClient, []transactions.TestTxSummary{{}}) + + err := pTx.StoreAndTrackPendingTx(&pendings[0]) + require.NoError(t, err) + + pendingTransactionUpdate, sessionUpdatesCount := 0, 0 + // Validate the session update event + for sessionUpdatesCount < 1 { + select { + case res := <-ch: + switch res.Type { + case transactions.EventPendingTransactionUpdate: + pendingTransactionUpdate++ + case EventActivitySessionUpdated: + var payload SessionUpdate + err := json.Unmarshal([]byte(res.Message), &payload) + require.NoError(t, err) + require.Equal(t, 1, len(payload.NewEntries)) + tx := payload.NewEntries[0] + exp := pendings[0] + // TODO #12120: this should be a multi-transaction + // require.Equal(t, exp.MultiTransactionID, tx.id) + + require.Equal(t, PendingTransactionPT, tx.payloadType) + // We don't keep type in the DB + require.Equal(t, (*int)(nil), tx.transferType) + require.Equal(t, SendAT, tx.activityType) + require.Equal(t, PendingAS, tx.activityStatus) + require.Equal(t, exp.ChainID, tx.transaction.ChainID) + require.Equal(t, exp.ChainID, *tx.chainIDOut) + require.Equal(t, (*common.ChainID)(nil), tx.chainIDIn) + require.Equal(t, exp.Hash, tx.transaction.Hash) + require.Equal(t, exp.From, tx.transaction.Address) + require.Equal(t, exp.From, *tx.sender) + require.Equal(t, exp.To, *tx.recipient) + require.Equal(t, 0, exp.Value.Int.Cmp((*big.Int)(tx.amountOut))) + require.Equal(t, exp.Timestamp, uint64(tx.timestamp)) + require.Equal(t, exp.Symbol, *tx.symbolOut) + require.Equal(t, (*string)(nil), tx.symbolIn) + require.Equal(t, (*Token)(nil), tx.tokenOut) + require.Equal(t, (*Token)(nil), tx.tokenIn) + require.Equal(t, (*eth.Address)(nil), tx.contractAddress) + + sessionUpdatesCount++ + case EventActivityFilteringDone: + filterResponseCount++ + } + case <-time.NewTimer(1 * time.Second).C: + require.Fail(t, "timeout while waiting for EventActivitySessionUpdated") + } + } + + // Don't wait for deletion + require.Equal(t, 1, pendingTransactionUpdate) + require.Equal(t, 1, filterResponseCount) + require.Equal(t, 1, sessionUpdatesCount) +} diff --git a/services/wallet/activity/session.go b/services/wallet/activity/session.go new file mode 100644 index 000000000..6e67becba --- /dev/null +++ b/services/wallet/activity/session.go @@ -0,0 +1,314 @@ +package activity + +import ( + "context" + "encoding/json" + "errors" + + "golang.org/x/exp/slices" + + eth "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/status-im/status-go/services/wallet/async" + "github.com/status-im/status-go/services/wallet/common" + "github.com/status-im/status-go/services/wallet/transfer" + "github.com/status-im/status-go/services/wallet/walletevent" + "github.com/status-im/status-go/transactions" +) + +type EntryIdentity struct { + payloadType PayloadType + transaction *transfer.TransactionIdentity + id transfer.MultiTransactionIDType +} + +type SessionID int32 + +type Session struct { + id SessionID + + // Filter info + // + addresses []eth.Address + allAddresses bool + chainIDs []common.ChainID + filter Filter + + // model is a mirror of the data model presentation has (EventActivityFilteringDone) + model []EntryIdentity +} + +// SessionUpdate payload for EventActivitySessionUpdated +type SessionUpdate struct { + // TODO #12120: add index for each entry, now all new are first entries + NewEntries []Entry `json:"newEntries,omitempty"` + Removed []EntryIdentity `json:"removed,omitempty"` + Updated []Entry `json:"updated,omitempty"` +} + +type fullFilterParams struct { + sessionID SessionID + addresses []eth.Address + allAddresses bool + chainIDs []common.ChainID + filter Filter +} + +func (s *Service) internalFilter(f fullFilterParams, offset int, count int, processResults func(entries []Entry)) { + s.scheduler.Enqueue(int32(f.sessionID), filterTask, func(ctx context.Context) (interface{}, error) { + activities, err := getActivityEntries(ctx, s.getDeps(), f.addresses, f.allAddresses, f.chainIDs, f.filter, offset, count) + return activities, err + }, func(result interface{}, taskType async.TaskType, err error) { + res := FilterResponse{ + ErrorCode: ErrorCodeFailed, + } + + if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) { + res.ErrorCode = ErrorCodeTaskCanceled + } else if err == nil { + activities := result.([]Entry) + res.Activities = activities + res.Offset = 0 + res.HasMore = len(activities) == count + res.ErrorCode = ErrorCodeSuccess + + processResults(activities) + } + + int32SessionID := int32(f.sessionID) + sendResponseEvent(s.eventFeed, &int32SessionID, EventActivityFilteringDone, res, err) + + s.getActivityDetailsAsync(int32SessionID, res.Activities) + }) +} + +func (s *Service) StartFilterSession(addresses []eth.Address, allAddresses bool, chainIDs []common.ChainID, filter Filter, firstPageCount int) SessionID { + sessionID := s.nextSessionID() + + // TODO #12120: sort rest of the filters + // TODO #12120: prettyfy this + slices.SortFunc(addresses, func(a eth.Address, b eth.Address) bool { + return a.Hex() < b.Hex() + }) + slices.Sort(chainIDs) + slices.SortFunc(filter.CounterpartyAddresses, func(a eth.Address, b eth.Address) bool { + return a.Hex() < b.Hex() + }) + + s.sessionsRWMutex.Lock() + subscribeToEvents := len(s.sessions) == 0 + s.sessions[sessionID] = &Session{ + id: sessionID, + + addresses: addresses, + allAddresses: allAddresses, + chainIDs: chainIDs, + filter: filter, + + model: make([]EntryIdentity, 0, firstPageCount), + } + if subscribeToEvents { + s.subscribeToEvents() + } + s.sessionsRWMutex.Unlock() + + s.internalFilter(fullFilterParams{ + sessionID: sessionID, + addresses: addresses, + allAddresses: allAddresses, + chainIDs: chainIDs, + filter: filter, + }, 0, firstPageCount, func(entries []Entry) { + // Mirror identities for update use + s.sessionsRWMutex.Lock() + defer s.sessionsRWMutex.Unlock() + session, ok := s.sessions[sessionID] + if ok { + session.model = make([]EntryIdentity, 0, len(entries)) + for _, a := range entries { + session.model = append(session.model, EntryIdentity{ + payloadType: a.payloadType, + transaction: a.transaction, + id: a.id, + }) + } + } + }) + + return sessionID +} + +// TODO #12120: extend the session based API +//func (s *Service) GetMoreForFilterSession(count int) {} + +// subscribeToEvents should be called with sessionsRWMutex locked for writing +func (s *Service) subscribeToEvents() { + s.ch = make(chan walletevent.Event, 100) + s.subscriptions = s.eventFeed.Subscribe(s.ch) + go s.processEvents() +} + +func (s *Service) processEvents() { + for event := range s.ch { + if event.Type == transactions.EventPendingTransactionUpdate { + var p transactions.PendingTxUpdatePayload + err := json.Unmarshal([]byte(event.Message), &p) + if err != nil { + log.Error("Error unmarshalling PendingTxUpdatePayload", "error", err) + continue + } + + for id := range s.sessions { + s.sessionsRWMutex.RLock() + pTx, pass := s.checkFilterForPending(s.sessions[id], p.TxIdentity) + if pass { + s.sessionsRWMutex.RUnlock() + s.sessionsRWMutex.Lock() + addOnTop(s.sessions[id], p.TxIdentity) + s.sessionsRWMutex.Unlock() + // TODO #12120: can't send events from an event handler + go notify(s.eventFeed, id, *pTx) + } else { + s.sessionsRWMutex.RUnlock() + } + } + } + } +} + +// checkFilterForPending should be called with sessionsRWMutex locked for reading +func (s *Service) checkFilterForPending(session *Session, id transactions.TxIdentity) (tr *transactions.PendingTransaction, pass bool) { + allChains := len(session.chainIDs) == 0 + if !allChains { + _, found := slices.BinarySearch(session.chainIDs, id.ChainID) + if !found { + return nil, false + } + } + + tr, err := s.pendingTracker.GetPendingEntry(id.ChainID, id.Hash) + if err != nil { + log.Error("Error getting pending entry", "error", err) + return nil, false + } + + if !session.allAddresses { + _, found := slices.BinarySearchFunc(session.addresses, tr.From, func(a eth.Address, b eth.Address) int { + // TODO #12120: optimize this + if a.Hex() < b.Hex() { + return -1 + } + if a.Hex() > b.Hex() { + return 1 + } + return 0 + }) + if !found { + return nil, false + } + } + + fl := session.filter + if fl.Period.StartTimestamp != NoLimitTimestampForPeriod || fl.Period.EndTimestamp != NoLimitTimestampForPeriod { + ts := int64(tr.Timestamp) + if ts < fl.Period.StartTimestamp || ts > fl.Period.EndTimestamp { + return nil, false + } + } + + // TODO #12120 check filter + // Types []Type `json:"types"` + // Statuses []Status `json:"statuses"` + // CounterpartyAddresses []eth.Address `json:"counterpartyAddresses"` + + // // Tokens + // Assets []Token `json:"assets"` + // Collectibles []Token `json:"collectibles"` + // FilterOutAssets bool `json:"filterOutAssets"` + // FilterOutCollectibles bool `json:"filterOutCollectibles"` + + return tr, true +} + +// addOnTop should be called with sessionsRWMutex locked for writing +func addOnTop(session *Session, id transactions.TxIdentity) { + session.model = append([]EntryIdentity{{ + payloadType: PendingTransactionPT, + transaction: &transfer.TransactionIdentity{ + ChainID: id.ChainID, + Hash: id.Hash, + }, + }}, session.model...) +} + +func notify(eventFeed *event.Feed, id SessionID, tx transactions.PendingTransaction) { + payload := SessionUpdate{ + NewEntries: []Entry{ + { + payloadType: PendingTransactionPT, + transaction: &transfer.TransactionIdentity{ + ChainID: tx.ChainID, + Hash: tx.Hash, + Address: tx.From, + }, + id: transfer.NoMultiTransactionID, + timestamp: int64(tx.Timestamp), + activityType: SendAT, + activityStatus: PendingAS, + amountOut: (*hexutil.Big)(tx.Value.Int), + amountIn: nil, + tokenOut: nil, + tokenIn: nil, + symbolOut: &tx.Symbol, + symbolIn: nil, + sender: &tx.From, + recipient: &tx.To, + chainIDOut: &tx.ChainID, + chainIDIn: nil, + transferType: nil, + contractAddress: nil, + }, + }, + } + + sendResponseEvent(eventFeed, (*int32)(&id), EventActivitySessionUpdated, payload, nil) +} + +// unsubscribeFromEvents should be called with sessionsRWMutex locked for writing +func (s *Service) unsubscribeFromEvents() { + s.subscriptions.Unsubscribe() + s.subscriptions = nil +} + +func (s *Service) StopFilterSession(id SessionID) { + s.sessionsRWMutex.Lock() + delete(s.sessions, id) + if len(s.sessions) == 0 { + s.unsubscribeFromEvents() + } + s.sessionsRWMutex.Unlock() + + // Cancel any pending or ongoing task + s.scheduler.Enqueue(int32(id), filterTask, func(ctx context.Context) (interface{}, error) { + return nil, nil + }, func(result interface{}, taskType async.TaskType, err error) { + // Ignore result + }) +} + +func (s *Service) getActivityDetailsAsync(requestID int32, entries []Entry) { + if len(entries) == 0 { + return + } + + ctx := context.Background() + + go func() { + activityData, err := s.getActivityDetails(ctx, entries) + if len(activityData) != 0 { + sendResponseEvent(s.eventFeed, &requestID, EventActivityFilteringUpdate, activityData, err) + } + }() +} diff --git a/services/wallet/api.go b/services/wallet/api.go index 24483e590..d375ed2c2 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -234,6 +234,7 @@ func (api *API) DeleteCustomTokenByChainID(ctx context.Context, chainID uint64, } // @deprecated +// Not used by status-desktop anymore func (api *API) GetPendingTransactions(ctx context.Context) ([]*transactions.PendingTransaction, error) { log.Debug("wallet.api.GetPendingTransactions") rst, err := api.s.pendingTxManager.GetAllPending() @@ -241,6 +242,8 @@ func (api *API) GetPendingTransactions(ctx context.Context) ([]*transactions.Pen return rst, err } +// @deprecated +// Not used by status-desktop anymore func (api *API) GetPendingTransactionsForIdentities(ctx context.Context, identities []transfer.TransactionIdentity) ( result []*transactions.PendingTransaction, err error) { @@ -589,6 +592,18 @@ func (api *API) CancelActivityFilterTask(requestID int32) error { return nil } +func (api *API) StartActivityFilterSession(addresses []common.Address, allAddresses bool, chainIDs []wcommon.ChainID, filter activity.Filter, firstPageCount int) (activity.SessionID, error) { + log.Debug("wallet.api.StartActivityFilterSession", "addr.count", len(addresses), "allAddresses", allAddresses, "chainIDs.count", len(chainIDs), "firstPageCount", firstPageCount) + + return api.s.activity.StartFilterSession(addresses, allAddresses, chainIDs, filter, firstPageCount), nil +} + +func (api *API) StopActivityFilterSession(id activity.SessionID) { + log.Debug("wallet.api.StopActivityFilterSession", "id", id) + + api.s.activity.StopFilterSession(id) +} + func (api *API) GetMultiTxDetails(ctx context.Context, multiTxID int) (*activity.EntryDetails, error) { log.Debug("wallet.api.GetMultiTxDetails", "multiTxID", multiTxID) diff --git a/services/wallet/async/scheduler.go b/services/wallet/async/scheduler.go index 2c9fd2007..d8cfc6217 100644 --- a/services/wallet/async/scheduler.go +++ b/services/wallet/async/scheduler.go @@ -23,7 +23,9 @@ type Scheduler struct { type ReplacementPolicy = int const ( + // ReplacementPolicyCancelOld for when the task arguments might change the result ReplacementPolicyCancelOld ReplacementPolicy = iota + // ReplacementPolicyIgnoreNew for when the task arguments doesn't change the result ReplacementPolicyIgnoreNew ) diff --git a/services/wallet/service.go b/services/wallet/service.go index 996c3f67d..f3af57dc6 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -148,7 +148,7 @@ func NewService( collectiblesManager := collectibles.NewManager(db, rpcClient, communityManager, contractOwnershipProviders, accountOwnershipProviders, collectibleDataProviders, collectionDataProviders, mediaServer, feed) collectibles := collectibles.NewService(db, feed, accountsDB, accountFeed, settingsFeed, communityManager, rpcClient.NetworkManager, collectiblesManager) - activity := activity.NewService(db, tokenManager, collectiblesManager, feed) + activity := activity.NewService(db, tokenManager, collectiblesManager, feed, pendingTxManager) walletconnect := walletconnect.NewService(db, rpcClient.NetworkManager, accountsDB, transactionManager, gethManager, feed, config) diff --git a/transactions/pendingtxtracker.go b/transactions/pendingtxtracker.go index 135389d88..fd6fc6163 100644 --- a/transactions/pendingtxtracker.go +++ b/transactions/pendingtxtracker.go @@ -55,6 +55,7 @@ const ( Keep AutoDeleteType = false ) +// TODO #12120: unify it with TransactionIdentity type TxIdentity struct { ChainID common.ChainID `json:"chainId"` Hash eth.Hash `json:"hash"` @@ -70,6 +71,7 @@ type StatusChangedPayload struct { Status TxStatus `json:"status"` } +// PendingTxTracker implements StatusService in common/status_node_service.go type PendingTxTracker struct { db *sql.DB rpcClient rpc.ClientInterface