feat(wallet) process all the events and debounce updates

Process missing events

Throttle down downloader's events to avoid overloading the CPU with
change detection.

Updates status-desktop #12120
This commit is contained in:
Stefan 2024-02-16 23:41:44 -03:00 committed by Stefan Dunca
parent dc726007b0
commit a63d33e04a
3 changed files with 74 additions and 57 deletions

View File

@ -68,7 +68,8 @@ type Service struct {
subscriptions event.Subscription
ch chan walletevent.Event
// sessionsRWMutex is used to protect all sessions related members
sessionsRWMutex sync.RWMutex
sessionsRWMutex sync.RWMutex
debounceDuration time.Duration
pendingTracker *transactions.PendingTxTracker
}
@ -86,6 +87,8 @@ func NewService(db *sql.DB, tokenManager token.ManagerInterface, collectibles co
scheduler: async.NewMultiClientScheduler(),
sessions: make(map[SessionID]*Session),
// here to be overwritten by tests
debounceDuration: 1 * time.Second,
pendingTracker: pendingTracker,
}

View File

@ -84,6 +84,7 @@ func setupTestService(tb testing.TB) (state testState) {
state.pendingTracker = transactions.NewPendingTxTracker(db, state.chainClient, nil, state.eventFeed, pendingCheckInterval)
state.service = NewService(db, state.tokenMock, state.collectiblesMock, state.eventFeed, state.pendingTracker)
state.service.debounceDuration = 0
state.close = func() {
require.NoError(tb, state.pendingTracker.Stop())
require.NoError(tb, db.Close())

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"strconv"
"time"
eth "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
@ -331,66 +332,78 @@ func (s *Service) subscribeToEvents() {
// processEvents runs only if more than one session is active
func (s *Service) processEvents() {
eventCount := 0
lastUpdate := time.Now().UnixMilli()
for event := range s.ch {
// TODO #12120: process rest of the events transactions.EventPendingTransactionStatusChanged, transfer.EventNewTransfers
// TODO #12120: debounce for 1s and sum all events as extraCount to be sure we don't miss any change
if event.Type == transactions.EventPendingTransactionUpdate {
for sessionID := range s.sessions {
session := s.sessions[sessionID]
if event.Type == transactions.EventPendingTransactionUpdate ||
event.Type == transactions.EventPendingTransactionStatusChanged ||
event.Type == transfer.EventNewTransfers {
eventCount++
}
// debounce events updates
if eventCount > 0 &&
(time.Duration(time.Now().UnixMilli()-lastUpdate)*time.Millisecond) >= s.debounceDuration {
s.detectNew(eventCount)
eventCount = 0
lastUpdate = time.Now().UnixMilli()
}
}
}
extraCount := 1
fetchLen := len(session.model) + extraCount
activities, err := getActivityEntries(context.Background(), s.getDeps(), session.addresses, session.allAddresses, session.chainIDs, session.filter, 0, fetchLen)
if err != nil {
log.Error("Error getting activity entries", "error", err)
continue
}
s.sessionsRWMutex.RLock()
allData := append(session.new, session.model...)
new, _ /*removed*/ := findUpdates(allData, activities)
s.sessionsRWMutex.RUnlock()
s.sessionsRWMutex.Lock()
lastProcessed := -1
onTop := true
var mixed []*EntryUpdate
for i, idRes := range new {
// Detect on top
if onTop {
// mixedIdentityResult.newPos includes session.new, therefore compensate for it
if ((idRes.newPos - len(session.new)) - lastProcessed) > 1 {
// From now on the events are not on top and continuous but mixed between existing entries
onTop = false
mixed = make([]*EntryUpdate, 0, len(new)-i)
}
lastProcessed = idRes.newPos
}
if onTop {
if session.new == nil {
session.new = make([]EntryIdentity, 0, len(new))
}
session.new = append(session.new, idRes.id)
} else {
modelPos := idRes.newPos - len(session.new)
entry := activities[idRes.newPos]
entry.isNew = true
mixed = append(mixed, &EntryUpdate{
Pos: modelPos,
Entry: &entry,
})
// Insert in session model at modelPos index
session.model = append(session.model[:modelPos], append([]EntryIdentity{{payloadType: entry.payloadType, transaction: entry.transaction, id: entry.id}}, session.model[modelPos:]...)...)
}
}
s.sessionsRWMutex.Unlock()
if len(session.new) > 0 || len(mixed) > 0 {
go notify(s.eventFeed, sessionID, len(session.new) > 0, mixed)
func (s *Service) detectNew(changeCount int) {
for sessionID := range s.sessions {
session := s.sessions[sessionID]
fetchLen := len(session.model) + changeCount
activities, err := getActivityEntries(context.Background(), s.getDeps(), session.addresses, session.allAddresses, session.chainIDs, session.filter, 0, fetchLen)
if err != nil {
log.Error("Error getting activity entries", "error", err)
continue
}
s.sessionsRWMutex.RLock()
allData := append(session.new, session.model...)
new, _ /*removed*/ := findUpdates(allData, activities)
s.sessionsRWMutex.RUnlock()
s.sessionsRWMutex.Lock()
lastProcessed := -1
onTop := true
var mixed []*EntryUpdate
for i, idRes := range new {
// Detect on top
if onTop {
// mixedIdentityResult.newPos includes session.new, therefore compensate for it
if ((idRes.newPos - len(session.new)) - lastProcessed) > 1 {
// From now on the events are not on top and continuous but mixed between existing entries
onTop = false
mixed = make([]*EntryUpdate, 0, len(new)-i)
}
lastProcessed = idRes.newPos
}
if onTop {
if session.new == nil {
session.new = make([]EntryIdentity, 0, len(new))
}
session.new = append(session.new, idRes.id)
} else {
modelPos := idRes.newPos - len(session.new)
entry := activities[idRes.newPos]
entry.isNew = true
mixed = append(mixed, &EntryUpdate{
Pos: modelPos,
Entry: &entry,
})
// Insert in session model at modelPos index
session.model = append(session.model[:modelPos], append([]EntryIdentity{{payloadType: entry.payloadType, transaction: entry.transaction, id: entry.id}}, session.model[modelPos:]...)...)
}
}
s.sessionsRWMutex.Unlock()
if len(session.new) > 0 || len(mixed) > 0 {
go notify(s.eventFeed, sessionID, len(session.new) > 0, mixed)
}
}
}