feat(wallet) use sql activity filter for incremental updates

Switch from the prototype of duplicating the SQL filter as a runtime
and keeping them in sync on each event that might invalidate the current
filtered entries to a simpler approach of requesting the filter again
and doing the diff to detect the new changes.

Also add a new reset API to model the new entries design requirements.

The new approach shows less corner-case to handle and follows one source
of truth concept making debugging and future maintenance easier.

Other changes

- Fix pending mocking to work with multiple calls
- Refactor tests to account for the new changes

Updates status-desktop #12120
This commit is contained in:
Stefan 2024-01-25 23:31:18 -05:00 committed by Stefan Dunca
parent 44c39d345e
commit ca973b4aa6
9 changed files with 616 additions and 230 deletions

View File

@ -68,6 +68,8 @@ type Entry struct {
chainIDIn *common.ChainID
transferType *TransferType
contractAddress *eth.Address
isNew bool
}
// Only used for JSON marshalling
@ -91,6 +93,8 @@ type EntryData struct {
TransferType *TransferType `json:"transferType,omitempty"`
ContractAddress *eth.Address `json:"contractAddress,omitempty"`
IsNew *bool `json:"isNew,omitempty"`
NftName *string `json:"nftName,omitempty"`
NftURL *string `json:"nftUrl,omitempty"`
}
@ -121,6 +125,9 @@ func (e *Entry) MarshalJSON() ([]byte, error) {
}
data.PayloadType = e.payloadType
if e.isNew {
data.IsNew = &e.isNew
}
return json.Marshal(data)
}
@ -155,6 +162,9 @@ func (e *Entry) UnmarshalJSON(data []byte) error {
e.chainIDOut = aux.ChainIDOut
e.chainIDIn = aux.ChainIDIn
e.transferType = aux.TransferType
e.isNew = aux.IsNew != nil && *aux.IsNew
return nil
}
@ -231,6 +241,14 @@ func (e *Entry) anyIdentity() *thirdparty.CollectibleUniqueID {
return nil
}
func (e *Entry) getIdentity() EntryIdentity {
return EntryIdentity{
payloadType: e.payloadType,
id: e.id,
transaction: e.transaction,
}
}
func multiTransactionTypeToActivityType(mtType transfer.MultiTransactionType) Type {
if mtType == transfer.MultiTransactionSend {
return SendAT

View File

@ -376,7 +376,7 @@ func sendResponseEvent(eventFeed *event.Feed, requestID *int32, eventType wallet
err = resErr
}
requestIDStr := "nil"
requestIDStr := nilStr
if requestID != nil {
requestIDStr = strconv.Itoa(int(*requestID))
}

View File

@ -58,26 +58,37 @@ func (m *mockTokenManager) LookupToken(chainID *uint64, tokenSymbol string) (tkn
return args.Get(0).(*token.Token), args.Bool(1)
}
func setupTestService(tb testing.TB) (service *Service, eventFeed *event.Feed, tokenMock *mockTokenManager, collectiblesMock *mockCollectiblesManager, close func(), pendingTracker *transactions.PendingTxTracker, chainClient *transactions.MockChainClient) {
type testState struct {
service *Service
eventFeed *event.Feed
tokenMock *mockTokenManager
collectiblesMock *mockCollectiblesManager
close func()
pendingTracker *transactions.PendingTxTracker
chainClient *transactions.MockChainClient
}
func setupTestService(tb testing.TB) (state testState) {
db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{})
require.NoError(tb, err)
eventFeed = new(event.Feed)
tokenMock = &mockTokenManager{}
collectiblesMock = &mockCollectiblesManager{}
state.eventFeed = new(event.Feed)
state.tokenMock = &mockTokenManager{}
state.collectiblesMock = &mockCollectiblesManager{}
chainClient = transactions.NewMockChainClient()
state.chainClient = transactions.NewMockChainClient()
// Ensure we process pending transactions as needed, only once
pendingCheckInterval := time.Second
pendingTracker = transactions.NewPendingTxTracker(db, chainClient, nil, eventFeed, pendingCheckInterval)
state.pendingTracker = transactions.NewPendingTxTracker(db, state.chainClient, nil, state.eventFeed, pendingCheckInterval)
service = NewService(db, tokenMock, collectiblesMock, eventFeed, pendingTracker)
return service, eventFeed, tokenMock, collectiblesMock, func() {
require.NoError(tb, pendingTracker.Stop())
state.service = NewService(db, state.tokenMock, state.collectiblesMock, state.eventFeed, state.pendingTracker)
state.close = func() {
require.NoError(tb, state.pendingTracker.Stop())
require.NoError(tb, db.Close())
}, pendingTracker, chainClient
}
return state
}
type arg struct {
@ -110,8 +121,8 @@ func insertStubTransfersWithCollectibles(t *testing.T, db *sql.DB, args []arg) (
}
func TestService_UpdateCollectibleInfo(t *testing.T) {
s, e, tM, c, close, _, _ := setupTestService(t)
defer close()
state := setupTestService(t)
defer state.close()
args := []arg{
{5, "0xA2838FDA19EB6EED3F8B9EFF411D4CD7D2DE0313", "0x0D", nil, nil},
@ -119,20 +130,20 @@ func TestService_UpdateCollectibleInfo(t *testing.T) {
{5, "0x3d6afaa395c31fcd391fe3d562e75fe9e8ec7e6a", "", nil, nil},
{5, "0xA2838FDA19EB6EED3F8B9EFF411D4CD7D2DE0313", "0x0F", nil, nil},
}
fromAddresses, toAddresses := insertStubTransfersWithCollectibles(t, s.db, args)
fromAddresses, toAddresses := insertStubTransfersWithCollectibles(t, state.service.db, args)
ch := make(chan walletevent.Event)
sub := e.Subscribe(ch)
sub := state.eventFeed.Subscribe(ch)
// Expect one call for the fungible token
tM.On("LookupTokenIdentity", uint64(5), eth.HexToAddress("0x3d6afaa395c31fcd391fe3d562e75fe9e8ec7e6a"), false).Return(
state.tokenMock.On("LookupTokenIdentity", uint64(5), eth.HexToAddress("0x3d6afaa395c31fcd391fe3d562e75fe9e8ec7e6a"), false).Return(
&token.Token{
ChainID: 5,
Address: eth.HexToAddress("0x3d6afaa395c31fcd391fe3d562e75fe9e8ec7e6a"),
Symbol: "STT",
}, false,
).Once()
c.On("FetchAssetsByCollectibleUniqueID", []thirdparty.CollectibleUniqueID{
state.collectiblesMock.On("FetchAssetsByCollectibleUniqueID", []thirdparty.CollectibleUniqueID{
{
ContractID: thirdparty.ContractID{
ChainID: args[3].chainID,
@ -158,7 +169,7 @@ func TestService_UpdateCollectibleInfo(t *testing.T) {
},
}, nil).Once()
s.FilterActivityAsync(0, append(fromAddresses, toAddresses...), true, allNetworksFilter(), Filter{}, 0, 3)
state.service.FilterActivityAsync(0, append(fromAddresses, toAddresses...), true, allNetworksFilter(), Filter{}, 0, 3)
filterResponseCount := 0
var updates []EntryData
@ -194,8 +205,8 @@ func TestService_UpdateCollectibleInfo(t *testing.T) {
}
func TestService_UpdateCollectibleInfo_Error(t *testing.T) {
s, e, _, c, close, _, _ := setupTestService(t)
defer close()
state := setupTestService(t)
defer state.close()
args := []arg{
{5, "0xA2838FDA19EB6EED3F8B9EFF411D4CD7D2DE0313", "0x762AD3E4934E687F8701F24C7274E5209213FD6208FF952ACEB325D028866949", nil, nil},
@ -203,13 +214,13 @@ func TestService_UpdateCollectibleInfo_Error(t *testing.T) {
}
ch := make(chan walletevent.Event, 4)
sub := e.Subscribe(ch)
sub := state.eventFeed.Subscribe(ch)
fromAddresses, toAddresses := insertStubTransfersWithCollectibles(t, s.db, args)
fromAddresses, toAddresses := insertStubTransfersWithCollectibles(t, state.service.db, args)
c.On("FetchAssetsByCollectibleUniqueID", mock.Anything).Return(nil, thirdparty.ErrChainIDNotSupported).Once()
state.collectiblesMock.On("FetchAssetsByCollectibleUniqueID", mock.Anything).Return(nil, thirdparty.ErrChainIDNotSupported).Once()
s.FilterActivityAsync(0, append(fromAddresses, toAddresses...), true, allNetworksFilter(), Filter{}, 0, 5)
state.service.FilterActivityAsync(0, append(fromAddresses, toAddresses...), true, allNetworksFilter(), Filter{}, 0, 5)
filterResponseCount := 0
updatesCount := 0
@ -239,35 +250,65 @@ func TestService_UpdateCollectibleInfo_Error(t *testing.T) {
sub.Unsubscribe()
}
func TestService_IncrementalFilterUpdate(t *testing.T) {
s, e, tM, _, close, pTx, chainClient := setupTestService(t)
defer close()
func setupTransactions(t *testing.T, state testState, txCount int, testTxs []transactions.TestTxSummary) (allAddresses []eth.Address, pendings []transactions.PendingTransaction, ch chan walletevent.Event, cleanup func()) {
ch = make(chan walletevent.Event, 4)
sub := state.eventFeed.Subscribe(ch)
ch := make(chan walletevent.Event, 4)
sub := e.Subscribe(ch)
defer sub.Unsubscribe()
pendings = transactions.MockTestTransactions(t, state.chainClient, testTxs)
txs, fromTrs, toTrs := transfer.GenerateTestTransfers(t, s.db, 0, 3)
transfer.InsertTestTransfer(t, s.db, txs[0].To, &txs[0])
transfer.InsertTestTransfer(t, s.db, txs[2].To, &txs[2])
txs, fromTrs, toTrs := transfer.GenerateTestTransfers(t, state.service.db, len(pendings), txCount)
for i := range txs {
transfer.InsertTestTransfer(t, state.service.db, txs[i].To, &txs[i])
}
allAddresses := append(fromTrs, toTrs...)
tM.On("LookupTokenIdentity", mock.Anything, eth.HexToAddress("0x0"), true).Return(
allAddresses = append(append(fromTrs, toTrs...), pendings[0].From, pendings[0].To)
state.tokenMock.On("LookupTokenIdentity", mock.Anything, mock.Anything, mock.Anything).Return(
&token.Token{
ChainID: 5,
Address: eth.HexToAddress("0x0"),
Address: eth.Address{},
Symbol: "ETH",
}, false,
).Times(2)
}, true,
).Times(0)
sessionID := s.StartFilterSession(allAddresses, true, allNetworksFilter(), Filter{}, 5)
require.Greater(t, sessionID, SessionID(0))
defer s.StopFilterSession(sessionID)
state.tokenMock.On("LookupToken", mock.Anything, mock.Anything).Return(
&token.Token{
ChainID: 5,
Address: eth.Address{},
Symbol: "ETH",
}, true,
).Times(0)
var filterResponseCount int
return allAddresses, pendings, ch, func() {
sub.Unsubscribe()
}
}
for i := 0; i < 1; i++ {
func validateSessionUpdateEvent(t *testing.T, ch chan walletevent.Event, filterResponseCount *int) (pendingTransactionUpdate, sessionUpdatesCount int) {
for sessionUpdatesCount < 1 {
select {
case res := <-ch:
switch res.Type {
case transactions.EventPendingTransactionUpdate:
pendingTransactionUpdate++
case EventActivitySessionUpdated:
var payload SessionUpdate
err := json.Unmarshal([]byte(res.Message), &payload)
require.NoError(t, err)
require.NotNil(t, payload.HasNewEntries)
require.True(t, *payload.HasNewEntries)
sessionUpdatesCount++
case EventActivityFilteringDone:
(*filterResponseCount)++
}
case <-time.NewTimer(1 * time.Second).C:
require.Fail(t, "timeout while waiting for EventActivitySessionUpdated")
}
}
return
}
func validateSessionUpdateEventWithPending(t *testing.T, ch chan walletevent.Event) (filterResponseCount int) {
for filterResponseCount < 1 {
select {
case res := <-ch:
switch res.Type {
@ -283,30 +324,49 @@ func TestService_IncrementalFilterUpdate(t *testing.T) {
require.Fail(t, "timeout while waiting for EventActivityFilteringDone")
}
}
return
}
pendings := transactions.MockTestTransactions(t, chainClient, []transactions.TestTxSummary{{}})
func TestService_IncrementalUpdateOnTop(t *testing.T) {
state := setupTestService(t)
defer state.close()
err := pTx.StoreAndTrackPendingTx(&pendings[0])
allAddresses, pendings, ch, cleanup := setupTransactions(t, state, 2, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: 3}})
defer cleanup()
sessionID := state.service.StartFilterSession(allAddresses, true, allNetworksFilter(), Filter{}, 5)
require.Greater(t, sessionID, SessionID(0))
defer state.service.StopFilterSession(sessionID)
filterResponseCount := validateSessionUpdateEventWithPending(t, ch)
exp := pendings[0]
err := state.pendingTracker.StoreAndTrackPendingTx(&exp)
require.NoError(t, err)
pendingTransactionUpdate, sessionUpdatesCount := 0, 0
// Validate the session update event
for sessionUpdatesCount < 1 {
pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount)
err = state.service.ResetFilterSession(sessionID, 5)
require.NoError(t, err)
// Validate the reset data
eventActivityDoneCount := 0
for eventActivityDoneCount < 1 {
select {
case res := <-ch:
switch res.Type {
case transactions.EventPendingTransactionUpdate:
pendingTransactionUpdate++
case EventActivitySessionUpdated:
var payload SessionUpdate
case EventActivityFilteringDone:
var payload FilterResponse
err := json.Unmarshal([]byte(res.Message), &payload)
require.NoError(t, err)
require.Equal(t, 1, len(payload.NewEntries))
tx := payload.NewEntries[0]
exp := pendings[0]
// TODO #12120: this should be a multi-transaction
// require.Equal(t, exp.MultiTransactionID, tx.id)
require.Equal(t, ErrorCodeSuccess, payload.ErrorCode)
require.Equal(t, 3, len(payload.Activities))
require.True(t, payload.Activities[0].isNew)
require.False(t, payload.Activities[1].isNew)
require.False(t, payload.Activities[2].isNew)
tx := payload.Activities[0]
require.Equal(t, PendingTransactionPT, tx.payloadType)
// We don't keep type in the DB
require.Equal(t, (*int)(nil), tx.transferType)
@ -316,28 +376,79 @@ func TestService_IncrementalFilterUpdate(t *testing.T) {
require.Equal(t, exp.ChainID, *tx.chainIDOut)
require.Equal(t, (*common.ChainID)(nil), tx.chainIDIn)
require.Equal(t, exp.Hash, tx.transaction.Hash)
require.Equal(t, exp.From, tx.transaction.Address)
// Pending doesn't have address as part of identity
require.Equal(t, eth.Address{}, tx.transaction.Address)
require.Equal(t, exp.From, *tx.sender)
require.Equal(t, exp.To, *tx.recipient)
require.Equal(t, 0, exp.Value.Int.Cmp((*big.Int)(tx.amountOut)))
require.Equal(t, exp.Timestamp, uint64(tx.timestamp))
require.Equal(t, exp.Symbol, *tx.symbolOut)
require.Equal(t, (*string)(nil), tx.symbolIn)
require.Equal(t, (*Token)(nil), tx.tokenOut)
require.Equal(t, &Token{
TokenType: Native,
ChainID: 5,
}, tx.tokenOut)
require.Equal(t, (*Token)(nil), tx.tokenIn)
require.Equal(t, (*eth.Address)(nil), tx.contractAddress)
sessionUpdatesCount++
case EventActivityFilteringDone:
filterResponseCount++
eventActivityDoneCount++
}
case <-time.NewTimer(1 * time.Second).C:
require.Fail(t, "timeout while waiting for EventActivitySessionUpdated")
}
}
// Don't wait for deletion
require.Equal(t, 1, pendingTransactionUpdate)
require.Equal(t, 1, filterResponseCount)
require.Equal(t, 1, sessionUpdatesCount)
require.Equal(t, 1, eventActivityDoneCount)
}
func TestService_IncrementalUpdateFetchWindowRegression(t *testing.T) {
state := setupTestService(t)
defer state.close()
allAddresses, pendings, ch, cleanup := setupTransactions(t, state, 3, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: 4}})
defer cleanup()
sessionID := state.service.StartFilterSession(allAddresses, true, allNetworksFilter(), Filter{}, 2)
require.Greater(t, sessionID, SessionID(0))
defer state.service.StopFilterSession(sessionID)
filterResponseCount := validateSessionUpdateEventWithPending(t, ch)
exp := pendings[0]
err := state.pendingTracker.StoreAndTrackPendingTx(&exp)
require.NoError(t, err)
pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount)
err = state.service.ResetFilterSession(sessionID, 2)
require.NoError(t, err)
// Validate the reset data
eventActivityDoneCount := 0
for eventActivityDoneCount < 1 {
select {
case res := <-ch:
switch res.Type {
case EventActivityFilteringDone:
var payload FilterResponse
err := json.Unmarshal([]byte(res.Message), &payload)
require.NoError(t, err)
require.Equal(t, ErrorCodeSuccess, payload.ErrorCode)
require.Equal(t, 2, len(payload.Activities))
require.True(t, payload.Activities[0].isNew)
require.False(t, payload.Activities[1].isNew)
eventActivityDoneCount++
}
case <-time.NewTimer(1 * time.Second).C:
require.Fail(t, "timeout while waiting for EventActivitySessionUpdated")
}
}
require.Equal(t, 1, pendingTransactionUpdate)
require.Equal(t, 1, filterResponseCount)
require.Equal(t, 1, sessionUpdatesCount)
require.Equal(t, 1, eventActivityDoneCount)
}

View File

@ -2,13 +2,12 @@ package activity
import (
"context"
"encoding/json"
"errors"
"strconv"
"golang.org/x/exp/slices"
eth "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/async"
@ -18,12 +17,28 @@ import (
"github.com/status-im/status-go/transactions"
)
const nilStr = "nil"
type EntryIdentity struct {
payloadType PayloadType
transaction *transfer.TransactionIdentity
id transfer.MultiTransactionIDType
}
// func (e EntryIdentity) same(a EntryIdentity) bool {
// return a.payloadType == e.payloadType && (a.transaction == e.transaction && (a.transaction == nil || (a.transaction.ChainID == e.transaction.ChainID &&
// a.transaction.Hash == e.transaction.Hash &&
// a.transaction.Address == e.transaction.Address))) && a.id == e.id
// }
func (e EntryIdentity) key() string {
txID := nilStr
if e.transaction != nil {
txID = strconv.FormatUint(uint64(e.transaction.ChainID), 10) + e.transaction.Hash.Hex() + e.transaction.Address.Hex()
}
return strconv.Itoa(e.payloadType) + txID + strconv.FormatInt(int64(e.id), 16)
}
type SessionID int32
type Session struct {
@ -36,16 +51,17 @@ type Session struct {
chainIDs []common.ChainID
filter Filter
// model is a mirror of the data model presentation has (EventActivityFilteringDone)
// model is a mirror of the data model presentation has (sent by EventActivityFilteringDone)
model []EntryIdentity
// new holds the new entries until user requests update
new []EntryIdentity
}
// SessionUpdate payload for EventActivitySessionUpdated
type SessionUpdate struct {
// TODO #12120: add index for each entry, now all new are first entries
NewEntries []Entry `json:"newEntries,omitempty"`
Removed []EntryIdentity `json:"removed,omitempty"`
Updated []Entry `json:"updated,omitempty"`
HasNewEntries *bool `json:"hasNewEntries,omitempty"`
Removed []EntryIdentity `json:"removed,omitempty"`
Updated []Entry `json:"updated,omitempty"`
}
type fullFilterParams struct {
@ -99,7 +115,7 @@ func (s *Service) StartFilterSession(addresses []eth.Address, allAddresses bool,
s.sessionsRWMutex.Lock()
subscribeToEvents := len(s.sessions) == 0
s.sessions[sessionID] = &Session{
session := &Session{
id: sessionID,
addresses: addresses,
@ -109,6 +125,8 @@ func (s *Service) StartFilterSession(addresses []eth.Address, allAddresses bool,
model: make([]EntryIdentity, 0, firstPageCount),
}
s.sessions[sessionID] = session
if subscribeToEvents {
s.subscribeToEvents()
}
@ -124,22 +142,57 @@ func (s *Service) StartFilterSession(addresses []eth.Address, allAddresses bool,
// Mirror identities for update use
s.sessionsRWMutex.Lock()
defer s.sessionsRWMutex.Unlock()
session, ok := s.sessions[sessionID]
if ok {
session.model = make([]EntryIdentity, 0, len(entries))
for _, a := range entries {
session.model = append(session.model, EntryIdentity{
payloadType: a.payloadType,
transaction: a.transaction,
id: a.id,
})
}
session.model = make([]EntryIdentity, 0, len(entries))
for _, a := range entries {
session.model = append(session.model, EntryIdentity{
payloadType: a.payloadType,
transaction: a.transaction,
id: a.id,
})
}
})
return sessionID
}
func (s *Service) ResetFilterSession(id SessionID, firstPageCount int) error {
session, found := s.sessions[id]
if !found {
return errors.New("session not found")
}
s.internalFilter(fullFilterParams{
sessionID: id,
addresses: session.addresses,
allAddresses: session.allAddresses,
chainIDs: session.chainIDs,
filter: session.filter,
}, 0, firstPageCount, func(entries []Entry) {
s.sessionsRWMutex.Lock()
defer s.sessionsRWMutex.Unlock()
// Mark new entries
newMap := entryIdsToMap(session.new)
for i, a := range entries {
_, isNew := newMap[a.getIdentity().key()]
entries[i].isNew = isNew
}
session.new = nil
// Mirror client identities for checking updates
session.model = make([]EntryIdentity, 0, len(entries))
for _, a := range entries {
session.model = append(session.model, EntryIdentity{
payloadType: a.payloadType,
transaction: a.transaction,
id: a.id,
})
}
})
return nil
}
// TODO #12120: extend the session based API
//func (s *Service) GetMoreForFilterSession(count int) {}
@ -150,127 +203,137 @@ func (s *Service) subscribeToEvents() {
go s.processEvents()
}
// func (s *Service) processEvents() {
// for event := range s.ch {
// if event.Type == transactions.EventPendingTransactionUpdate {
// var p transactions.PendingTxUpdatePayload
// err := json.Unmarshal([]byte(event.Message), &p)
// if err != nil {
// log.Error("Error unmarshalling PendingTxUpdatePayload", "error", err)
// continue
// }
// for id := range s.sessions {
// s.sessionsRWMutex.RLock()
// pTx, pass := s.checkFilterForPending(s.sessions[id], p.TxIdentity)
// if pass {
// s.sessionsRWMutex.RUnlock()
// s.sessionsRWMutex.Lock()
// addOnTop(s.sessions[id], p.TxIdentity)
// s.sessionsRWMutex.Unlock()
// // TODO #12120: can't send events from an event handler
// go notify(s.eventFeed, id, *pTx)
// } else {
// s.sessionsRWMutex.RUnlock()
// }
// }
// }
// }
// }
// TODO #12120: check that it exits on channel close
func (s *Service) processEvents() {
for event := range s.ch {
// TODO #12120: process rest of the events
// TODO #12120: debounce for 1s
if event.Type == transactions.EventPendingTransactionUpdate {
var p transactions.PendingTxUpdatePayload
err := json.Unmarshal([]byte(event.Message), &p)
if err != nil {
log.Error("Error unmarshalling PendingTxUpdatePayload", "error", err)
continue
}
for id := range s.sessions {
s.sessionsRWMutex.RLock()
pTx, pass := s.checkFilterForPending(s.sessions[id], p.TxIdentity)
if pass {
s.sessionsRWMutex.RUnlock()
s.sessionsRWMutex.Lock()
addOnTop(s.sessions[id], p.TxIdentity)
s.sessionsRWMutex.Unlock()
// TODO #12120: can't send events from an event handler
go notify(s.eventFeed, id, *pTx)
} else {
s.sessionsRWMutex.RUnlock()
for sessionID := range s.sessions {
session := s.sessions[sessionID]
activities, err := getActivityEntries(context.Background(), s.getDeps(), session.addresses, session.allAddresses, session.chainIDs, session.filter, 0, len(session.model))
if err != nil {
log.Error("Error getting activity entries", "error", err)
continue
}
s.sessionsRWMutex.RLock()
allData := append(session.model, session.new...)
new, _ /*removed*/ := findUpdates(allData, activities)
s.sessionsRWMutex.RUnlock()
s.sessionsRWMutex.Lock()
lastProcessed := -1
for i, idRes := range new {
if i-lastProcessed > 1 {
// The events are not continuous, therefore these are not on top but mixed between existing entries
break
}
lastProcessed = idRes.newPos
// TODO #12120: make it more generic to follow the detection function
// TODO #12120: hold the first few and only send mixed and removed
if session.new == nil {
session.new = make([]EntryIdentity, 0, len(new))
}
session.new = append(session.new, idRes.id)
}
// TODO #12120: mixed
s.sessionsRWMutex.Unlock()
go notify(s.eventFeed, sessionID, len(session.new) > 0)
}
}
}
}
// checkFilterForPending should be called with sessionsRWMutex locked for reading
func (s *Service) checkFilterForPending(session *Session, id transactions.TxIdentity) (tr *transactions.PendingTransaction, pass bool) {
allChains := len(session.chainIDs) == 0
if !allChains {
_, found := slices.BinarySearch(session.chainIDs, id.ChainID)
if !found {
return nil, false
}
}
// // checkFilterForPending should be called with sessionsRWMutex locked for reading
// func (s *Service) checkFilterForPending(session *Session, id transactions.TxIdentity) (tr *transactions.PendingTransaction, pass bool) {
// allChains := len(session.chainIDs) == 0
// if !allChains {
// _, found := slices.BinarySearch(session.chainIDs, id.ChainID)
// if !found {
// return nil, false
// }
// }
tr, err := s.pendingTracker.GetPendingEntry(id.ChainID, id.Hash)
if err != nil {
log.Error("Error getting pending entry", "error", err)
return nil, false
}
// tr, err := s.pendingTracker.GetPendingEntry(id.ChainID, id.Hash)
// if err != nil {
// log.Error("Error getting pending entry", "error", err)
// return nil, false
// }
if !session.allAddresses {
_, found := slices.BinarySearchFunc(session.addresses, tr.From, func(a eth.Address, b eth.Address) int {
// TODO #12120: optimize this
if a.Hex() < b.Hex() {
return -1
}
if a.Hex() > b.Hex() {
return 1
}
return 0
})
if !found {
return nil, false
}
}
// if !session.allAddresses {
// _, found := slices.BinarySearchFunc(session.addresses, tr.From, func(a eth.Address, b eth.Address) int {
// // TODO #12120: optimize this
// if a.Hex() < b.Hex() {
// return -1
// }
// if a.Hex() > b.Hex() {
// return 1
// }
// return 0
// })
// if !found {
// return nil, false
// }
// }
fl := session.filter
if fl.Period.StartTimestamp != NoLimitTimestampForPeriod || fl.Period.EndTimestamp != NoLimitTimestampForPeriod {
ts := int64(tr.Timestamp)
if ts < fl.Period.StartTimestamp || ts > fl.Period.EndTimestamp {
return nil, false
}
}
// fl := session.filter
// if fl.Period.StartTimestamp != NoLimitTimestampForPeriod || fl.Period.EndTimestamp != NoLimitTimestampForPeriod {
// ts := int64(tr.Timestamp)
// if ts < fl.Period.StartTimestamp || ts > fl.Period.EndTimestamp {
// return nil, false
// }
// }
// TODO #12120 check filter
// Types []Type `json:"types"`
// Statuses []Status `json:"statuses"`
// CounterpartyAddresses []eth.Address `json:"counterpartyAddresses"`
// // TODO #12120 check filter
// // Types []Type `json:"types"`
// // Statuses []Status `json:"statuses"`
// // CounterpartyAddresses []eth.Address `json:"counterpartyAddresses"`
// // Tokens
// Assets []Token `json:"assets"`
// Collectibles []Token `json:"collectibles"`
// FilterOutAssets bool `json:"filterOutAssets"`
// FilterOutCollectibles bool `json:"filterOutCollectibles"`
// // // Tokens
// // Assets []Token `json:"assets"`
// // Collectibles []Token `json:"collectibles"`
// // FilterOutAssets bool `json:"filterOutAssets"`
// // FilterOutCollectibles bool `json:"filterOutCollectibles"`
return tr, true
}
// return tr, true
// }
// addOnTop should be called with sessionsRWMutex locked for writing
func addOnTop(session *Session, id transactions.TxIdentity) {
session.model = append([]EntryIdentity{{
payloadType: PendingTransactionPT,
transaction: &transfer.TransactionIdentity{
ChainID: id.ChainID,
Hash: id.Hash,
},
}}, session.model...)
}
func notify(eventFeed *event.Feed, id SessionID, tx transactions.PendingTransaction) {
payload := SessionUpdate{
NewEntries: []Entry{
{
payloadType: PendingTransactionPT,
transaction: &transfer.TransactionIdentity{
ChainID: tx.ChainID,
Hash: tx.Hash,
Address: tx.From,
},
id: transfer.NoMultiTransactionID,
timestamp: int64(tx.Timestamp),
activityType: SendAT,
activityStatus: PendingAS,
amountOut: (*hexutil.Big)(tx.Value.Int),
amountIn: nil,
tokenOut: nil,
tokenIn: nil,
symbolOut: &tx.Symbol,
symbolIn: nil,
sender: &tx.From,
recipient: &tx.To,
chainIDOut: &tx.ChainID,
chainIDIn: nil,
transferType: nil,
contractAddress: nil,
},
},
func notify(eventFeed *event.Feed, id SessionID, hasNewEntries bool) {
payload := SessionUpdate{}
if hasNewEntries {
payload.HasNewEntries = &hasNewEntries
}
sendResponseEvent(eventFeed, (*int32)(&id), EventActivitySessionUpdated, payload, nil)
@ -312,3 +375,59 @@ func (s *Service) getActivityDetailsAsync(requestID int32, entries []Entry) {
}
}()
}
type mixedIdentityResult struct {
newPos int
id EntryIdentity
}
func entryIdsToMap(ids []EntryIdentity) map[string]EntryIdentity {
idsMap := make(map[string]EntryIdentity, len(ids))
for _, id := range ids {
idsMap[id.key()] = id
}
return idsMap
}
func entriesToMap(entries []Entry) map[string]Entry {
entryMap := make(map[string]Entry, len(entries))
for _, entry := range entries {
updatedIdentity := entry.getIdentity()
entryMap[updatedIdentity.key()] = entry
}
return entryMap
}
// FindUpdates returns changes in updated entries compared to the identities
//
// expects identities and entries to be sorted by timestamp
//
// the returned newer are entries that are newer than the first identity
// the returned mixed are entries that are older than the first identity (sorted by timestamp)
// the returned removed are identities that are not present in the updated entries (sorted by timestamp)
//
// implementation assumes the order of each identity doesn't change from old state (identities) and new state (updated); we have either add or removed.
func findUpdates(identities []EntryIdentity, updated []Entry) (new []mixedIdentityResult, removed []EntryIdentity) {
idsMap := entryIdsToMap(identities)
updatedMap := entriesToMap(updated)
for newIndex, entry := range updated {
id := entry.getIdentity()
if _, found := idsMap[id.key()]; !found {
new = append(new, mixedIdentityResult{
newPos: newIndex,
id: id,
})
}
}
// Account for new entries
for i := 0; i < len(identities); i++ {
id := identities[i]
if _, found := updatedMap[id.key()]; !found {
removed = append(removed, id)
}
}
return
}

View File

@ -0,0 +1,107 @@
package activity
import (
"reflect"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/services/wallet/transfer"
)
// TODO #12120: cover missing cases
func TestFindUpdates(t *testing.T) {
txIds := []transfer.TransactionIdentity{
transfer.TransactionIdentity{
ChainID: 1,
Hash: common.HexToHash("0x1234"),
Address: common.HexToAddress("0x1234"),
},
}
type findUpdatesResult struct {
new []mixedIdentityResult
removed []EntryIdentity
}
tests := []struct {
name string
identities []EntryIdentity
updated []Entry
want findUpdatesResult
}{
{
name: "Empty to single MT update",
identities: []EntryIdentity{},
updated: []Entry{
{payloadType: MultiTransactionPT, id: 1},
},
want: findUpdatesResult{
new: []mixedIdentityResult{{0, EntryIdentity{payloadType: MultiTransactionPT, id: 1}}},
},
},
{
name: "No updates",
identities: []EntryIdentity{
EntryIdentity{
payloadType: SimpleTransactionPT, transaction: &txIds[0],
},
},
updated: []Entry{
{payloadType: SimpleTransactionPT, transaction: &txIds[0]},
},
want: findUpdatesResult{},
},
{
name: "Empty to mixed updates",
identities: []EntryIdentity{},
updated: []Entry{
{payloadType: MultiTransactionPT, id: 1},
{payloadType: PendingTransactionPT, transaction: &txIds[0]},
},
want: findUpdatesResult{
new: []mixedIdentityResult{{0, EntryIdentity{payloadType: MultiTransactionPT, id: 1}},
{1, EntryIdentity{payloadType: PendingTransactionPT, transaction: &txIds[0]}},
},
},
},
{
name: "Add one on top of one",
identities: []EntryIdentity{
EntryIdentity{
payloadType: MultiTransactionPT, id: 1,
},
},
updated: []Entry{
{payloadType: PendingTransactionPT, transaction: &txIds[0]},
{payloadType: MultiTransactionPT, id: 1},
},
want: findUpdatesResult{
new: []mixedIdentityResult{{0, EntryIdentity{payloadType: PendingTransactionPT, transaction: &txIds[0]}}},
},
},
{
name: "Add one on top keep window",
identities: []EntryIdentity{
EntryIdentity{payloadType: MultiTransactionPT, id: 1},
EntryIdentity{payloadType: PendingTransactionPT, transaction: &txIds[0]},
},
updated: []Entry{
{payloadType: MultiTransactionPT, id: 2},
{payloadType: MultiTransactionPT, id: 1},
},
want: findUpdatesResult{
new: []mixedIdentityResult{{0, EntryIdentity{payloadType: MultiTransactionPT, id: 2}}},
removed: []EntryIdentity{EntryIdentity{payloadType: PendingTransactionPT, transaction: &txIds[0]}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotNew, gotRemoved := findUpdates(tt.identities, tt.updated)
if !reflect.DeepEqual(gotNew, tt.want.new) || !reflect.DeepEqual(gotRemoved, tt.want.removed) {
t.Errorf("findUpdates() = %v, %v, want %v, %v", gotNew, gotRemoved, tt.want.new, tt.want.removed)
}
})
}
}

View File

@ -602,6 +602,12 @@ func (api *API) StartActivityFilterSession(addresses []common.Address, allAddres
return api.s.activity.StartFilterSession(addresses, allAddresses, chainIDs, filter, firstPageCount), nil
}
func (api *API) ResetFilterSession(id activity.SessionID, firstPageCount int) error {
log.Debug("wallet.api.ResetFilterSession", "id", id, "firstPageCount", firstPageCount)
return api.s.activity.ResetFilterSession(id, firstPageCount)
}
func (api *API) StopActivityFilterSession(id activity.SessionID) {
log.Debug("wallet.api.StopActivityFilterSession", "id", id)

View File

@ -241,6 +241,15 @@ var TestTokens = []*token.Token{
&EthMainnet, &EthGoerli, &EthOptimism, &UsdcMainnet, &UsdcGoerli, &UsdcOptimism, &SntMainnet, &DaiMainnet, &DaiGoerli,
}
func LookupTokenIdentity(chainID uint64, address eth_common.Address, native bool) *token.Token {
for _, token := range TestTokens {
if token.ChainID == chainID && token.Address == address && token.IsNative() == native {
return token
}
}
return nil
}
var NativeTokenIndices = []int{0, 1, 2}
func InsertTestTransfer(tb testing.TB, db *sql.DB, address eth_common.Address, tr *TestTransfer) {

View File

@ -138,7 +138,7 @@ func TestPendingTxTracker_InterruptWatching(t *testing.T) {
m, stop, chainClient, eventFeed := setupTestTransactionDB(t, nil)
defer stop()
txs := GenerateTestPendingTransactions(2)
txs := GenerateTestPendingTransactions(0, 2)
// Mock the first call to getTransactionByHash
chainClient.SetAvailableClients([]common.ChainID{txs[0].ChainID})
@ -259,7 +259,7 @@ func TestPendingTxTracker_MultipleClients(t *testing.T) {
m, stop, chainClient, eventFeed := setupTestTransactionDB(t, nil)
defer stop()
txs := GenerateTestPendingTransactions(2)
txs := GenerateTestPendingTransactions(0, 2)
txs[1].ChainID++
// Mock the both clients to be available
@ -344,7 +344,7 @@ func TestPendingTxTracker_Watch(t *testing.T) {
m, stop, chainClient, eventFeed := setupTestTransactionDB(t, nil)
defer stop()
txs := GenerateTestPendingTransactions(2)
txs := GenerateTestPendingTransactions(0, 2)
// Make the second already confirmed
*txs[0].Status = Success
@ -428,7 +428,7 @@ func TestPendingTxTracker_Watch_StatusChangeIncrementally(t *testing.T) {
m, stop, chainClient, eventFeed := setupTestTransactionDB(t, common.NewAndSet(1*time.Nanosecond))
defer stop()
txs := GenerateTestPendingTransactions(2)
txs := GenerateTestPendingTransactions(0, 2)
var firsDoneWG sync.WaitGroup
firsDoneWG.Add(1)
@ -544,7 +544,7 @@ func TestPendingTransactions(t *testing.T) {
manager, stop, _, _ := setupTestTransactionDB(t, nil)
defer stop()
tx := GenerateTestPendingTransactions(1)[0]
tx := GenerateTestPendingTransactions(0, 1)[0]
rst, err := manager.GetAllPending()
require.NoError(t, err)

View File

@ -54,27 +54,27 @@ func (m *MockChainClient) AbstractEthClient(chainID common.ChainID) (chain.Batch
return m.Clients[chainID], nil
}
func GenerateTestPendingTransactions(count int) []PendingTransaction {
func GenerateTestPendingTransactions(start int, count int) []PendingTransaction {
if count > 127 {
panic("can't generate more than 127 distinct transactions")
}
txs := make([]PendingTransaction, count)
for i := 0; i < count; i++ {
// Avoid generating zero values hash and addresses
seed := i + 1
for i := start; i < count; i++ {
txs[i] = PendingTransaction{
Hash: eth.Hash{byte(seed)},
From: eth.Address{byte(seed)},
To: eth.Address{byte(seed * 2)},
Hash: eth.HexToHash(fmt.Sprintf("0x1%d", i)),
From: eth.HexToAddress(fmt.Sprintf("0x2%d", i)),
To: eth.HexToAddress(fmt.Sprintf("0x3%d", i)),
Type: RegisterENS,
AdditionalData: "someuser.stateofus.eth",
Value: bigint.BigInt{Int: big.NewInt(int64(seed))},
Value: bigint.BigInt{Int: big.NewInt(int64(i))},
GasLimit: bigint.BigInt{Int: big.NewInt(21000)},
GasPrice: bigint.BigInt{Int: big.NewInt(int64(seed))},
GasPrice: bigint.BigInt{Int: big.NewInt(int64(i))},
ChainID: 777,
Status: new(TxStatus),
AutoDelete: new(bool),
Symbol: "ETH",
Timestamp: uint64(i),
}
*txs[i].Status = Pending // set to pending by default
*txs[i].AutoDelete = true // set to true by default
@ -84,40 +84,56 @@ func GenerateTestPendingTransactions(count int) []PendingTransaction {
type TestTxSummary struct {
failStatus bool
dontConfirm bool
DontConfirm bool
// Timestamp will be used to mock the Timestamp if greater than 0
Timestamp int
}
func MockTestTransactions(t *testing.T, chainClient *MockChainClient, testTxs []TestTxSummary) []PendingTransaction {
txs := GenerateTestPendingTransactions(len(testTxs))
txs := GenerateTestPendingTransactions(0, len(testTxs))
// Mock the first call to getTransactionByHash
chainClient.SetAvailableClients([]common.ChainID{txs[0].ChainID})
cl := chainClient.Clients[txs[0].ChainID]
cl.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool {
ok := len(b) == len(testTxs)
for i := range b {
ok = ok && b[i].Method == GetTransactionReceiptRPCName && b[i].Args[0] == txs[0].Hash
for txIdx := range txs {
tx := &txs[txIdx]
if testTxs[txIdx].Timestamp > 0 {
tx.Timestamp = uint64(testTxs[txIdx].Timestamp)
}
return ok
})).Return(nil).Once().Run(func(args mock.Arguments) {
elems := args.Get(1).([]rpc.BatchElem)
for i := range elems {
receiptWrapper, ok := elems[i].Result.(*nullableReceipt)
require.True(t, ok)
require.NotNil(t, receiptWrapper)
// Simulate parsing of eth_getTransactionReceipt response
if !testTxs[i].dontConfirm {
status := types.ReceiptStatusSuccessful
if testTxs[i].failStatus {
status = types.ReceiptStatusFailed
}
receiptWrapper.Receipt = &types.Receipt{
BlockNumber: new(big.Int).SetUint64(1),
Status: status,
// Mock the first call to getTransactionByHash
chainClient.SetAvailableClients([]common.ChainID{tx.ChainID})
cl := chainClient.Clients[tx.ChainID]
call := cl.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool {
ok := len(b) == len(testTxs)
for i := range b {
ok = ok && b[i].Method == GetTransactionReceiptRPCName && b[i].Args[0] == tx.Hash
}
return ok
})).Return(nil)
if testTxs[txIdx].DontConfirm {
call = call.Times(0)
} else {
call = call.Once()
}
call.Run(func(args mock.Arguments) {
elems := args.Get(1).([]rpc.BatchElem)
for i := range elems {
receiptWrapper, ok := elems[i].Result.(*nullableReceipt)
require.True(t, ok)
require.NotNil(t, receiptWrapper)
// Simulate parsing of eth_getTransactionReceipt response
if !testTxs[i].DontConfirm {
status := types.ReceiptStatusSuccessful
if testTxs[i].failStatus {
status = types.ReceiptStatusFailed
}
receiptWrapper.Receipt = &types.Receipt{
BlockNumber: new(big.Int).SetUint64(1),
Status: status,
}
}
}
}
})
})
}
return txs
}