status-go/services/wallet/activity/session.go

434 lines
13 KiB
Go

package activity
import (
"context"
"errors"
"strconv"
"golang.org/x/exp/slices"
eth "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/transfer"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/transactions"
)
const nilStr = "nil"
type EntryIdentity struct {
payloadType PayloadType
transaction *transfer.TransactionIdentity
id transfer.MultiTransactionIDType
}
// func (e EntryIdentity) same(a EntryIdentity) bool {
// return a.payloadType == e.payloadType && (a.transaction == e.transaction && (a.transaction == nil || (a.transaction.ChainID == e.transaction.ChainID &&
// a.transaction.Hash == e.transaction.Hash &&
// a.transaction.Address == e.transaction.Address))) && a.id == e.id
// }
func (e EntryIdentity) key() string {
txID := nilStr
if e.transaction != nil {
txID = strconv.FormatUint(uint64(e.transaction.ChainID), 10) + e.transaction.Hash.Hex() + e.transaction.Address.Hex()
}
return strconv.Itoa(e.payloadType) + txID + strconv.FormatInt(int64(e.id), 16)
}
type SessionID int32
type Session struct {
id SessionID
// Filter info
//
addresses []eth.Address
allAddresses bool
chainIDs []common.ChainID
filter Filter
// model is a mirror of the data model presentation has (sent by EventActivityFilteringDone)
model []EntryIdentity
// new holds the new entries until user requests update
new []EntryIdentity
}
// SessionUpdate payload for EventActivitySessionUpdated
type SessionUpdate struct {
HasNewEntries *bool `json:"hasNewEntries,omitempty"`
Removed []EntryIdentity `json:"removed,omitempty"`
Updated []Entry `json:"updated,omitempty"`
}
type fullFilterParams struct {
sessionID SessionID
addresses []eth.Address
allAddresses bool
chainIDs []common.ChainID
filter Filter
}
func (s *Service) internalFilter(f fullFilterParams, offset int, count int, processResults func(entries []Entry)) {
s.scheduler.Enqueue(int32(f.sessionID), filterTask, func(ctx context.Context) (interface{}, error) {
activities, err := getActivityEntries(ctx, s.getDeps(), f.addresses, f.allAddresses, f.chainIDs, f.filter, offset, count)
return activities, err
}, func(result interface{}, taskType async.TaskType, err error) {
res := FilterResponse{
ErrorCode: ErrorCodeFailed,
}
if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) {
res.ErrorCode = ErrorCodeTaskCanceled
} else if err == nil {
activities := result.([]Entry)
res.Activities = activities
res.Offset = 0
res.HasMore = len(activities) == count
res.ErrorCode = ErrorCodeSuccess
processResults(activities)
}
int32SessionID := int32(f.sessionID)
sendResponseEvent(s.eventFeed, &int32SessionID, EventActivityFilteringDone, res, err)
s.getActivityDetailsAsync(int32SessionID, res.Activities)
})
}
func (s *Service) StartFilterSession(addresses []eth.Address, allAddresses bool, chainIDs []common.ChainID, filter Filter, firstPageCount int) SessionID {
sessionID := s.nextSessionID()
// TODO #12120: sort rest of the filters
// TODO #12120: prettyfy this
slices.SortFunc(addresses, func(a eth.Address, b eth.Address) bool {
return a.Hex() < b.Hex()
})
slices.Sort(chainIDs)
slices.SortFunc(filter.CounterpartyAddresses, func(a eth.Address, b eth.Address) bool {
return a.Hex() < b.Hex()
})
s.sessionsRWMutex.Lock()
subscribeToEvents := len(s.sessions) == 0
session := &Session{
id: sessionID,
addresses: addresses,
allAddresses: allAddresses,
chainIDs: chainIDs,
filter: filter,
model: make([]EntryIdentity, 0, firstPageCount),
}
s.sessions[sessionID] = session
if subscribeToEvents {
s.subscribeToEvents()
}
s.sessionsRWMutex.Unlock()
s.internalFilter(fullFilterParams{
sessionID: sessionID,
addresses: addresses,
allAddresses: allAddresses,
chainIDs: chainIDs,
filter: filter,
}, 0, firstPageCount, func(entries []Entry) {
// Mirror identities for update use
s.sessionsRWMutex.Lock()
defer s.sessionsRWMutex.Unlock()
session.model = make([]EntryIdentity, 0, len(entries))
for _, a := range entries {
session.model = append(session.model, EntryIdentity{
payloadType: a.payloadType,
transaction: a.transaction,
id: a.id,
})
}
})
return sessionID
}
func (s *Service) ResetFilterSession(id SessionID, firstPageCount int) error {
session, found := s.sessions[id]
if !found {
return errors.New("session not found")
}
s.internalFilter(fullFilterParams{
sessionID: id,
addresses: session.addresses,
allAddresses: session.allAddresses,
chainIDs: session.chainIDs,
filter: session.filter,
}, 0, firstPageCount, func(entries []Entry) {
s.sessionsRWMutex.Lock()
defer s.sessionsRWMutex.Unlock()
// Mark new entries
newMap := entryIdsToMap(session.new)
for i, a := range entries {
_, isNew := newMap[a.getIdentity().key()]
entries[i].isNew = isNew
}
session.new = nil
// Mirror client identities for checking updates
session.model = make([]EntryIdentity, 0, len(entries))
for _, a := range entries {
session.model = append(session.model, EntryIdentity{
payloadType: a.payloadType,
transaction: a.transaction,
id: a.id,
})
}
})
return nil
}
// TODO #12120: extend the session based API
//func (s *Service) GetMoreForFilterSession(count int) {}
// subscribeToEvents should be called with sessionsRWMutex locked for writing
func (s *Service) subscribeToEvents() {
s.ch = make(chan walletevent.Event, 100)
s.subscriptions = s.eventFeed.Subscribe(s.ch)
go s.processEvents()
}
// func (s *Service) processEvents() {
// for event := range s.ch {
// if event.Type == transactions.EventPendingTransactionUpdate {
// var p transactions.PendingTxUpdatePayload
// err := json.Unmarshal([]byte(event.Message), &p)
// if err != nil {
// log.Error("Error unmarshalling PendingTxUpdatePayload", "error", err)
// continue
// }
// for id := range s.sessions {
// s.sessionsRWMutex.RLock()
// pTx, pass := s.checkFilterForPending(s.sessions[id], p.TxIdentity)
// if pass {
// s.sessionsRWMutex.RUnlock()
// s.sessionsRWMutex.Lock()
// addOnTop(s.sessions[id], p.TxIdentity)
// s.sessionsRWMutex.Unlock()
// // TODO #12120: can't send events from an event handler
// go notify(s.eventFeed, id, *pTx)
// } else {
// s.sessionsRWMutex.RUnlock()
// }
// }
// }
// }
// }
// TODO #12120: check that it exits on channel close
func (s *Service) processEvents() {
for event := range s.ch {
// TODO #12120: process rest of the events
// TODO #12120: debounce for 1s
if event.Type == transactions.EventPendingTransactionUpdate {
for sessionID := range s.sessions {
session := s.sessions[sessionID]
activities, err := getActivityEntries(context.Background(), s.getDeps(), session.addresses, session.allAddresses, session.chainIDs, session.filter, 0, len(session.model))
if err != nil {
log.Error("Error getting activity entries", "error", err)
continue
}
s.sessionsRWMutex.RLock()
allData := append(session.model, session.new...)
new, _ /*removed*/ := findUpdates(allData, activities)
s.sessionsRWMutex.RUnlock()
s.sessionsRWMutex.Lock()
lastProcessed := -1
for i, idRes := range new {
if i-lastProcessed > 1 {
// The events are not continuous, therefore these are not on top but mixed between existing entries
break
}
lastProcessed = idRes.newPos
// TODO #12120: make it more generic to follow the detection function
// TODO #12120: hold the first few and only send mixed and removed
if session.new == nil {
session.new = make([]EntryIdentity, 0, len(new))
}
session.new = append(session.new, idRes.id)
}
// TODO #12120: mixed
s.sessionsRWMutex.Unlock()
go notify(s.eventFeed, sessionID, len(session.new) > 0)
}
}
}
}
// // checkFilterForPending should be called with sessionsRWMutex locked for reading
// func (s *Service) checkFilterForPending(session *Session, id transactions.TxIdentity) (tr *transactions.PendingTransaction, pass bool) {
// allChains := len(session.chainIDs) == 0
// if !allChains {
// _, found := slices.BinarySearch(session.chainIDs, id.ChainID)
// if !found {
// return nil, false
// }
// }
// tr, err := s.pendingTracker.GetPendingEntry(id.ChainID, id.Hash)
// if err != nil {
// log.Error("Error getting pending entry", "error", err)
// return nil, false
// }
// if !session.allAddresses {
// _, found := slices.BinarySearchFunc(session.addresses, tr.From, func(a eth.Address, b eth.Address) int {
// // TODO #12120: optimize this
// if a.Hex() < b.Hex() {
// return -1
// }
// if a.Hex() > b.Hex() {
// return 1
// }
// return 0
// })
// if !found {
// return nil, false
// }
// }
// fl := session.filter
// if fl.Period.StartTimestamp != NoLimitTimestampForPeriod || fl.Period.EndTimestamp != NoLimitTimestampForPeriod {
// ts := int64(tr.Timestamp)
// if ts < fl.Period.StartTimestamp || ts > fl.Period.EndTimestamp {
// return nil, false
// }
// }
// // TODO #12120 check filter
// // Types []Type `json:"types"`
// // Statuses []Status `json:"statuses"`
// // CounterpartyAddresses []eth.Address `json:"counterpartyAddresses"`
// // // Tokens
// // Assets []Token `json:"assets"`
// // Collectibles []Token `json:"collectibles"`
// // FilterOutAssets bool `json:"filterOutAssets"`
// // FilterOutCollectibles bool `json:"filterOutCollectibles"`
// return tr, true
// }
func notify(eventFeed *event.Feed, id SessionID, hasNewEntries bool) {
payload := SessionUpdate{}
if hasNewEntries {
payload.HasNewEntries = &hasNewEntries
}
sendResponseEvent(eventFeed, (*int32)(&id), EventActivitySessionUpdated, payload, nil)
}
// unsubscribeFromEvents should be called with sessionsRWMutex locked for writing
func (s *Service) unsubscribeFromEvents() {
s.subscriptions.Unsubscribe()
s.subscriptions = nil
}
func (s *Service) StopFilterSession(id SessionID) {
s.sessionsRWMutex.Lock()
delete(s.sessions, id)
if len(s.sessions) == 0 {
s.unsubscribeFromEvents()
}
s.sessionsRWMutex.Unlock()
// Cancel any pending or ongoing task
s.scheduler.Enqueue(int32(id), filterTask, func(ctx context.Context) (interface{}, error) {
return nil, nil
}, func(result interface{}, taskType async.TaskType, err error) {
// Ignore result
})
}
func (s *Service) getActivityDetailsAsync(requestID int32, entries []Entry) {
if len(entries) == 0 {
return
}
ctx := context.Background()
go func() {
activityData, err := s.getActivityDetails(ctx, entries)
if len(activityData) != 0 {
sendResponseEvent(s.eventFeed, &requestID, EventActivityFilteringUpdate, activityData, err)
}
}()
}
type mixedIdentityResult struct {
newPos int
id EntryIdentity
}
func entryIdsToMap(ids []EntryIdentity) map[string]EntryIdentity {
idsMap := make(map[string]EntryIdentity, len(ids))
for _, id := range ids {
idsMap[id.key()] = id
}
return idsMap
}
func entriesToMap(entries []Entry) map[string]Entry {
entryMap := make(map[string]Entry, len(entries))
for _, entry := range entries {
updatedIdentity := entry.getIdentity()
entryMap[updatedIdentity.key()] = entry
}
return entryMap
}
// FindUpdates returns changes in updated entries compared to the identities
//
// expects identities and entries to be sorted by timestamp
//
// the returned newer are entries that are newer than the first identity
// the returned mixed are entries that are older than the first identity (sorted by timestamp)
// the returned removed are identities that are not present in the updated entries (sorted by timestamp)
//
// implementation assumes the order of each identity doesn't change from old state (identities) and new state (updated); we have either add or removed.
func findUpdates(identities []EntryIdentity, updated []Entry) (new []mixedIdentityResult, removed []EntryIdentity) {
idsMap := entryIdsToMap(identities)
updatedMap := entriesToMap(updated)
for newIndex, entry := range updated {
id := entry.getIdentity()
if _, found := idsMap[id.key()]; !found {
new = append(new, mixedIdentityResult{
newPos: newIndex,
id: id,
})
}
}
// Account for new entries
for i := 0; i < len(identities); i++ {
id := identities[i]
if _, found := updatedMap[id.key()]; !found {
removed = append(removed, id)
}
}
return
}