From 3466ac2661bb19cd0eaa27761551de3b4d31b393 Mon Sep 17 00:00:00 2001 From: dlipicar Date: Mon, 25 Nov 2024 17:44:39 -0300 Subject: [PATCH] feat!: implement new activityV2 filter (#6102) * feat!: implement new activityV2 filter * chore_: pr comments --- services/wallet/activity/activity.go | 135 ++-- services/wallet/activity/activity_v2.go | 323 +++++++++ services/wallet/activity/errors.go | 9 + services/wallet/activity/filter.go | 4 +- services/wallet/activity/service.go | 9 +- services/wallet/activity/service_test.go | 281 -------- services/wallet/activity/session.go | 485 +------------- services/wallet/activity/session_service.go | 623 ++++++++++++++++++ .../wallet/activity/session_service_test.go | 339 ++++++++++ services/wallet/api.go | 12 +- services/wallet/routeexecution/manager.go | 16 +- services/wallet/service.go | 2 +- .../transfer/transaction_manager_route.go | 2 + tests-functional/clients/signals.py | 45 +- .../tests/test_wallet_activity_session.py | 90 +++ tests-functional/wallet_utils.py | 131 ++++ transactions/testhelpers.go | 6 + ...virtual_columns_to_route_input_data.up.sql | 5 + 18 files changed, 1693 insertions(+), 824 deletions(-) create mode 100644 services/wallet/activity/activity_v2.go create mode 100644 services/wallet/activity/errors.go create mode 100644 services/wallet/activity/session_service.go create mode 100644 services/wallet/activity/session_service_test.go create mode 100644 tests-functional/tests/test_wallet_activity_session.py create mode 100644 tests-functional/wallet_utils.py create mode 100644 walletdatabase/migrations/sql/1730807123_add_from_to_address_virtual_columns_to_route_input_data.up.sql diff --git a/services/wallet/activity/activity.go b/services/wallet/activity/activity.go index f5f8e8a55..6048aa50c 100644 --- a/services/wallet/activity/activity.go +++ b/services/wallet/activity/activity.go @@ -40,7 +40,7 @@ var ( ZeroAddress = eth.Address{} ) -type TransferType = int +type TransferType = int64 const ( TransferTypeEth TransferType = iota + 1 @@ -49,51 +49,66 @@ const ( TransferTypeErc1155 ) +const ( + L1FinalizationDuration = 960 // A block on layer 1 is every 12s, finalization require 64 blocks. A buffer of 16 blocks is added to not create false positives. + L2FinalizationDuration = 648000 // 7.5 days in seconds for layer 2 finalization. 0.5 day is buffer to not create false positive. +) + +const ( + NoLimit = 0 +) + type Entry struct { - payloadType PayloadType - transaction *transfer.TransactionIdentity - id common.MultiTransactionIDType - timestamp int64 - activityType Type - activityStatus Status - amountOut *hexutil.Big // Used for activityType SendAT, SwapAT, BridgeAT - amountIn *hexutil.Big // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT, ApproveAT - tokenOut *Token // Used for activityType SendAT, SwapAT, BridgeAT - tokenIn *Token // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT, ApproveAT - symbolOut *string - symbolIn *string - sender *eth.Address - recipient *eth.Address - chainIDOut *common.ChainID - chainIDIn *common.ChainID - transferType *TransferType - contractAddress *eth.Address - communityID *string + payloadType PayloadType + transaction *transfer.TransactionIdentity // ID for SimpleTransactionPT and PendingTransactionPT. Origin transaction for MultiTransactionPT + id common.MultiTransactionIDType // ID for MultiTransactionPT + transactions []*transfer.TransactionIdentity // List of transactions for MultiTransactionPT + timestamp int64 + activityType Type + activityStatus Status + amountOut *hexutil.Big // Used for activityType SendAT, SwapAT, BridgeAT + amountIn *hexutil.Big // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT, ApproveAT + tokenOut *Token // Used for activityType SendAT, SwapAT, BridgeAT + tokenIn *Token // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT, ApproveAT + symbolOut *string + symbolIn *string + sender *eth.Address + recipient *eth.Address + chainIDOut *common.ChainID + chainIDIn *common.ChainID + transferType *TransferType + contractAddress *eth.Address // Used for contract deployment + communityID *string + interactedContractAddress *eth.Address + approvalSpender *eth.Address isNew bool // isNew is used to indicate if the entry is newer than session start (changed state also) } // Only used for JSON marshalling type EntryData struct { - PayloadType PayloadType `json:"payloadType"` - Transaction *transfer.TransactionIdentity `json:"transaction,omitempty"` - ID *common.MultiTransactionIDType `json:"id,omitempty"` - Timestamp *int64 `json:"timestamp,omitempty"` - ActivityType *Type `json:"activityType,omitempty"` - ActivityStatus *Status `json:"activityStatus,omitempty"` - AmountOut *hexutil.Big `json:"amountOut,omitempty"` - AmountIn *hexutil.Big `json:"amountIn,omitempty"` - TokenOut *Token `json:"tokenOut,omitempty"` - TokenIn *Token `json:"tokenIn,omitempty"` - SymbolOut *string `json:"symbolOut,omitempty"` - SymbolIn *string `json:"symbolIn,omitempty"` - Sender *eth.Address `json:"sender,omitempty"` - Recipient *eth.Address `json:"recipient,omitempty"` - ChainIDOut *common.ChainID `json:"chainIdOut,omitempty"` - ChainIDIn *common.ChainID `json:"chainIdIn,omitempty"` - TransferType *TransferType `json:"transferType,omitempty"` - ContractAddress *eth.Address `json:"contractAddress,omitempty"` - CommunityID *string `json:"communityId,omitempty"` + PayloadType PayloadType `json:"payloadType"` + Transaction *transfer.TransactionIdentity `json:"transaction,omitempty"` + ID *common.MultiTransactionIDType `json:"id,omitempty"` + Transactions []*transfer.TransactionIdentity `json:"transactions,omitempty"` + Timestamp *int64 `json:"timestamp,omitempty"` + ActivityType *Type `json:"activityType,omitempty"` + ActivityStatus *Status `json:"activityStatus,omitempty"` + AmountOut *hexutil.Big `json:"amountOut,omitempty"` + AmountIn *hexutil.Big `json:"amountIn,omitempty"` + TokenOut *Token `json:"tokenOut,omitempty"` + TokenIn *Token `json:"tokenIn,omitempty"` + SymbolOut *string `json:"symbolOut,omitempty"` + SymbolIn *string `json:"symbolIn,omitempty"` + Sender *eth.Address `json:"sender,omitempty"` + Recipient *eth.Address `json:"recipient,omitempty"` + ChainIDOut *common.ChainID `json:"chainIdOut,omitempty"` + ChainIDIn *common.ChainID `json:"chainIdIn,omitempty"` + TransferType *TransferType `json:"transferType,omitempty"` + ContractAddress *eth.Address `json:"contractAddress,omitempty"` + CommunityID *string `json:"communityId,omitempty"` + InteractedContractAddress *eth.Address `json:"interactedContractAddress,omitempty"` + ApprovalSpender *eth.Address `json:"approvalSpender,omitempty"` IsNew *bool `json:"isNew,omitempty"` @@ -103,26 +118,29 @@ type EntryData struct { func (e *Entry) MarshalJSON() ([]byte, error) { data := EntryData{ - Timestamp: &e.timestamp, - ActivityType: &e.activityType, - ActivityStatus: &e.activityStatus, - AmountOut: e.amountOut, - AmountIn: e.amountIn, - TokenOut: e.tokenOut, - TokenIn: e.tokenIn, - SymbolOut: e.symbolOut, - SymbolIn: e.symbolIn, - Sender: e.sender, - Recipient: e.recipient, - ChainIDOut: e.chainIDOut, - ChainIDIn: e.chainIDIn, - TransferType: e.transferType, - ContractAddress: e.contractAddress, - CommunityID: e.communityID, + Timestamp: &e.timestamp, + ActivityType: &e.activityType, + ActivityStatus: &e.activityStatus, + AmountOut: e.amountOut, + AmountIn: e.amountIn, + TokenOut: e.tokenOut, + TokenIn: e.tokenIn, + SymbolOut: e.symbolOut, + SymbolIn: e.symbolIn, + Sender: e.sender, + Recipient: e.recipient, + ChainIDOut: e.chainIDOut, + ChainIDIn: e.chainIDIn, + TransferType: e.transferType, + ContractAddress: e.contractAddress, + CommunityID: e.communityID, + InteractedContractAddress: e.interactedContractAddress, + ApprovalSpender: e.approvalSpender, } if e.payloadType == MultiTransactionPT { data.ID = common.NewAndSet(e.id) + data.Transactions = e.transactions } else { data.Transaction = e.transaction } @@ -145,6 +163,7 @@ func (e *Entry) UnmarshalJSON(data []byte) error { if aux.ID != nil { e.id = *aux.ID } + e.transactions = aux.Transactions if aux.Timestamp != nil { e.timestamp = *aux.Timestamp } @@ -166,6 +185,8 @@ func (e *Entry) UnmarshalJSON(data []byte) error { e.chainIDIn = aux.ChainIDIn e.transferType = aux.TransferType e.communityID = aux.CommunityID + e.interactedContractAddress = aux.InteractedContractAddress + e.approvalSpender = aux.ApprovalSpender e.isNew = aux.IsNew != nil && *aux.IsNew @@ -496,8 +517,8 @@ func getActivityEntries(ctx context.Context, deps FilterDependencies, addresses includeAllNetworks, transactions.Pending, deps.currentTimestamp(), - 648000, // 7.5 days in seconds for layer 2 finalization. 0.5 day is buffer to not create false positive. - 960, // A block on layer 1 is every 12s, finalization require 64 blocks. A buffer of 16 blocks is added to not create false positives. + L2FinalizationDuration, + L1FinalizationDuration, limit, offset) if err != nil { return nil, err diff --git a/services/wallet/activity/activity_v2.go b/services/wallet/activity/activity_v2.go new file mode 100644 index 000000000..5d6d62133 --- /dev/null +++ b/services/wallet/activity/activity_v2.go @@ -0,0 +1,323 @@ +package activity + +import ( + "context" + "database/sql" + "time" + + sq "github.com/Masterminds/squirrel" + + eth "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + ethTypes "github.com/ethereum/go-ethereum/core/types" + + "go.uber.org/zap" + + "github.com/status-im/status-go/logutils" + wCommon "github.com/status-im/status-go/services/wallet/common" + "github.com/status-im/status-go/services/wallet/requests" + pathProcessorCommon "github.com/status-im/status-go/services/wallet/router/pathprocessor/common" + "github.com/status-im/status-go/services/wallet/router/routes" + "github.com/status-im/status-go/services/wallet/token" + "github.com/status-im/status-go/services/wallet/transfer" + "github.com/status-im/status-go/services/wallet/wallettypes" + "github.com/status-im/status-go/sqlite" + "github.com/status-im/status-go/transactions" +) + +// getActivityEntriesV2 queries the route_* and tracked_transactions based on filter parameters and arguments +// it returns metadata for all entries ordered by timestamp column +func getActivityEntriesV2(ctx context.Context, deps FilterDependencies, addresses []eth.Address, allAddresses bool, chainIDs []wCommon.ChainID, filter Filter, offset int, limit int) ([]Entry, error) { + if len(addresses) == 0 { + return nil, ErrNoAddressesProvided + } + if len(chainIDs) == 0 { + return nil, ErrNoChainIDsProvided + } + + q := sq.Select(` + st.tx_json, + rpt.tx_args_json, + rpt.is_approval, + rp.path_json, + rip.route_input_params_json, + rbtp.route_build_tx_params_json, + tt.tx_status, + tt.timestamp + `).Distinct() + q = q.From("sent_transactions st"). + LeftJoin(`route_path_transactions rpt ON + st.chain_id = rpt.chain_id AND + st.tx_hash = rpt.tx_hash`). + LeftJoin(`tracked_transactions tt ON + st.chain_id = tt.chain_id AND + st.tx_hash = tt.tx_hash`). + LeftJoin(`route_paths rp ON + rpt.uuid = rp.uuid AND + rpt.path_idx = rp.path_idx`). + LeftJoin(`route_build_tx_parameters rbtp ON + rpt.uuid = rbtp.uuid`). + LeftJoin(`route_input_parameters rip ON + rpt.uuid = rip.uuid`) + q = q.OrderBy("tt.timestamp DESC") + + qConditions := sq.And{} + + qConditions = append(qConditions, sq.Eq{"rpt.chain_id": chainIDs}) + qConditions = append(qConditions, sq.Eq{"rip.from_address": addresses}) + + q = q.Where(qConditions) + + if limit != NoLimit { + q = q.Limit(uint64(limit)) + q = q.Offset(uint64(offset)) + } + + query, args, err := q.ToSql() + if err != nil { + return nil, err + } + + stmt, err := deps.db.Prepare(query) + if err != nil { + return nil, err + } + defer stmt.Close() + + rows, err := stmt.QueryContext(ctx, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + data, err := rowsToDataV2(rows) + if err != nil { + return nil, err + } + + return dataToEntriesV2(deps, data) +} + +type entryDataV2 struct { + TxArgs *wallettypes.SendTxArgs + Tx *ethTypes.Transaction + IsApproval bool + Status transactions.TxStatus + Timestamp int64 + Path *routes.Path + RouteInputParams *requests.RouteInputParams + BuildInputParams *requests.RouterBuildTransactionsParams +} + +func newEntryDataV2() *entryDataV2 { + return &entryDataV2{ + TxArgs: new(wallettypes.SendTxArgs), + Tx: new(ethTypes.Transaction), + Path: new(routes.Path), + RouteInputParams: new(requests.RouteInputParams), + BuildInputParams: new(requests.RouterBuildTransactionsParams), + } +} + +func rowsToDataV2(rows *sql.Rows) ([]*entryDataV2, error) { + var ret []*entryDataV2 + for rows.Next() { + data := newEntryDataV2() + + nullableTx := sqlite.JSONBlob{Data: data.Tx} + nullableTxArgs := sqlite.JSONBlob{Data: data.TxArgs} + nullableIsApproval := sql.NullBool{} + nullablePath := sqlite.JSONBlob{Data: data.Path} + nullableRouteInputParams := sqlite.JSONBlob{Data: data.RouteInputParams} + nullableBuildInputParams := sqlite.JSONBlob{Data: data.BuildInputParams} + nullableStatus := sql.NullString{} + nullableTimestamp := sql.NullInt64{} + + err := rows.Scan( + &nullableTx, + &nullableTxArgs, + &nullableIsApproval, + &nullablePath, + &nullableRouteInputParams, + &nullableBuildInputParams, + &nullableStatus, + &nullableTimestamp, + ) + if err != nil { + return nil, err + } + + // Check all necessary fields are not null + if !nullableTxArgs.Valid || + !nullableTx.Valid || + !nullableIsApproval.Valid || + !nullableStatus.Valid || + !nullableTimestamp.Valid || + !nullablePath.Valid || + !nullableRouteInputParams.Valid || + !nullableBuildInputParams.Valid { + logutils.ZapLogger().Warn("some fields missing in entryData") + continue + } + + data.IsApproval = nullableIsApproval.Bool + data.Status = nullableStatus.String + data.Timestamp = nullableTimestamp.Int64 + + ret = append(ret, data) + } + + return ret, nil +} + +func dataToEntriesV2(deps FilterDependencies, data []*entryDataV2) ([]Entry, error) { + var ret []Entry + + now := time.Now().Unix() + + for _, d := range data { + chainID := wCommon.ChainID(d.Path.FromChain.ChainID) + + entry := Entry{ + payloadType: MultiTransactionPT, // Temporary, to keep compatibility with clients + id: d.TxArgs.MultiTransactionID, + transactions: []*transfer.TransactionIdentity{ + { + ChainID: chainID, + Hash: d.Tx.Hash(), + Address: d.RouteInputParams.AddrFrom, + }, + }, + timestamp: d.Timestamp, + activityType: getActivityTypeV2(d.Path.ProcessorName, d.IsApproval), + activityStatus: getActivityStatusV2(d.Status, d.Timestamp, now, getFinalizationPeriod(chainID)), + amountOut: d.Path.AmountIn, // Path and Activity have inverse perspective for amountIn and amountOut + amountIn: d.Path.AmountOut, // Path has the Tx perspective, Activity has the Account perspective + tokenOut: getToken(d.Path.FromToken, d.Path.ProcessorName), + tokenIn: getToken(d.Path.ToToken, d.Path.ProcessorName), + sender: &d.RouteInputParams.AddrFrom, + recipient: &d.RouteInputParams.AddrTo, + transferType: getTransferType(d.Path.FromToken, d.Path.ProcessorName), + //contractAddress: // TODO: Handle community contract deployment + //communityID: + } + + if d.Path.FromChain != nil { + chainID := wCommon.ChainID(d.Path.FromChain.ChainID) + entry.chainIDOut = &chainID + } + if d.Path.ToChain != nil { + chainID := wCommon.ChainID(d.Path.ToChain.ChainID) + entry.chainIDIn = &chainID + } + + entry.symbolOut, entry.symbolIn = lookupAndFillInTokens(deps, entry.tokenOut, entry.tokenIn) + + if entry.transferType == nil || TokenType(*entry.transferType) != Native { + interactedAddress := eth.BytesToAddress(d.Tx.To().Bytes()) + entry.interactedContractAddress = &interactedAddress + } + + if entry.activityType == ApproveAT { + entry.approvalSpender = d.Path.ApprovalContractAddress + } + + ret = append(ret, entry) + } + + return ret, nil +} + +func getActivityTypeV2(processorName string, isApproval bool) Type { + if isApproval { + return ApproveAT + } + + switch processorName { + case pathProcessorCommon.ProcessorTransferName, pathProcessorCommon.ProcessorERC721Name, pathProcessorCommon.ProcessorERC1155Name: + return SendAT + case pathProcessorCommon.ProcessorBridgeHopName, pathProcessorCommon.ProcessorBridgeCelerName: + return BridgeAT + case pathProcessorCommon.ProcessorSwapParaswapName: + return SwapAT + } + return UnknownAT +} + +func getActivityStatusV2(status transactions.TxStatus, timestamp int64, now int64, finalizationDuration int64) Status { + switch status { + case transactions.Pending: + return PendingAS + case transactions.Success: + if timestamp+finalizationDuration > now { + return FinalizedAS + } + return CompleteAS + case transactions.Failed: + return FailedAS + } + + logutils.ZapLogger().Error("unhandled transaction status value") + return FailedAS +} + +func getFinalizationPeriod(chainID wCommon.ChainID) int64 { + switch uint64(chainID) { + case wCommon.EthereumMainnet, wCommon.EthereumSepolia: + return L1FinalizationDuration + } + + return L2FinalizationDuration +} + +func getTransferType(fromToken *token.Token, processorName string) *TransferType { + ret := new(TransferType) + + switch processorName { + case pathProcessorCommon.ProcessorTransferName: + if fromToken.IsNative() { + *ret = TransferTypeEth + break + } + *ret = TransferTypeErc20 + case pathProcessorCommon.ProcessorERC721Name: + *ret = TransferTypeErc721 + case pathProcessorCommon.ProcessorERC1155Name: + *ret = TransferTypeErc1155 + default: + ret = nil + } + + return ret +} + +func getToken(token *token.Token, processorName string) *Token { + if token == nil { + return nil + } + + ret := new(Token) + ret.ChainID = wCommon.ChainID(token.ChainID) + if token.IsNative() { + ret.TokenType = Native + } else { + ret.Address = token.Address + switch processorName { + case pathProcessorCommon.ProcessorERC721Name, pathProcessorCommon.ProcessorERC1155Name: + id, err := wCommon.GetTokenIdFromSymbol(token.Symbol) + if err != nil { + logutils.ZapLogger().Warn("malformed token symbol", zap.Error(err)) + return nil + } + ret.TokenID = (*hexutil.Big)(id) + if processorName == pathProcessorCommon.ProcessorERC721Name { + ret.TokenType = Erc721 + } else { + ret.TokenType = Erc1155 + } + default: + ret.TokenType = Erc20 + } + } + return ret +} diff --git a/services/wallet/activity/errors.go b/services/wallet/activity/errors.go new file mode 100644 index 000000000..aa6b2e58c --- /dev/null +++ b/services/wallet/activity/errors.go @@ -0,0 +1,9 @@ +package activity + +import "errors" + +var ( + ErrNoAddressesProvided = errors.New("no addresses provided") + ErrNoChainIDsProvided = errors.New("no chainIDs provided") + ErrSessionNotFound = errors.New("session not found") +) diff --git a/services/wallet/activity/filter.go b/services/wallet/activity/filter.go index 5c0aebe55..3af9483d9 100644 --- a/services/wallet/activity/filter.go +++ b/services/wallet/activity/filter.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "math" // used for embedding the sql query in the binary _ "embed" @@ -31,7 +32,7 @@ type Period struct { EndTimestamp int64 `json:"endTimestamp"` } -type Type int +type Type uint64 const ( SendAT Type = iota @@ -42,6 +43,7 @@ const ( ContractDeploymentAT MintAT ApproveAT + UnknownAT = math.MaxUint64 ) func allActivityTypesFilter() []Type { diff --git a/services/wallet/activity/service.go b/services/wallet/activity/service.go index ac0dbb6cd..ea73d7886 100644 --- a/services/wallet/activity/service.go +++ b/services/wallet/activity/service.go @@ -67,10 +67,11 @@ type Service struct { scheduler *async.MultiClientScheduler - sessions map[SessionID]*Session - lastSessionID atomic.Int32 - subscriptions event.Subscription - ch chan walletevent.Event + sessions map[SessionID]*Session + lastSessionID atomic.Int32 + subscriptions event.Subscription + subscriptionsCancelFn context.CancelFunc + ch chan walletevent.Event // sessionsRWMutex is used to protect all sessions related members sessionsRWMutex sync.RWMutex debounceDuration time.Duration diff --git a/services/wallet/activity/service_test.go b/services/wallet/activity/service_test.go index 9b9651a59..05d724086 100644 --- a/services/wallet/activity/service_test.go +++ b/services/wallet/activity/service_test.go @@ -413,284 +413,3 @@ func validateFilteringDone(t *testing.T, ch chan walletevent.Event, resCount int } return } - -func TestService_IncrementalUpdateOnTop(t *testing.T) { - state := setupTestService(t) - defer state.close() - - transactionCount := 2 - allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}}) - defer cleanup() - - sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 5) - require.Greater(t, sessionID, SessionID(0)) - defer state.service.StopFilterSession(sessionID) - - filterResponseCount := validateFilteringDone(t, ch, 2, nil, nil) - - exp := pendings[0] - err := state.pendingTracker.StoreAndTrackPendingTx(&exp) - require.NoError(t, err) - - vFn := getValidateSessionUpdateHasNewOnTopFn(t) - pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn) - - err = state.service.ResetFilterSession(sessionID, 5) - require.NoError(t, err) - - // Validate the reset data - eventActivityDoneCount := validateFilteringDone(t, ch, 3, func(payload FilterResponse) { - require.True(t, payload.Activities[0].isNew) - require.False(t, payload.Activities[1].isNew) - require.False(t, payload.Activities[2].isNew) - - // Check the new transaction data - newTx := payload.Activities[0] - require.Equal(t, PendingTransactionPT, newTx.payloadType) - // We don't keep type in the DB - require.Equal(t, (*int)(nil), newTx.transferType) - require.Equal(t, SendAT, newTx.activityType) - require.Equal(t, PendingAS, newTx.activityStatus) - require.Equal(t, exp.ChainID, newTx.transaction.ChainID) - require.Equal(t, exp.ChainID, *newTx.chainIDOut) - require.Equal(t, (*common.ChainID)(nil), newTx.chainIDIn) - require.Equal(t, exp.Hash, newTx.transaction.Hash) - // Pending doesn't have address as part of identity - require.Equal(t, eth.Address{}, newTx.transaction.Address) - require.Equal(t, exp.From, *newTx.sender) - require.Equal(t, exp.To, *newTx.recipient) - require.Equal(t, 0, exp.Value.Int.Cmp((*big.Int)(newTx.amountOut))) - require.Equal(t, exp.Timestamp, uint64(newTx.timestamp)) - require.Equal(t, exp.Symbol, *newTx.symbolOut) - require.Equal(t, (*string)(nil), newTx.symbolIn) - require.Equal(t, &Token{ - TokenType: Native, - ChainID: 5, - }, newTx.tokenOut) - require.Equal(t, (*Token)(nil), newTx.tokenIn) - require.Equal(t, (*eth.Address)(nil), newTx.contractAddress) - - // Check the order of the following transaction data - require.Equal(t, SimpleTransactionPT, payload.Activities[1].payloadType) - require.Equal(t, int64(transactionCount), payload.Activities[1].timestamp) - require.Equal(t, SimpleTransactionPT, payload.Activities[2].payloadType) - require.Equal(t, int64(transactionCount-1), payload.Activities[2].timestamp) - }, nil) - - require.Equal(t, 1, pendingTransactionUpdate) - require.Equal(t, 1, filterResponseCount) - require.Equal(t, 1, sessionUpdatesCount) - require.Equal(t, 1, eventActivityDoneCount) -} - -func TestService_IncrementalUpdateMixed(t *testing.T) { - state := setupTestService(t) - defer state.close() - - transactionCount := 5 - allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, - []transactions.TestTxSummary{ - {DontConfirm: true, Timestamp: 2}, - {DontConfirm: true, Timestamp: 4}, - {DontConfirm: true, Timestamp: 6}, - }, - ) - defer cleanup() - - sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 5) - require.Greater(t, sessionID, SessionID(0)) - defer state.service.StopFilterSession(sessionID) - - filterResponseCount := validateFilteringDone(t, ch, 5, nil, nil) - - for i := range pendings { - err := state.pendingTracker.StoreAndTrackPendingTx(&pendings[i]) - require.NoError(t, err) - } - - pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 2, func(payload SessionUpdate) bool { - require.Nil(t, payload.HasNewOnTop) - require.NotEmpty(t, payload.New) - for _, update := range payload.New { - require.True(t, update.Entry.isNew) - foundIdx := -1 - for i, pTx := range pendings { - if pTx.Hash == update.Entry.transaction.Hash && pTx.ChainID == update.Entry.transaction.ChainID { - foundIdx = i - break - } - } - require.Greater(t, foundIdx, -1, "the updated transaction should be found in the pending list") - pendings = append(pendings[:foundIdx], pendings[foundIdx+1:]...) - } - return len(pendings) == 1 - }) - - // Validate that the last one (oldest) is out of the window - require.Equal(t, 1, len(pendings)) - require.Equal(t, uint64(2), pendings[0].Timestamp) - - require.Equal(t, 3, pendingTransactionUpdate) - require.LessOrEqual(t, sessionUpdatesCount, 3) - require.Equal(t, 1, filterResponseCount) - -} - -func TestService_IncrementalUpdateFetchWindow(t *testing.T) { - state := setupTestService(t) - defer state.close() - - transactionCount := 5 - allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}}) - defer cleanup() - - sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 2) - require.Greater(t, sessionID, SessionID(0)) - defer state.service.StopFilterSession(sessionID) - - filterResponseCount := validateFilteringDone(t, ch, 2, nil, nil) - - exp := pendings[0] - err := state.pendingTracker.StoreAndTrackPendingTx(&exp) - require.NoError(t, err) - - vFn := getValidateSessionUpdateHasNewOnTopFn(t) - pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn) - - err = state.service.ResetFilterSession(sessionID, 2) - require.NoError(t, err) - - // Validate the reset data - eventActivityDoneCount := validateFilteringDone(t, ch, 2, func(payload FilterResponse) { - require.True(t, payload.Activities[0].isNew) - require.Equal(t, int64(transactionCount+1), payload.Activities[0].timestamp) - require.False(t, payload.Activities[1].isNew) - require.Equal(t, int64(transactionCount), payload.Activities[1].timestamp) - }, nil) - - require.Equal(t, 1, pendingTransactionUpdate) - require.Equal(t, 1, filterResponseCount) - require.Equal(t, 1, sessionUpdatesCount) - require.Equal(t, 1, eventActivityDoneCount) - - err = state.service.GetMoreForFilterSession(sessionID, 2) - require.NoError(t, err) - - eventActivityDoneCount = validateFilteringDone(t, ch, 2, func(payload FilterResponse) { - require.False(t, payload.Activities[0].isNew) - require.Equal(t, int64(transactionCount-1), payload.Activities[0].timestamp) - require.False(t, payload.Activities[1].isNew) - require.Equal(t, int64(transactionCount-2), payload.Activities[1].timestamp) - }, common.NewAndSet(extraExpect{common.NewAndSet(2), nil})) - require.Equal(t, 1, eventActivityDoneCount) -} - -func TestService_IncrementalUpdateFetchWindowNoReset(t *testing.T) { - state := setupTestService(t) - defer state.close() - - transactionCount := 5 - allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}}) - defer cleanup() - - sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 2) - require.Greater(t, sessionID, SessionID(0)) - defer state.service.StopFilterSession(sessionID) - - filterResponseCount := validateFilteringDone(t, ch, 2, func(payload FilterResponse) { - require.Equal(t, int64(transactionCount), payload.Activities[0].timestamp) - require.Equal(t, int64(transactionCount-1), payload.Activities[1].timestamp) - }, nil) - - exp := pendings[0] - err := state.pendingTracker.StoreAndTrackPendingTx(&exp) - require.NoError(t, err) - - vFn := getValidateSessionUpdateHasNewOnTopFn(t) - pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn) - require.Equal(t, 1, pendingTransactionUpdate) - require.Equal(t, 1, filterResponseCount) - require.Equal(t, 1, sessionUpdatesCount) - - err = state.service.GetMoreForFilterSession(sessionID, 2) - require.NoError(t, err) - - // Validate that client continue loading the next window without being affected by the internal state of new - eventActivityDoneCount := validateFilteringDone(t, ch, 2, func(payload FilterResponse) { - require.False(t, payload.Activities[0].isNew) - require.Equal(t, int64(transactionCount-2), payload.Activities[0].timestamp) - require.False(t, payload.Activities[1].isNew) - require.Equal(t, int64(transactionCount-3), payload.Activities[1].timestamp) - }, common.NewAndSet(extraExpect{common.NewAndSet(2), nil})) - require.Equal(t, 1, eventActivityDoneCount) -} - -// Simulate and validate a multi-step user flow that was also a regression in the original implementation -func TestService_FilteredIncrementalUpdateResetAndClear(t *testing.T) { - state := setupTestService(t) - defer state.close() - - transactionCount := 5 - allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}}) - defer cleanup() - - // Generate new transaction for step 5 - newOffset := transactionCount + 2 - newTxs, newFromTrs, newToTrs := transfer.GenerateTestTransfers(t, state.service.db, newOffset, 1) - allAddresses = append(append(allAddresses, newFromTrs...), newToTrs...) - - // 1. User visualizes transactions for the first time - sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 4) - require.Greater(t, sessionID, SessionID(0)) - defer state.service.StopFilterSession(sessionID) - - validateFilteringDone(t, ch, 4, nil, nil) - - // 2. User applies a filter for pending transactions - err := state.service.UpdateFilterForSession(sessionID, Filter{Statuses: []Status{PendingAS}}, 4) - require.NoError(t, err) - - filterResponseCount := validateFilteringDone(t, ch, 0, nil, nil) - - // 3. A pending transaction is added - exp := pendings[0] - err = state.pendingTracker.StoreAndTrackPendingTx(&exp) - require.NoError(t, err) - - vFn := getValidateSessionUpdateHasNewOnTopFn(t) - pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn) - - // 4. User resets the view and the new pending transaction has the new flag - err = state.service.ResetFilterSession(sessionID, 2) - require.NoError(t, err) - - // Validate the reset data - eventActivityDoneCount := validateFilteringDone(t, ch, 1, func(payload FilterResponse) { - require.True(t, payload.Activities[0].isNew) - require.Equal(t, int64(transactionCount+1), payload.Activities[0].timestamp) - }, nil) - - require.Equal(t, 1, pendingTransactionUpdate) - require.Equal(t, 1, filterResponseCount) - require.Equal(t, 1, sessionUpdatesCount) - require.Equal(t, 1, eventActivityDoneCount) - - // 5. A new transaction is downloaded - transfer.InsertTestTransfer(t, state.service.db, newTxs[0].To, &newTxs[0]) - - // 6. User clears the filter and only the new transaction should have the new flag - err = state.service.UpdateFilterForSession(sessionID, Filter{}, 4) - require.NoError(t, err) - - eventActivityDoneCount = validateFilteringDone(t, ch, 4, func(payload FilterResponse) { - require.True(t, payload.Activities[0].isNew) - require.Equal(t, int64(newOffset), payload.Activities[0].timestamp) - require.False(t, payload.Activities[1].isNew) - require.Equal(t, int64(newOffset-1), payload.Activities[1].timestamp) - require.False(t, payload.Activities[2].isNew) - require.Equal(t, int64(newOffset-2), payload.Activities[2].timestamp) - require.False(t, payload.Activities[3].isNew) - require.Equal(t, int64(newOffset-3), payload.Activities[3].timestamp) - }, nil) - require.Equal(t, 1, eventActivityDoneCount) -} diff --git a/services/wallet/activity/session.go b/services/wallet/activity/session.go index b2c43d1d4..3190933cf 100644 --- a/services/wallet/activity/session.go +++ b/services/wallet/activity/session.go @@ -1,22 +1,12 @@ package activity import ( - "context" - "errors" "strconv" - "time" - - "go.uber.org/zap" + "sync" eth "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/event" - gocommon "github.com/status-im/status-go/common" - "github.com/status-im/status-go/logutils" - "github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/common" "github.com/status-im/status-go/services/wallet/transfer" - "github.com/status-im/status-go/services/wallet/walletevent" - "github.com/status-im/status-go/transactions" ) const nilStr = "nil" @@ -47,14 +37,9 @@ func (e EntryIdentity) key() string { type SessionID int32 // Session stores state related to a filter session -// The user happy flow is: -// 1. StartFilterSession to get a new SessionID and client be notified by the current state -// 2. GetMoreForFilterSession anytime to get more entries after the first page -// 3. UpdateFilterForSession to update the filter and get the new state or clean the filter and get the newer entries -// 4. ResetFilterSession in case client receives SessionUpdate with HasNewOnTop = true to get the latest state -// 5. StopFilterSession to stop the session when no used (user changed from activity screens or changed addresses and chains) type Session struct { - id SessionID + id SessionID + version Version // Filter info // @@ -68,462 +53,16 @@ type Session struct { noFilterModel map[string]EntryIdentity // new holds the new entries until user requests update by calling ResetFilterSession new []EntryIdentity + + mu *sync.RWMutex } -type EntryUpdate struct { - Pos int `json:"pos"` - Entry *Entry `json:"entry"` -} - -// SessionUpdate payload for EventActivitySessionUpdated -type SessionUpdate struct { - HasNewOnTop *bool `json:"hasNewOnTop,omitempty"` - New []*EntryUpdate `json:"new,omitempty"` - Removed []EntryIdentity `json:"removed,omitempty"` -} - -type fullFilterParams struct { - sessionID SessionID - addresses []eth.Address - chainIDs []common.ChainID - filter Filter -} - -func (s *Service) internalFilter(f fullFilterParams, offset int, count int, processResults func(entries []Entry) (offsetOverride int)) { - s.scheduler.Enqueue(int32(f.sessionID), filterTask, func(ctx context.Context) (interface{}, error) { - allAddresses := s.areAllAddresses(f.addresses) - activities, err := getActivityEntries(ctx, s.getDeps(), f.addresses, allAddresses, f.chainIDs, f.filter, offset, count) - return activities, err - }, func(result interface{}, taskType async.TaskType, err error) { - res := FilterResponse{ - ErrorCode: ErrorCodeFailed, - } - - if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) { - res.ErrorCode = ErrorCodeTaskCanceled - } else if err == nil { - activities := result.([]Entry) - res.Activities = activities - res.HasMore = len(activities) == count - res.ErrorCode = ErrorCodeSuccess - - res.Offset = processResults(activities) - } - - int32SessionID := int32(f.sessionID) - sendResponseEvent(s.eventFeed, &int32SessionID, EventActivityFilteringDone, res, err) - - s.getActivityDetailsAsync(int32SessionID, res.Activities) - }) -} - -// mirrorIdentities for update use -func mirrorIdentities(entries []Entry) []EntryIdentity { - model := make([]EntryIdentity, 0, len(entries)) - for _, a := range entries { - model = append(model, EntryIdentity{ - payloadType: a.payloadType, - transaction: a.transaction, - id: a.id, - }) - } - return model -} - -func (s *Service) internalFilterForSession(session *Session, firstPageCount int) { - s.internalFilter( - fullFilterParams{ - sessionID: session.id, - addresses: session.addresses, - chainIDs: session.chainIDs, - filter: session.filter, - }, - 0, - firstPageCount, - func(entries []Entry) (offset int) { - s.sessionsRWMutex.Lock() - defer s.sessionsRWMutex.Unlock() - - session.model = mirrorIdentities(entries) - - return 0 - }, - ) -} - -func (s *Service) StartFilterSession(addresses []eth.Address, chainIDs []common.ChainID, filter Filter, firstPageCount int) SessionID { - sessionID := s.nextSessionID() - - session := &Session{ - id: sessionID, - - addresses: addresses, - chainIDs: chainIDs, - filter: filter, - - model: make([]EntryIdentity, 0, firstPageCount), - } - - s.sessionsRWMutex.Lock() - subscribeToEvents := len(s.sessions) == 0 - - s.sessions[sessionID] = session - - if subscribeToEvents { - s.subscribeToEvents() - } - s.sessionsRWMutex.Unlock() - - s.internalFilterForSession(session, firstPageCount) - - return sessionID -} - -// UpdateFilterForSession is to be called for updating the filter of a specific session -// After calling this method to set a filter all the incoming changes will be reported with -// Entry.isNew = true when filter is reset to empty -func (s *Service) UpdateFilterForSession(id SessionID, filter Filter, firstPageCount int) error { - s.sessionsRWMutex.RLock() - session, found := s.sessions[id] - if !found { - s.sessionsRWMutex.RUnlock() - return errors.New("session not found") - } - - prevFilterEmpty := session.filter.IsEmpty() - newFilerEmpty := filter.IsEmpty() - s.sessionsRWMutex.RUnlock() - - s.sessionsRWMutex.Lock() - - session.new = nil - - session.filter = filter - - if prevFilterEmpty && !newFilerEmpty { - // Session is moving from empty to non-empty filter - // Take a snapshot of the current model - session.noFilterModel = entryIdsToMap(session.model) - - session.model = make([]EntryIdentity, 0, firstPageCount) - - // In this case there is nothing to flag so we request the first page - s.internalFilterForSession(session, firstPageCount) - } else if !prevFilterEmpty && newFilerEmpty { - // Session is moving from non-empty to empty filter - // In this case we need to flag all the new entries that are not in the noFilterModel - s.internalFilter( - fullFilterParams{ - sessionID: session.id, - addresses: session.addresses, - chainIDs: session.chainIDs, - filter: session.filter, - }, - 0, - firstPageCount, - func(entries []Entry) (offset int) { - s.sessionsRWMutex.Lock() - defer s.sessionsRWMutex.Unlock() - - // Mark new entries - for i, a := range entries { - _, found := session.noFilterModel[a.getIdentity().key()] - entries[i].isNew = !found - } - - // Mirror identities for update use - session.model = mirrorIdentities(entries) - session.noFilterModel = nil - return 0 - }, - ) - } else { - // Else act as a normal filter update - s.internalFilterForSession(session, firstPageCount) - } - s.sessionsRWMutex.Unlock() - - return nil -} - -// ResetFilterSession is to be called when SessionUpdate.HasNewOnTop == true to -// update client with the latest state including new on top entries -func (s *Service) ResetFilterSession(id SessionID, firstPageCount int) error { - session, found := s.sessions[id] - if !found { - return errors.New("session not found") - } - - s.internalFilter( - fullFilterParams{ - sessionID: id, - addresses: session.addresses, - chainIDs: session.chainIDs, - filter: session.filter, - }, - 0, - firstPageCount, - func(entries []Entry) (offset int) { - s.sessionsRWMutex.Lock() - defer s.sessionsRWMutex.Unlock() - - // Mark new entries - newMap := entryIdsToMap(session.new) - for i, a := range entries { - _, isNew := newMap[a.getIdentity().key()] - entries[i].isNew = isNew - } - session.new = nil - - if session.noFilterModel != nil { - // Add reported new entries to mark them as seen - for _, a := range newMap { - session.noFilterModel[a.key()] = a - } - } - - // Mirror client identities for checking updates - session.model = mirrorIdentities(entries) - - return 0 - }, - ) - return nil -} - -func (s *Service) GetMoreForFilterSession(id SessionID, pageCount int) error { - session, found := s.sessions[id] - if !found { - return errors.New("session not found") - } - - prevModelLen := len(session.model) - s.internalFilter( - fullFilterParams{ - sessionID: id, - addresses: session.addresses, - chainIDs: session.chainIDs, - filter: session.filter, - }, - prevModelLen+len(session.new), - pageCount, - func(entries []Entry) (offset int) { - s.sessionsRWMutex.Lock() - defer s.sessionsRWMutex.Unlock() - - // Mirror client identities for checking updates - for _, a := range entries { - session.model = append(session.model, EntryIdentity{ - payloadType: a.payloadType, - transaction: a.transaction, - id: a.id, - }) - } - - // Overwrite the offset to account for new entries - return prevModelLen - }, - ) - return nil -} - -// subscribeToEvents should be called with sessionsRWMutex locked for writing -func (s *Service) subscribeToEvents() { - s.ch = make(chan walletevent.Event, 100) - s.subscriptions = s.eventFeed.Subscribe(s.ch) - go s.processEvents() -} - -// processEvents runs only if more than one session is active -func (s *Service) processEvents() { - defer gocommon.LogOnPanic() - eventCount := 0 - lastUpdate := time.Now().UnixMilli() - for event := range s.ch { - if event.Type == transactions.EventPendingTransactionUpdate || - event.Type == transactions.EventPendingTransactionStatusChanged || - event.Type == transfer.EventNewTransfers { - eventCount++ - } - // debounce events updates - if eventCount > 0 && - (time.Duration(time.Now().UnixMilli()-lastUpdate)*time.Millisecond) >= s.debounceDuration { - s.detectNew(eventCount) - eventCount = 0 - lastUpdate = time.Now().UnixMilli() - } +func (s *Session) getFullFilterParams() fullFilterParams { + return fullFilterParams{ + sessionID: s.id, + version: s.version, + addresses: s.addresses, + chainIDs: s.chainIDs, + filter: s.filter, } } - -func (s *Service) detectNew(changeCount int) { - for sessionID := range s.sessions { - session := s.sessions[sessionID] - - fetchLen := len(session.model) + changeCount - allAddresses := s.areAllAddresses(session.addresses) - activities, err := getActivityEntries(context.Background(), s.getDeps(), session.addresses, allAddresses, session.chainIDs, session.filter, 0, fetchLen) - if err != nil { - logutils.ZapLogger().Error("Error getting activity entries", zap.Error(err)) - continue - } - - s.sessionsRWMutex.RLock() - allData := append(session.new, session.model...) - new, _ /*removed*/ := findUpdates(allData, activities) - s.sessionsRWMutex.RUnlock() - - s.sessionsRWMutex.Lock() - lastProcessed := -1 - onTop := true - var mixed []*EntryUpdate - for i, idRes := range new { - // Detect on top - if onTop { - // mixedIdentityResult.newPos includes session.new, therefore compensate for it - if ((idRes.newPos - len(session.new)) - lastProcessed) > 1 { - // From now on the events are not on top and continuous but mixed between existing entries - onTop = false - mixed = make([]*EntryUpdate, 0, len(new)-i) - } - lastProcessed = idRes.newPos - } - - if onTop { - if session.new == nil { - session.new = make([]EntryIdentity, 0, len(new)) - } - session.new = append(session.new, idRes.id) - } else { - modelPos := idRes.newPos - len(session.new) - entry := activities[idRes.newPos] - entry.isNew = true - mixed = append(mixed, &EntryUpdate{ - Pos: modelPos, - Entry: &entry, - }) - // Insert in session model at modelPos index - session.model = append(session.model[:modelPos], append([]EntryIdentity{{payloadType: entry.payloadType, transaction: entry.transaction, id: entry.id}}, session.model[modelPos:]...)...) - } - } - - s.sessionsRWMutex.Unlock() - - if len(session.new) > 0 || len(mixed) > 0 { - go notify(s.eventFeed, sessionID, len(session.new) > 0, mixed) - } - } -} - -func notify(eventFeed *event.Feed, id SessionID, hasNewOnTop bool, mixed []*EntryUpdate) { - defer gocommon.LogOnPanic() - payload := SessionUpdate{ - New: mixed, - } - - if hasNewOnTop { - payload.HasNewOnTop = &hasNewOnTop - } - - sendResponseEvent(eventFeed, (*int32)(&id), EventActivitySessionUpdated, payload, nil) -} - -// unsubscribeFromEvents should be called with sessionsRWMutex locked for writing -func (s *Service) unsubscribeFromEvents() { - s.subscriptions.Unsubscribe() - close(s.ch) - s.ch = nil - s.subscriptions = nil -} - -func (s *Service) StopFilterSession(id SessionID) { - s.sessionsRWMutex.Lock() - delete(s.sessions, id) - if len(s.sessions) == 0 { - s.unsubscribeFromEvents() - } - s.sessionsRWMutex.Unlock() - - // Cancel any pending or ongoing task - s.scheduler.Enqueue(int32(id), filterTask, func(ctx context.Context) (interface{}, error) { - return nil, nil - }, func(result interface{}, taskType async.TaskType, err error) {}) -} - -func (s *Service) getActivityDetailsAsync(requestID int32, entries []Entry) { - if len(entries) == 0 { - return - } - - ctx := context.Background() - - go func() { - defer gocommon.LogOnPanic() - activityData, err := s.getActivityDetails(ctx, entries) - if len(activityData) != 0 { - sendResponseEvent(s.eventFeed, &requestID, EventActivityFilteringUpdate, activityData, err) - } - }() -} - -type mixedIdentityResult struct { - newPos int - id EntryIdentity -} - -func entryIdsToMap(ids []EntryIdentity) map[string]EntryIdentity { - idsMap := make(map[string]EntryIdentity, len(ids)) - for _, id := range ids { - idsMap[id.key()] = id - } - return idsMap -} - -func entriesToMap(entries []Entry) map[string]Entry { - entryMap := make(map[string]Entry, len(entries)) - for _, entry := range entries { - updatedIdentity := entry.getIdentity() - entryMap[updatedIdentity.key()] = entry - } - return entryMap -} - -// FindUpdates returns changes in updated entries compared to the identities -// -// expects identities and entries to be sorted by timestamp -// -// the returned newer are entries that are newer than the first identity -// the returned mixed are entries that are older than the first identity (sorted by timestamp) -// the returned removed are identities that are not present in the updated entries (sorted by timestamp) -// -// implementation assumes the order of each identity doesn't change from old state (identities) and new state (updated); we have either add or removed. -func findUpdates(identities []EntryIdentity, updated []Entry) (new []mixedIdentityResult, removed []EntryIdentity) { - if len(updated) == 0 { - return - } - - idsMap := entryIdsToMap(identities) - updatedMap := entriesToMap(updated) - - for newIndex, entry := range updated { - id := entry.getIdentity() - if _, found := idsMap[id.key()]; !found { - new = append(new, mixedIdentityResult{ - newPos: newIndex, - id: id, - }) - } - - if len(identities) > 0 && entry.getIdentity().same(identities[len(identities)-1]) { - break - } - } - - // Account for new entries - for i := 0; i < len(identities); i++ { - id := identities[i] - if _, found := updatedMap[id.key()]; !found { - removed = append(removed, id) - } - } - return -} diff --git a/services/wallet/activity/session_service.go b/services/wallet/activity/session_service.go new file mode 100644 index 000000000..775b3bcf8 --- /dev/null +++ b/services/wallet/activity/session_service.go @@ -0,0 +1,623 @@ +package activity + +import ( + "context" + "encoding/json" + "errors" + "strconv" + "sync" + "time" + + eth "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" + + gocommon "github.com/status-im/status-go/common" + "github.com/status-im/status-go/logutils" + "github.com/status-im/status-go/services/wallet/async" + "github.com/status-im/status-go/services/wallet/common" + "github.com/status-im/status-go/services/wallet/responses" + "github.com/status-im/status-go/services/wallet/routeexecution" + "github.com/status-im/status-go/services/wallet/transfer" + "github.com/status-im/status-go/services/wallet/walletevent" + "github.com/status-im/status-go/transactions" + + "go.uber.org/zap" +) + +type Version string + +const ( + V1 Version = "v1" + V2 Version = "v2" +) + +type TransactionID struct { + ChainID common.ChainID + Hash eth.Hash +} + +func (t TransactionID) key() string { + return strconv.FormatUint(uint64(t.ChainID), 10) + t.Hash.Hex() +} + +type EntryUpdate struct { + Pos int `json:"pos"` + Entry *Entry `json:"entry"` +} + +// SessionUpdate payload for EventActivitySessionUpdated +type SessionUpdate struct { + HasNewOnTop *bool `json:"hasNewOnTop,omitempty"` + New []*EntryUpdate `json:"new,omitempty"` + Removed []EntryIdentity `json:"removed,omitempty"` +} + +type fullFilterParams struct { + sessionID SessionID + version Version + addresses []eth.Address + chainIDs []common.ChainID + filter Filter +} + +func (s *Service) getActivityEntries(ctx context.Context, f fullFilterParams, offset int, count int) ([]Entry, error) { + allAddresses := s.areAllAddresses(f.addresses) + if f.version == V1 { + return getActivityEntries(ctx, s.getDeps(), f.addresses, allAddresses, f.chainIDs, f.filter, offset, count) + } + return getActivityEntriesV2(ctx, s.getDeps(), f.addresses, allAddresses, f.chainIDs, f.filter, offset, count) +} + +func (s *Service) internalFilter(f fullFilterParams, offset int, count int, processResults func(entries []Entry) (offsetOverride int)) { + s.scheduler.Enqueue(int32(f.sessionID), filterTask, func(ctx context.Context) (interface{}, error) { + return s.getActivityEntries(ctx, f, offset, count) + }, func(result interface{}, taskType async.TaskType, err error) { + res := FilterResponse{ + ErrorCode: ErrorCodeFailed, + } + + if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) { + res.ErrorCode = ErrorCodeTaskCanceled + } else if err == nil { + activities := result.([]Entry) + res.Activities = activities + res.HasMore = len(activities) == count + res.ErrorCode = ErrorCodeSuccess + + res.Offset = processResults(activities) + } + + int32SessionID := int32(f.sessionID) + sendResponseEvent(s.eventFeed, &int32SessionID, EventActivityFilteringDone, res, err) + + s.getActivityDetailsAsync(int32SessionID, res.Activities) + }) +} + +// mirrorIdentities for update use +func mirrorIdentities(entries []Entry) []EntryIdentity { + model := make([]EntryIdentity, 0, len(entries)) + for _, a := range entries { + model = append(model, EntryIdentity{ + payloadType: a.payloadType, + transaction: a.transaction, + id: a.id, + }) + } + return model +} + +func (s *Service) internalFilterForSession(session *Session, firstPageCount int) { + s.internalFilter( + session.getFullFilterParams(), + 0, + firstPageCount, + func(entries []Entry) (offset int) { + session.model = mirrorIdentities(entries) + + return 0 + }, + ) +} + +func (s *Service) StartFilterSession(addresses []eth.Address, chainIDs []common.ChainID, filter Filter, firstPageCount int, version Version) SessionID { + sessionID := s.nextSessionID() + + session := &Session{ + id: sessionID, + version: version, + + addresses: addresses, + chainIDs: chainIDs, + filter: filter, + + model: make([]EntryIdentity, 0, firstPageCount), + + mu: &sync.RWMutex{}, + } + + s.addSession(session) + + session.mu.Lock() + defer session.mu.Unlock() + s.internalFilterForSession(session, firstPageCount) + + return sessionID +} + +// UpdateFilterForSession is to be called for updating the filter of a specific session +// After calling this method to set a filter all the incoming changes will be reported with +// Entry.isNew = true when filter is reset to empty +func (s *Service) UpdateFilterForSession(id SessionID, filter Filter, firstPageCount int) error { + session := s.getSession(id) + if session == nil { + return ErrSessionNotFound + } + + session.mu.Lock() + defer session.mu.Unlock() + + prevFilterEmpty := session.filter.IsEmpty() + newFilerEmpty := filter.IsEmpty() + + session.new = nil + + session.filter = filter + + if prevFilterEmpty && !newFilerEmpty { + // Session is moving from empty to non-empty filter + // Take a snapshot of the current model + session.noFilterModel = entryIdsToMap(session.model) + + session.model = make([]EntryIdentity, 0, firstPageCount) + + // In this case there is nothing to flag so we request the first page + s.internalFilterForSession(session, firstPageCount) + } else if !prevFilterEmpty && newFilerEmpty { + // Session is moving from non-empty to empty filter + // In this case we need to flag all the new entries that are not in the noFilterModel + s.internalFilter( + session.getFullFilterParams(), + 0, + firstPageCount, + func(entries []Entry) (offset int) { + // Mark new entries + for i, a := range entries { + _, found := session.noFilterModel[a.getIdentity().key()] + entries[i].isNew = !found + } + + // Mirror identities for update use + session.model = mirrorIdentities(entries) + session.noFilterModel = nil + return 0 + }, + ) + } else { + // Else act as a normal filter update + s.internalFilterForSession(session, firstPageCount) + } + + return nil +} + +// ResetFilterSession is to be called when SessionUpdate.HasNewOnTop == true to +// update client with the latest state including new on top entries +func (s *Service) ResetFilterSession(id SessionID, firstPageCount int) error { + session := s.getSession(id) + if session == nil { + return ErrSessionNotFound + } + + session.mu.Lock() + defer session.mu.Unlock() + + s.internalFilter( + session.getFullFilterParams(), + 0, + firstPageCount, + func(entries []Entry) (offset int) { + // Mark new entries + newMap := entryIdsToMap(session.new) + for i, a := range entries { + _, isNew := newMap[a.getIdentity().key()] + entries[i].isNew = isNew + } + session.new = nil + + if session.noFilterModel != nil { + // Add reported new entries to mark them as seen + for _, a := range newMap { + session.noFilterModel[a.key()] = a + } + } + + // Mirror client identities for checking updates + session.model = mirrorIdentities(entries) + + return 0 + }, + ) + return nil +} + +func (s *Service) GetMoreForFilterSession(id SessionID, pageCount int) error { + session := s.getSession(id) + if session == nil { + return ErrSessionNotFound + } + + session.mu.Lock() + defer session.mu.Unlock() + + prevModelLen := len(session.model) + s.internalFilter( + session.getFullFilterParams(), + prevModelLen+len(session.new), + pageCount, + func(entries []Entry) (offset int) { + // Mirror client identities for checking updates + for _, a := range entries { + session.model = append(session.model, EntryIdentity{ + payloadType: a.payloadType, + transaction: a.transaction, + id: a.id, + }) + } + + // Overwrite the offset to account for new entries + return prevModelLen + }, + ) + return nil +} + +// subscribeToEvents should be called with sessionsRWMutex locked for writing +func (s *Service) subscribeToEvents() { + s.ch = make(chan walletevent.Event, 100) + s.subscriptions = s.eventFeed.Subscribe(s.ch) + ctx, cancel := context.WithCancel(context.Background()) + s.subscriptionsCancelFn = cancel + go s.processEvents(ctx) +} + +// processEvents runs only if more than one session is active +func (s *Service) processEvents(ctx context.Context) { + defer gocommon.LogOnPanic() + eventCount := 0 + changedTxs := make([]TransactionID, 0) + newTxs := false + + var debounceTimer *time.Timer + debouncerCh := make(chan struct{}) + debounceProcessChangesFn := func() { + if debounceTimer == nil { + debounceTimer = time.AfterFunc(s.debounceDuration, func() { + debouncerCh <- struct{}{} + }) + } + } + + for { + select { + case event := <-s.ch: + switch event.Type { + case transactions.EventPendingTransactionUpdate: + eventCount++ + var payload transactions.PendingTxUpdatePayload + if err := json.Unmarshal([]byte(event.Message), &payload); err != nil { + logutils.ZapLogger().Error("Error unmarshalling PendingTxUpdatePayload", zap.Error(err)) + continue + } + changedTxs = append(changedTxs, TransactionID{ + ChainID: payload.ChainID, + Hash: payload.Hash, + }) + debounceProcessChangesFn() + case transactions.EventPendingTransactionStatusChanged: + eventCount++ + var payload transactions.StatusChangedPayload + if err := json.Unmarshal([]byte(event.Message), &payload); err != nil { + logutils.ZapLogger().Error("Error unmarshalling StatusChangedPayload", zap.Error(err)) + continue + } + changedTxs = append(changedTxs, TransactionID{ + ChainID: payload.ChainID, + Hash: payload.Hash, + }) + debounceProcessChangesFn() + case transfer.EventNewTransfers: + eventCount++ + // No updates here, these are detected with their final state, just trigger + // the detection of new entries + newTxs = true + debounceProcessChangesFn() + case routeexecution.EventRouteExecutionTransactionSent: + sentTxs, ok := event.EventParams.(*responses.RouterSentTransactions) + if ok && sentTxs != nil { + for _, tx := range sentTxs.SentTransactions { + changedTxs = append(changedTxs, TransactionID{ + ChainID: common.ChainID(tx.FromChain), + Hash: eth.Hash(tx.Hash), + }) + } + } + debounceProcessChangesFn() + } + case <-debouncerCh: + if eventCount > 0 || newTxs || len(changedTxs) > 0 { + s.processChanges(eventCount, changedTxs) + eventCount = 0 + newTxs = false + changedTxs = nil + debounceTimer = nil + } + case <-ctx.Done(): + return + } + } +} + +func (s *Service) processChangesForSession(session *Session, eventCount int, changedTxs []TransactionID) { + session.mu.Lock() + defer session.mu.Unlock() + + f := session.getFullFilterParams() + limit := NoLimit + if session.version == V1 { + limit = len(session.model) + eventCount + } + activities, err := s.getActivityEntries(context.Background(), f, 0, limit) + if err != nil { + logutils.ZapLogger().Error("Error getting activity entries", zap.Error(err)) + return + } + + if session.version != V1 { + s.processEntryDataUpdates(session.id, activities, changedTxs) + } + + allData := append(session.new, session.model...) + new, _ /*removed*/ := findUpdates(allData, activities) + + lastProcessed := -1 + onTop := true + var mixed []*EntryUpdate + for i, idRes := range new { + // Detect on top + if onTop { + // mixedIdentityResult.newPos includes session.new, therefore compensate for it + if ((idRes.newPos - len(session.new)) - lastProcessed) > 1 { + // From now on the events are not on top and continuous but mixed between existing entries + onTop = false + mixed = make([]*EntryUpdate, 0, len(new)-i) + } + lastProcessed = idRes.newPos + } + + if onTop { + if session.new == nil { + session.new = make([]EntryIdentity, 0, len(new)) + } + session.new = append(session.new, idRes.id) + } else { + modelPos := idRes.newPos - len(session.new) + entry := activities[idRes.newPos] + entry.isNew = true + mixed = append(mixed, &EntryUpdate{ + Pos: modelPos, + Entry: &entry, + }) + // Insert in session model at modelPos index + session.model = append(session.model[:modelPos], append([]EntryIdentity{{payloadType: entry.payloadType, transaction: entry.transaction, id: entry.id}}, session.model[modelPos:]...)...) + } + } + + if len(session.new) > 0 || len(mixed) > 0 { + go notify(s.eventFeed, session.id, len(session.new) > 0, mixed) + } +} + +func (s *Service) processChanges(eventCount int, changedTxs []TransactionID) { + sessions := s.getAllSessions() + for _, session := range sessions { + s.processChangesForSession(session, eventCount, changedTxs) + } +} + +func (s *Service) processEntryDataUpdates(sessionID SessionID, entries []Entry, changedTxs []TransactionID) { + updateData := make([]*EntryData, 0, len(changedTxs)) + + entriesMap := make(map[string]Entry, len(entries)) + for _, e := range entries { + if e.payloadType == MultiTransactionPT { + if e.id != common.NoMultiTransactionID { + for _, tx := range e.transactions { + id := TransactionID{ + ChainID: tx.ChainID, + Hash: tx.Hash, + } + entriesMap[id.key()] = e + } + } + } else if e.transaction != nil { + id := TransactionID{ + ChainID: e.transaction.ChainID, + Hash: e.transaction.Hash, + } + entriesMap[id.key()] = e + } + } + + for _, tx := range changedTxs { + e, found := entriesMap[tx.key()] + if !found { + continue + } + + data := &EntryData{ + ActivityStatus: &e.activityStatus, + } + if e.payloadType == MultiTransactionPT { + data.ID = common.NewAndSet(e.id) + } else { + data.Transaction = e.transaction + } + data.PayloadType = e.payloadType + + updateData = append(updateData, data) + } + + if len(updateData) > 0 { + requestID := int32(sessionID) + sendResponseEvent(s.eventFeed, &requestID, EventActivityFilteringUpdate, updateData, nil) + } +} + +func notify(eventFeed *event.Feed, id SessionID, hasNewOnTop bool, mixed []*EntryUpdate) { + defer gocommon.LogOnPanic() + payload := SessionUpdate{ + New: mixed, + } + + if hasNewOnTop { + payload.HasNewOnTop = &hasNewOnTop + } + + sendResponseEvent(eventFeed, (*int32)(&id), EventActivitySessionUpdated, payload, nil) +} + +// unsubscribeFromEvents should be called with sessionsRWMutex locked for writing +func (s *Service) unsubscribeFromEvents() { + s.subscriptionsCancelFn() + s.subscriptionsCancelFn = nil + s.subscriptions.Unsubscribe() + close(s.ch) + s.ch = nil + s.subscriptions = nil +} + +func (s *Service) StopFilterSession(id SessionID) { + s.removeSesssion(id) + + // Cancel any pending or ongoing task + s.scheduler.Enqueue(int32(id), filterTask, func(ctx context.Context) (interface{}, error) { + return nil, nil + }, func(result interface{}, taskType async.TaskType, err error) {}) +} + +func (s *Service) getActivityDetailsAsync(requestID int32, entries []Entry) { + if len(entries) == 0 { + return + } + + ctx := context.Background() + + go func() { + defer gocommon.LogOnPanic() + activityData, err := s.getActivityDetails(ctx, entries) + if len(activityData) != 0 { + sendResponseEvent(s.eventFeed, &requestID, EventActivityFilteringUpdate, activityData, err) + } + }() +} + +type mixedIdentityResult struct { + newPos int + id EntryIdentity +} + +func entryIdsToMap(ids []EntryIdentity) map[string]EntryIdentity { + idsMap := make(map[string]EntryIdentity, len(ids)) + for _, id := range ids { + idsMap[id.key()] = id + } + return idsMap +} + +func entriesToMap(entries []Entry) map[string]Entry { + entryMap := make(map[string]Entry, len(entries)) + for _, entry := range entries { + updatedIdentity := entry.getIdentity() + entryMap[updatedIdentity.key()] = entry + } + return entryMap +} + +// FindUpdates returns changes in updated entries compared to the identities +// +// expects identities and entries to be sorted by timestamp +// +// the returned newer are entries that are newer than the first identity +// the returned mixed are entries that are older than the first identity (sorted by timestamp) +// the returned removed are identities that are not present in the updated entries (sorted by timestamp) +// +// implementation assumes the order of each identity doesn't change from old state (identities) and new state (updated); we have either add or removed. +func findUpdates(identities []EntryIdentity, updated []Entry) (new []mixedIdentityResult, removed []EntryIdentity) { + if len(updated) == 0 { + return + } + + idsMap := entryIdsToMap(identities) + updatedMap := entriesToMap(updated) + + for newIndex, entry := range updated { + id := entry.getIdentity() + if _, found := idsMap[id.key()]; !found { + new = append(new, mixedIdentityResult{ + newPos: newIndex, + id: id, + }) + } + + if len(identities) > 0 && entry.getIdentity().same(identities[len(identities)-1]) { + break + } + } + + // Account for new entries + for i := 0; i < len(identities); i++ { + id := identities[i] + if _, found := updatedMap[id.key()]; !found { + removed = append(removed, id) + } + } + return +} + +func (s *Service) addSession(session *Session) { + s.sessionsRWMutex.Lock() + defer s.sessionsRWMutex.Unlock() + subscribeToEvents := len(s.sessions) == 0 + + s.sessions[session.id] = session + + if subscribeToEvents { + s.subscribeToEvents() + } +} + +func (s *Service) removeSesssion(id SessionID) { + s.sessionsRWMutex.Lock() + defer s.sessionsRWMutex.Unlock() + delete(s.sessions, id) + if len(s.sessions) == 0 { + s.unsubscribeFromEvents() + } +} + +func (s *Service) getSession(id SessionID) *Session { + s.sessionsRWMutex.RLock() + defer s.sessionsRWMutex.RUnlock() + return s.sessions[id] +} + +func (s *Service) getAllSessions() []*Session { + s.sessionsRWMutex.RLock() + defer s.sessionsRWMutex.RUnlock() + sessions := make([]*Session, 0, len(s.sessions)) + for _, session := range s.sessions { + sessions = append(sessions, session) + } + return sessions +} diff --git a/services/wallet/activity/session_service_test.go b/services/wallet/activity/session_service_test.go new file mode 100644 index 000000000..bc44e9a36 --- /dev/null +++ b/services/wallet/activity/session_service_test.go @@ -0,0 +1,339 @@ +package activity + +import ( + "math/big" + "sync" + "testing" + + eth "github.com/ethereum/go-ethereum/common" + "github.com/status-im/status-go/services/wallet/common" + "github.com/status-im/status-go/services/wallet/transfer" + "github.com/status-im/status-go/transactions" + + "github.com/stretchr/testify/require" +) + +func TestService_IncrementalUpdateOnTop(t *testing.T) { + state := setupTestService(t) + defer state.close() + + transactionCount := 2 + allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}}) + defer cleanup() + + sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 5, V1) + require.Greater(t, sessionID, SessionID(0)) + defer state.service.StopFilterSession(sessionID) + + filterResponseCount := validateFilteringDone(t, ch, 2, nil, nil) + + exp := pendings[0] + err := state.pendingTracker.StoreAndTrackPendingTx(&exp) + require.NoError(t, err) + + vFn := getValidateSessionUpdateHasNewOnTopFn(t) + pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn) + + err = state.service.ResetFilterSession(sessionID, 5) + require.NoError(t, err) + + // Validate the reset data + eventActivityDoneCount := validateFilteringDone(t, ch, 3, func(payload FilterResponse) { + require.True(t, payload.Activities[0].isNew) + require.False(t, payload.Activities[1].isNew) + require.False(t, payload.Activities[2].isNew) + + // Check the new transaction data + newTx := payload.Activities[0] + require.Equal(t, PendingTransactionPT, newTx.payloadType) + // We don't keep type in the DB + require.Equal(t, (*int64)(nil), newTx.transferType) + require.Equal(t, SendAT, newTx.activityType) + require.Equal(t, PendingAS, newTx.activityStatus) + require.Equal(t, exp.ChainID, newTx.transaction.ChainID) + require.Equal(t, exp.ChainID, *newTx.chainIDOut) + require.Equal(t, (*common.ChainID)(nil), newTx.chainIDIn) + require.Equal(t, exp.Hash, newTx.transaction.Hash) + // Pending doesn't have address as part of identity + require.Equal(t, eth.Address{}, newTx.transaction.Address) + require.Equal(t, exp.From, *newTx.sender) + require.Equal(t, exp.To, *newTx.recipient) + require.Equal(t, 0, exp.Value.Int.Cmp((*big.Int)(newTx.amountOut))) + require.Equal(t, exp.Timestamp, uint64(newTx.timestamp)) + require.Equal(t, exp.Symbol, *newTx.symbolOut) + require.Equal(t, (*string)(nil), newTx.symbolIn) + require.Equal(t, &Token{ + TokenType: Native, + ChainID: 5, + }, newTx.tokenOut) + require.Equal(t, (*Token)(nil), newTx.tokenIn) + require.Equal(t, (*eth.Address)(nil), newTx.contractAddress) + + // Check the order of the following transaction data + require.Equal(t, SimpleTransactionPT, payload.Activities[1].payloadType) + require.Equal(t, int64(transactionCount), payload.Activities[1].timestamp) + require.Equal(t, SimpleTransactionPT, payload.Activities[2].payloadType) + require.Equal(t, int64(transactionCount-1), payload.Activities[2].timestamp) + }, nil) + + require.Equal(t, 1, pendingTransactionUpdate) + require.Equal(t, 1, filterResponseCount) + require.Equal(t, 1, sessionUpdatesCount) + require.Equal(t, 1, eventActivityDoneCount) +} + +func TestService_IncrementalUpdateMixed(t *testing.T) { + state := setupTestService(t) + defer state.close() + + transactionCount := 5 + allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, + []transactions.TestTxSummary{ + {DontConfirm: true, Timestamp: 2}, + {DontConfirm: true, Timestamp: 4}, + {DontConfirm: true, Timestamp: 6}, + }, + ) + defer cleanup() + + sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 5, V1) + require.Greater(t, sessionID, SessionID(0)) + defer state.service.StopFilterSession(sessionID) + + filterResponseCount := validateFilteringDone(t, ch, 5, nil, nil) + + for i := range pendings { + err := state.pendingTracker.StoreAndTrackPendingTx(&pendings[i]) + require.NoError(t, err) + } + + pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 2, func(payload SessionUpdate) bool { + require.Nil(t, payload.HasNewOnTop) + require.NotEmpty(t, payload.New) + for _, update := range payload.New { + require.True(t, update.Entry.isNew) + foundIdx := -1 + for i, pTx := range pendings { + if pTx.Hash == update.Entry.transaction.Hash && pTx.ChainID == update.Entry.transaction.ChainID { + foundIdx = i + break + } + } + require.Greater(t, foundIdx, -1, "the updated transaction should be found in the pending list") + pendings = append(pendings[:foundIdx], pendings[foundIdx+1:]...) + } + return len(pendings) == 1 + }) + + // Validate that the last one (oldest) is out of the window + require.Equal(t, 1, len(pendings)) + require.Equal(t, uint64(2), pendings[0].Timestamp) + + require.Equal(t, 3, pendingTransactionUpdate) + require.LessOrEqual(t, sessionUpdatesCount, 3) + require.Equal(t, 1, filterResponseCount) + +} + +func TestService_IncrementalUpdateFetchWindow(t *testing.T) { + state := setupTestService(t) + defer state.close() + + transactionCount := 5 + allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}}) + defer cleanup() + + sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 2, V1) + require.Greater(t, sessionID, SessionID(0)) + defer state.service.StopFilterSession(sessionID) + + filterResponseCount := validateFilteringDone(t, ch, 2, nil, nil) + + exp := pendings[0] + err := state.pendingTracker.StoreAndTrackPendingTx(&exp) + require.NoError(t, err) + + vFn := getValidateSessionUpdateHasNewOnTopFn(t) + pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn) + + err = state.service.ResetFilterSession(sessionID, 2) + require.NoError(t, err) + + // Validate the reset data + eventActivityDoneCount := validateFilteringDone(t, ch, 2, func(payload FilterResponse) { + require.True(t, payload.Activities[0].isNew) + require.Equal(t, int64(transactionCount+1), payload.Activities[0].timestamp) + require.False(t, payload.Activities[1].isNew) + require.Equal(t, int64(transactionCount), payload.Activities[1].timestamp) + }, nil) + + require.Equal(t, 1, pendingTransactionUpdate) + require.Equal(t, 1, filterResponseCount) + require.Equal(t, 1, sessionUpdatesCount) + require.Equal(t, 1, eventActivityDoneCount) + + err = state.service.GetMoreForFilterSession(sessionID, 2) + require.NoError(t, err) + + eventActivityDoneCount = validateFilteringDone(t, ch, 2, func(payload FilterResponse) { + require.False(t, payload.Activities[0].isNew) + require.Equal(t, int64(transactionCount-1), payload.Activities[0].timestamp) + require.False(t, payload.Activities[1].isNew) + require.Equal(t, int64(transactionCount-2), payload.Activities[1].timestamp) + }, common.NewAndSet(extraExpect{common.NewAndSet(2), nil})) + require.Equal(t, 1, eventActivityDoneCount) +} + +func TestService_IncrementalUpdateFetchWindowNoReset(t *testing.T) { + state := setupTestService(t) + defer state.close() + + transactionCount := 5 + allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}}) + defer cleanup() + + sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 2, V1) + require.Greater(t, sessionID, SessionID(0)) + defer state.service.StopFilterSession(sessionID) + + filterResponseCount := validateFilteringDone(t, ch, 2, func(payload FilterResponse) { + require.Equal(t, int64(transactionCount), payload.Activities[0].timestamp) + require.Equal(t, int64(transactionCount-1), payload.Activities[1].timestamp) + }, nil) + + exp := pendings[0] + err := state.pendingTracker.StoreAndTrackPendingTx(&exp) + require.NoError(t, err) + + vFn := getValidateSessionUpdateHasNewOnTopFn(t) + pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn) + require.Equal(t, 1, pendingTransactionUpdate) + require.Equal(t, 1, filterResponseCount) + require.Equal(t, 1, sessionUpdatesCount) + + err = state.service.GetMoreForFilterSession(sessionID, 2) + require.NoError(t, err) + + // Validate that client continue loading the next window without being affected by the internal state of new + eventActivityDoneCount := validateFilteringDone(t, ch, 2, func(payload FilterResponse) { + require.False(t, payload.Activities[0].isNew) + require.Equal(t, int64(transactionCount-2), payload.Activities[0].timestamp) + require.False(t, payload.Activities[1].isNew) + require.Equal(t, int64(transactionCount-3), payload.Activities[1].timestamp) + }, common.NewAndSet(extraExpect{common.NewAndSet(2), nil})) + require.Equal(t, 1, eventActivityDoneCount) +} + +// Simulate and validate a multi-step user flow that was also a regression in the original implementation +func TestService_FilteredIncrementalUpdateResetAndClear(t *testing.T) { + state := setupTestService(t) + defer state.close() + + transactionCount := 5 + allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}}) + defer cleanup() + + // Generate new transaction for step 5 + newOffset := transactionCount + 2 + newTxs, newFromTrs, newToTrs := transfer.GenerateTestTransfers(t, state.service.db, newOffset, 1) + allAddresses = append(append(allAddresses, newFromTrs...), newToTrs...) + + // 1. User visualizes transactions for the first time + sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 4, V1) + require.Greater(t, sessionID, SessionID(0)) + defer state.service.StopFilterSession(sessionID) + + validateFilteringDone(t, ch, 4, nil, nil) + + // 2. User applies a filter for pending transactions + err := state.service.UpdateFilterForSession(sessionID, Filter{Statuses: []Status{PendingAS}}, 4) + require.NoError(t, err) + + filterResponseCount := validateFilteringDone(t, ch, 0, nil, nil) + + // 3. A pending transaction is added + exp := pendings[0] + err = state.pendingTracker.StoreAndTrackPendingTx(&exp) + require.NoError(t, err) + + vFn := getValidateSessionUpdateHasNewOnTopFn(t) + pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn) + + // 4. User resets the view and the new pending transaction has the new flag + err = state.service.ResetFilterSession(sessionID, 2) + require.NoError(t, err) + + // Validate the reset data + eventActivityDoneCount := validateFilteringDone(t, ch, 1, func(payload FilterResponse) { + require.True(t, payload.Activities[0].isNew) + require.Equal(t, int64(transactionCount+1), payload.Activities[0].timestamp) + }, nil) + + require.Equal(t, 1, pendingTransactionUpdate) + require.Equal(t, 1, filterResponseCount) + require.Equal(t, 1, sessionUpdatesCount) + require.Equal(t, 1, eventActivityDoneCount) + + // 5. A new transaction is downloaded + transfer.InsertTestTransfer(t, state.service.db, newTxs[0].To, &newTxs[0]) + + // 6. User clears the filter and only the new transaction should have the new flag + err = state.service.UpdateFilterForSession(sessionID, Filter{}, 4) + require.NoError(t, err) + + eventActivityDoneCount = validateFilteringDone(t, ch, 4, func(payload FilterResponse) { + require.True(t, payload.Activities[0].isNew) + require.Equal(t, int64(newOffset), payload.Activities[0].timestamp) + require.False(t, payload.Activities[1].isNew) + require.Equal(t, int64(newOffset-1), payload.Activities[1].timestamp) + require.False(t, payload.Activities[2].isNew) + require.Equal(t, int64(newOffset-2), payload.Activities[2].timestamp) + require.False(t, payload.Activities[3].isNew) + require.Equal(t, int64(newOffset-3), payload.Activities[3].timestamp) + }, nil) + require.Equal(t, 1, eventActivityDoneCount) +} + +// Test the different session-related endpoints in a multi-threaded environment +func TestService_MultiThread(t *testing.T) { + state := setupTestService(t) + defer state.close() + + const n = 3 + var wg sync.WaitGroup + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + sessionID := state.service.StartFilterSession([]eth.Address{}, allNetworksFilter(), Filter{}, 5, V2) + require.Greater(t, sessionID, SessionID(0)) + + transactionCount := 5 + _, _, _, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}}) + defer cleanup() + + const m = 10 + var subwg sync.WaitGroup + subwg.Add(m) + for j := 0; j < m; j++ { + go func() { + defer subwg.Done() + var suberr error + + suberr = state.service.UpdateFilterForSession(sessionID, Filter{}, 5) + require.NoError(t, suberr) + + suberr = state.service.ResetFilterSession(sessionID, 5) + require.NoError(t, suberr) + + suberr = state.service.GetMoreForFilterSession(sessionID, 5) + require.NoError(t, suberr) + }() + } + subwg.Wait() + + state.service.StopFilterSession(sessionID) + }() + } + wg.Wait() +} diff --git a/services/wallet/api.go b/services/wallet/api.go index 45d29ccc3..597088923 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -814,7 +814,17 @@ func (api *API) StartActivityFilterSession(addresses []common.Address, chainIDs zap.Int("firstPageCount", firstPageCount), ) - return api.s.activity.StartFilterSession(addresses, chainIDs, filter, firstPageCount), nil + return api.s.activity.StartFilterSession(addresses, chainIDs, filter, firstPageCount, activity.V1), nil +} + +func (api *API) StartActivityFilterSessionV2(addresses []common.Address, chainIDs []wcommon.ChainID, filter activity.Filter, firstPageCount int) (activity.SessionID, error) { + logutils.ZapLogger().Debug("wallet.api.StartActivityFilterSessionV2", + zap.Int("addr.count", len(addresses)), + zap.Int("chainIDs.count", len(chainIDs)), + zap.Int("firstPageCount", firstPageCount), + ) + + return api.s.activity.StartFilterSession(addresses, chainIDs, filter, firstPageCount, activity.V2), nil } func (api *API) UpdateActivityFilterForSession(sessionID activity.SessionID, filter activity.Filter, firstPageCount int) error { diff --git a/services/wallet/routeexecution/manager.go b/services/wallet/routeexecution/manager.go index 87fa4a840..7fb96b198 100644 --- a/services/wallet/routeexecution/manager.go +++ b/services/wallet/routeexecution/manager.go @@ -8,6 +8,7 @@ import ( "go.uber.org/zap" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/logutils" @@ -21,26 +22,33 @@ import ( pathProcessorCommon "github.com/status-im/status-go/services/wallet/router/pathprocessor/common" "github.com/status-im/status-go/services/wallet/router/sendtype" "github.com/status-im/status-go/services/wallet/transfer" + "github.com/status-im/status-go/services/wallet/walletevent" "github.com/status-im/status-go/services/wallet/wallettypes" "github.com/status-im/status-go/signal" ) +const ( + EventRouteExecutionTransactionSent walletevent.EventType = walletevent.InternalEventTypePrefix + "wallet-route-execution-transaction-sent" +) + type Manager struct { router *router.Router transactionManager *transfer.TransactionManager transferController *transfer.Controller db *storage.DB + eventFeed *event.Feed // Local data used for storage purposes buildInputParams *requests.RouterBuildTransactionsParams } -func NewManager(walletDB *sql.DB, router *router.Router, transactionManager *transfer.TransactionManager, transferController *transfer.Controller) *Manager { +func NewManager(walletDB *sql.DB, eventFeed *event.Feed, router *router.Router, transactionManager *transfer.TransactionManager, transferController *transfer.Controller) *Manager { return &Manager{ router: router, transactionManager: transactionManager, transferController: transferController, db: storage.NewDB(walletDB), + eventFeed: eventFeed, } } @@ -134,6 +142,12 @@ func (m *Manager) SendRouterTransactionsWithSignatures(ctx context.Context, send response.SendDetails.ErrorResponse = err.(*statusErrors.ErrorResponse) } signal.SendWalletEvent(signal.RouterTransactionsSent, response) + + event := walletevent.Event{ + Type: EventRouteExecutionTransactionSent, + EventParams: response, + } + m.eventFeed.Send(event) }() _, routeInputParams = m.router.GetBestRouteAndAssociatedInputParams() diff --git a/services/wallet/service.go b/services/wallet/service.go index 1a473d307..3b5abebd4 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -204,7 +204,7 @@ func NewService( router.AddPathProcessor(processor) } - routeExecutionManager := routeexecution.NewManager(db, router, transactionManager, transferController) + routeExecutionManager := routeexecution.NewManager(db, feed, router, transactionManager, transferController) return &Service{ db: db, diff --git a/services/wallet/transfer/transaction_manager_route.go b/services/wallet/transfer/transaction_manager_route.go index a919b797e..4fa38b6b3 100644 --- a/services/wallet/transfer/transaction_manager_route.go +++ b/services/wallet/transfer/transaction_manager_route.go @@ -350,6 +350,8 @@ func addSignatureAndSendTransaction( return nil, err } + txData.TxArgs.MultiTransactionID = multiTransactionID + return responses.NewRouterSentTransaction(txData.TxArgs, txData.SentHash, isApproval), nil } diff --git a/tests-functional/clients/signals.py b/tests-functional/clients/signals.py index 28b670af4..d6a5d4c6b 100644 --- a/tests-functional/clients/signals.py +++ b/tests-functional/clients/signals.py @@ -12,23 +12,58 @@ class SignalClient: self.await_signals = await_signals self.received_signals = { - signal: [] for signal in self.await_signals + # For each signal type, store: + # - list of received signals + # - expected received event delta count (resets to 1 after each wait_for_event call) + # - expected received event count + # - a function that takes the received signal as an argument and returns True if the signal is accepted (counted) or discarded + signal: { + "received": [], + "delta_count": 1, + "expected_count": 1, + "accept_fn": None + } for signal in self.await_signals } def on_message(self, ws, signal): signal = json.loads(signal) - if signal.get("type") in self.await_signals: - self.received_signals[signal["type"]].append(signal) + signal_type = signal.get("type") + if signal_type in self.await_signals: + accept_fn = self.received_signals[signal_type]["accept_fn"] + if not accept_fn or accept_fn(signal): + self.received_signals[signal_type]["received"].append(signal) + + def check_signal_type(self, signal_type): + if signal_type not in self.await_signals: + raise ValueError(f"Signal type {signal_type} is not in the list of awaited signals") + + # Used to set up how many instances of a signal to wait for, before triggering the actions + # that cause them to be emitted. + def prepare_wait_for_signal(self, signal_type, delta_count, accept_fn=None): + self.check_signal_type(signal_type) + + if delta_count < 1: + raise ValueError("delta_count must be greater than 0") + self.received_signals[signal_type]["delta_count"] = delta_count + self.received_signals[signal_type]["expected_count"] = len(self.received_signals[signal_type]["received"]) + delta_count + self.received_signals[signal_type]["accept_fn"] = accept_fn def wait_for_signal(self, signal_type, timeout=20): + self.check_signal_type(signal_type) + start_time = time.time() - while not self.received_signals.get(signal_type): + received_signals = self.received_signals.get(signal_type) + while (not received_signals) or len(received_signals["received"]) < received_signals["expected_count"]: if time.time() - start_time >= timeout: raise TimeoutError( f"Signal {signal_type} is not received in {timeout} seconds") time.sleep(0.2) logging.debug(f"Signal {signal_type} is received in {round(time.time() - start_time)} seconds") - return self.received_signals[signal_type][0] + delta_count = received_signals["delta_count"] + self.prepare_wait_for_signal(signal_type, 1) + if delta_count == 1: + return self.received_signals[signal_type]["received"][-1] + return self.received_signals[signal_type]["received"][-delta_count:] def _on_error(self, ws, error): logging.error(f"Error: {error}") diff --git a/tests-functional/tests/test_wallet_activity_session.py b/tests-functional/tests/test_wallet_activity_session.py new file mode 100644 index 000000000..a99a6bdec --- /dev/null +++ b/tests-functional/tests/test_wallet_activity_session.py @@ -0,0 +1,90 @@ +import json +import random + +import pytest + +from constants import user_1 +from test_cases import SignalTestCase + +import wallet_utils + +import logging + +EventActivityFilteringDone = "wallet-activity-filtering-done" +EventActivityFilteringUpdate = "wallet-activity-filtering-entries-updated" +EventActivitySessionUpdated = "wallet-activity-session-updated" + +def validate_entry(entry, tx_data): + assert entry["transactions"][0]["chainId"] == tx_data["tx_status"]["chainId"] + assert entry["transactions"][0]["hash"] == tx_data["tx_status"]["hash"] + +@pytest.mark.wallet +@pytest.mark.rpc +class TestWalletActivitySession(SignalTestCase): + await_signals = ["wallet", + "wallet.suggested.routes", + "wallet.router.sign-transactions", + "wallet.router.sending-transactions-started", + "wallet.transaction.status-changed", + "wallet.router.transactions-sent"] + + def setup_method(self): + super().setup_method() + self.request_id = str(random.randint(1, 8888)) + + def test_wallet_start_activity_filter_session(self): + tx_data = [] # (routes, build_tx, tx_signatures, tx_status) + # Set up a transactions for account before starting session + tx_data.append(wallet_utils.send_router_transaction(self.rpc_client, self.signal_client)) + + # Start activity session + method = "wallet_startActivityFilterSessionV2" + params = [[user_1.address], [self.network_id], + {"period": {"startTimestamp": 0, "endTimestamp": 0}, "types": [], "statuses": [], + "counterpartyAddresses": [], "assets": [], "collectibles": [], "filterOutAssets": False, + "filterOutCollectibles": False}, 10] + self.signal_client.prepare_wait_for_signal("wallet", 1, lambda signal : signal["event"]["type"] == EventActivityFilteringDone) + response = self.rpc_client.rpc_valid_request(method, params, self.request_id) + event_response = self.signal_client.wait_for_signal("wallet", timeout=10)['event'] + + # Check response + sessionID = int(response.json()["result"]) + assert sessionID > 0 + + # Check response event + assert int(event_response['requestId']) == sessionID + message = json.loads(event_response['message'].replace("'", "\"")) + assert int(message['errorCode']) == 1 + assert len(message['activities']) > 0 # Should have at least 1 entry + # First activity entry should match last sent transaction + validate_entry(message['activities'][0], tx_data[-1]) + + # Trigger new transaction + self.signal_client.prepare_wait_for_signal("wallet", 1, lambda signal : signal["event"]["type"] == EventActivitySessionUpdated and signal['event']['requestId'] == sessionID) + tx_data.append(wallet_utils.send_router_transaction(self.rpc_client, self.signal_client)) + print(tx_data[-1]) + event_response = self.signal_client.wait_for_signal("wallet", timeout=10)['event'] + + # Check response event + assert int(event_response['requestId']) == sessionID + message = json.loads(event_response['message'].replace("'", "\"")) + assert message['hasNewOnTop'] # New entries reported + + # Reset activity session + method = "wallet_resetActivityFilterSession" + params = [sessionID, 10] + self.signal_client.prepare_wait_for_signal("wallet", 1, lambda signal : signal["event"]["type"] == EventActivityFilteringDone and signal['event']['requestId'] == sessionID) + response = self.rpc_client.rpc_valid_request(method, params, self.request_id) + event_response = self.signal_client.wait_for_signal("wallet", timeout=10)['event'] + + # Check response event + assert int(event_response['requestId']) == sessionID + message = json.loads(event_response['message'].replace("'", "\"")) + assert int(message['errorCode']) == 1 + assert len(message['activities']) > 1 # Should have at least 2 entries + + # First activity entry should match last sent transaction + validate_entry(message['activities'][0], tx_data[-1]) + + # Second activity entry should match second to last sent transaction + validate_entry(message['activities'][1], tx_data[-2]) diff --git a/tests-functional/wallet_utils.py b/tests-functional/wallet_utils.py new file mode 100644 index 000000000..d61e8b1bc --- /dev/null +++ b/tests-functional/wallet_utils.py @@ -0,0 +1,131 @@ +import json +import logging +import jsonschema +import uuid +import threading +import time + +from conftest import option +from constants import user_1, user_2 + +from clients.signals import SignalClient + +def verify_json_schema(response, method): + with open(f"{option.base_dir}/schemas/{method}", "r") as schema: + jsonschema.validate(instance=response, + schema=json.load(schema)) + +def get_suggested_routes(rpc_client, signal_client, **kwargs): + _uuid = str(uuid.uuid4()) + amount_in = "0xde0b6b3a7640000" + + method = "wallet_getSuggestedRoutesAsync" + input_params = { + "uuid": _uuid, + "sendType": 0, + "addrFrom": user_1.address, + "addrTo": user_2.address, + "amountIn": amount_in, + "amountOut": "0x0", + "tokenID": "ETH", + "tokenIDIsOwnerToken": False, + "toTokenID": "", + "disabledFromChainIDs": [10, 42161], + "disabledToChainIDs": [10, 42161], + "gasFeeMode": 1, + "fromLockedAmount": {} + } + for key, new_value in kwargs.items(): + if key in input_params: + input_params[key] = new_value + else: + logging.info( + f"Warning: The key '{key}' does not exist in the input_params parameters and will be ignored.") + params = [input_params] + + signal_client.prepare_wait_for_signal("wallet.suggested.routes", 1) + _ = rpc_client.rpc_valid_request(method, params) + + routes = signal_client.wait_for_signal("wallet.suggested.routes") + assert routes['event']['Uuid'] == _uuid + + return routes['event'] + +def build_transactions_from_route(rpc_client, signal_client, uuid, **kwargs): + method = "wallet_buildTransactionsFromRoute" + build_tx_params = { + "uuid": uuid, + "slippagePercentage": 0 + } + for key, new_value in kwargs.items(): + if key in build_tx_params: + build_tx_params[key] = new_value + else: + logging.info( + f"Warning: The key '{key}' does not exist in the build_tx_params parameters and will be ignored.") + params = [build_tx_params] + _ = rpc_client.rpc_valid_request(method, params) + + wallet_router_sign_transactions = signal_client.wait_for_signal("wallet.router.sign-transactions") + + assert wallet_router_sign_transactions['event']['signingDetails']['signOnKeycard'] == False + transaction_hashes = wallet_router_sign_transactions['event']['signingDetails']['hashes'] + + assert transaction_hashes, "Transaction hashes are empty!" + + return wallet_router_sign_transactions['event'] + +def sign_messages(rpc_client, hashes): + tx_signatures = {} + + for hash in hashes: + + method = "wallet_signMessage" + params = [ + hash, + user_1.address, + option.password + ] + + response = rpc_client.rpc_valid_request(method, params) + + if response.json()["result"].startswith("0x"): + tx_signature = response.json()["result"][2:] + + signature = { + "r": tx_signature[:64], + "s": tx_signature[64:128], + "v": tx_signature[128:] + } + + tx_signatures[hash] = signature + return tx_signatures + +def send_router_transactions_with_signatures(rpc_client, signal_client, uuid, tx_signatures): + method = "wallet_sendRouterTransactionsWithSignatures" + params = [ + { + "uuid": uuid, + "Signatures": tx_signatures + } + ] + _ = rpc_client.rpc_valid_request(method, params) + + tx_status = signal_client.wait_for_signal( + "wallet.transaction.status-changed") + + assert tx_status["event"]["status"] == "Success" + + return tx_status["event"] + +def send_router_transaction(rpc_client, signal_client, **kwargs): + routes = get_suggested_routes(rpc_client, signal_client, **kwargs) + build_tx = build_transactions_from_route(rpc_client, signal_client, routes['Uuid']) + tx_signatures = sign_messages(rpc_client, build_tx['signingDetails']['hashes']) + tx_status = send_router_transactions_with_signatures(rpc_client, signal_client, routes['Uuid'], tx_signatures) + return { + "routes": routes, + "build_tx": build_tx, + "tx_signatures": tx_signatures, + "tx_status": tx_status + } diff --git a/transactions/testhelpers.go b/transactions/testhelpers.go index e10d0acbe..88f63c180 100644 --- a/transactions/testhelpers.go +++ b/transactions/testhelpers.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/big" + "sync" "testing" eth "github.com/ethereum/go-ethereum/common" @@ -35,6 +36,7 @@ type MockChainClient struct { mock_client.MockClientInterface Clients map[common.ChainID]*MockETHClient + mu sync.RWMutex } var _ chain.ClientInterface = (*MockChainClient)(nil) @@ -46,6 +48,8 @@ func NewMockChainClient() *MockChainClient { } func (m *MockChainClient) SetAvailableClients(chainIDs []common.ChainID) *MockChainClient { + m.mu.Lock() + defer m.mu.Unlock() for _, chainID := range chainIDs { if _, ok := m.Clients[chainID]; !ok { m.Clients[chainID] = new(MockETHClient) @@ -55,6 +59,8 @@ func (m *MockChainClient) SetAvailableClients(chainIDs []common.ChainID) *MockCh } func (m *MockChainClient) AbstractEthClient(chainID common.ChainID) (ethclient.BatchCallClient, error) { + m.mu.RLock() + defer m.mu.RUnlock() if _, ok := m.Clients[chainID]; !ok { panic(fmt.Sprintf("no mock client for chainID %d", chainID)) } diff --git a/walletdatabase/migrations/sql/1730807123_add_from_to_address_virtual_columns_to_route_input_data.up.sql b/walletdatabase/migrations/sql/1730807123_add_from_to_address_virtual_columns_to_route_input_data.up.sql new file mode 100644 index 000000000..60282ef68 --- /dev/null +++ b/walletdatabase/migrations/sql/1730807123_add_from_to_address_virtual_columns_to_route_input_data.up.sql @@ -0,0 +1,5 @@ +ALTER TABLE route_input_parameters ADD COLUMN from_address BLOB NOT NULL AS (unhex(substr(json_extract(route_input_params_json, '$.addrFrom'),3))); +ALTER TABLE route_input_parameters ADD COLUMN to_address BLOB NOT NULL AS (unhex(substr(json_extract(route_input_params_json, '$.addrTo'),3))); + +CREATE INDEX IF NOT EXISTS idx_route_input_parameters_per_from_address ON route_input_parameters (from_address); +CREATE INDEX IF NOT EXISTS idx_route_input_parameters_per_to_address ON route_input_parameters (to_address);