From 2849a447f17ecd7e98df62b6801112ddc2dd495b Mon Sep 17 00:00:00 2001 From: Dario Gabriel Lipicar Date: Tue, 12 Nov 2024 15:28:40 -0300 Subject: [PATCH] feat!: implement new activityV2 filter --- services/wallet/activity/activity.go | 135 ++++---- services/wallet/activity/activity_v2.go | 323 ++++++++++++++++++ services/wallet/activity/filter.go | 4 +- services/wallet/activity/service.go | 9 +- services/wallet/activity/service_test.go | 12 +- services/wallet/activity/session.go | 203 +++++++++-- services/wallet/api.go | 12 +- services/wallet/routeexecution/manager.go | 16 +- services/wallet/service.go | 2 +- .../transfer/transaction_manager_route.go | 2 + tests-functional/clients/signals.py | 45 ++- .../tests/test_wallet_activity_session.py | 90 +++++ tests-functional/wallet_utils.py | 131 +++++++ ...virtual_columns_to_route_input_data.up.sql | 5 + 14 files changed, 889 insertions(+), 100 deletions(-) create mode 100644 services/wallet/activity/activity_v2.go create mode 100644 tests-functional/tests/test_wallet_activity_session.py create mode 100644 tests-functional/wallet_utils.py create mode 100644 walletdatabase/migrations/sql/1730807123_add_from_to_address_virtual_columns_to_route_input_data.up.sql diff --git a/services/wallet/activity/activity.go b/services/wallet/activity/activity.go index f5f8e8a55..6048aa50c 100644 --- a/services/wallet/activity/activity.go +++ b/services/wallet/activity/activity.go @@ -40,7 +40,7 @@ var ( ZeroAddress = eth.Address{} ) -type TransferType = int +type TransferType = int64 const ( TransferTypeEth TransferType = iota + 1 @@ -49,51 +49,66 @@ const ( TransferTypeErc1155 ) +const ( + L1FinalizationDuration = 960 // A block on layer 1 is every 12s, finalization require 64 blocks. A buffer of 16 blocks is added to not create false positives. + L2FinalizationDuration = 648000 // 7.5 days in seconds for layer 2 finalization. 0.5 day is buffer to not create false positive. +) + +const ( + NoLimit = 0 +) + type Entry struct { - payloadType PayloadType - transaction *transfer.TransactionIdentity - id common.MultiTransactionIDType - timestamp int64 - activityType Type - activityStatus Status - amountOut *hexutil.Big // Used for activityType SendAT, SwapAT, BridgeAT - amountIn *hexutil.Big // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT, ApproveAT - tokenOut *Token // Used for activityType SendAT, SwapAT, BridgeAT - tokenIn *Token // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT, ApproveAT - symbolOut *string - symbolIn *string - sender *eth.Address - recipient *eth.Address - chainIDOut *common.ChainID - chainIDIn *common.ChainID - transferType *TransferType - contractAddress *eth.Address - communityID *string + payloadType PayloadType + transaction *transfer.TransactionIdentity // ID for SimpleTransactionPT and PendingTransactionPT. Origin transaction for MultiTransactionPT + id common.MultiTransactionIDType // ID for MultiTransactionPT + transactions []*transfer.TransactionIdentity // List of transactions for MultiTransactionPT + timestamp int64 + activityType Type + activityStatus Status + amountOut *hexutil.Big // Used for activityType SendAT, SwapAT, BridgeAT + amountIn *hexutil.Big // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT, ApproveAT + tokenOut *Token // Used for activityType SendAT, SwapAT, BridgeAT + tokenIn *Token // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT, ApproveAT + symbolOut *string + symbolIn *string + sender *eth.Address + recipient *eth.Address + chainIDOut *common.ChainID + chainIDIn *common.ChainID + transferType *TransferType + contractAddress *eth.Address // Used for contract deployment + communityID *string + interactedContractAddress *eth.Address + approvalSpender *eth.Address isNew bool // isNew is used to indicate if the entry is newer than session start (changed state also) } // Only used for JSON marshalling type EntryData struct { - PayloadType PayloadType `json:"payloadType"` - Transaction *transfer.TransactionIdentity `json:"transaction,omitempty"` - ID *common.MultiTransactionIDType `json:"id,omitempty"` - Timestamp *int64 `json:"timestamp,omitempty"` - ActivityType *Type `json:"activityType,omitempty"` - ActivityStatus *Status `json:"activityStatus,omitempty"` - AmountOut *hexutil.Big `json:"amountOut,omitempty"` - AmountIn *hexutil.Big `json:"amountIn,omitempty"` - TokenOut *Token `json:"tokenOut,omitempty"` - TokenIn *Token `json:"tokenIn,omitempty"` - SymbolOut *string `json:"symbolOut,omitempty"` - SymbolIn *string `json:"symbolIn,omitempty"` - Sender *eth.Address `json:"sender,omitempty"` - Recipient *eth.Address `json:"recipient,omitempty"` - ChainIDOut *common.ChainID `json:"chainIdOut,omitempty"` - ChainIDIn *common.ChainID `json:"chainIdIn,omitempty"` - TransferType *TransferType `json:"transferType,omitempty"` - ContractAddress *eth.Address `json:"contractAddress,omitempty"` - CommunityID *string `json:"communityId,omitempty"` + PayloadType PayloadType `json:"payloadType"` + Transaction *transfer.TransactionIdentity `json:"transaction,omitempty"` + ID *common.MultiTransactionIDType `json:"id,omitempty"` + Transactions []*transfer.TransactionIdentity `json:"transactions,omitempty"` + Timestamp *int64 `json:"timestamp,omitempty"` + ActivityType *Type `json:"activityType,omitempty"` + ActivityStatus *Status `json:"activityStatus,omitempty"` + AmountOut *hexutil.Big `json:"amountOut,omitempty"` + AmountIn *hexutil.Big `json:"amountIn,omitempty"` + TokenOut *Token `json:"tokenOut,omitempty"` + TokenIn *Token `json:"tokenIn,omitempty"` + SymbolOut *string `json:"symbolOut,omitempty"` + SymbolIn *string `json:"symbolIn,omitempty"` + Sender *eth.Address `json:"sender,omitempty"` + Recipient *eth.Address `json:"recipient,omitempty"` + ChainIDOut *common.ChainID `json:"chainIdOut,omitempty"` + ChainIDIn *common.ChainID `json:"chainIdIn,omitempty"` + TransferType *TransferType `json:"transferType,omitempty"` + ContractAddress *eth.Address `json:"contractAddress,omitempty"` + CommunityID *string `json:"communityId,omitempty"` + InteractedContractAddress *eth.Address `json:"interactedContractAddress,omitempty"` + ApprovalSpender *eth.Address `json:"approvalSpender,omitempty"` IsNew *bool `json:"isNew,omitempty"` @@ -103,26 +118,29 @@ type EntryData struct { func (e *Entry) MarshalJSON() ([]byte, error) { data := EntryData{ - Timestamp: &e.timestamp, - ActivityType: &e.activityType, - ActivityStatus: &e.activityStatus, - AmountOut: e.amountOut, - AmountIn: e.amountIn, - TokenOut: e.tokenOut, - TokenIn: e.tokenIn, - SymbolOut: e.symbolOut, - SymbolIn: e.symbolIn, - Sender: e.sender, - Recipient: e.recipient, - ChainIDOut: e.chainIDOut, - ChainIDIn: e.chainIDIn, - TransferType: e.transferType, - ContractAddress: e.contractAddress, - CommunityID: e.communityID, + Timestamp: &e.timestamp, + ActivityType: &e.activityType, + ActivityStatus: &e.activityStatus, + AmountOut: e.amountOut, + AmountIn: e.amountIn, + TokenOut: e.tokenOut, + TokenIn: e.tokenIn, + SymbolOut: e.symbolOut, + SymbolIn: e.symbolIn, + Sender: e.sender, + Recipient: e.recipient, + ChainIDOut: e.chainIDOut, + ChainIDIn: e.chainIDIn, + TransferType: e.transferType, + ContractAddress: e.contractAddress, + CommunityID: e.communityID, + InteractedContractAddress: e.interactedContractAddress, + ApprovalSpender: e.approvalSpender, } if e.payloadType == MultiTransactionPT { data.ID = common.NewAndSet(e.id) + data.Transactions = e.transactions } else { data.Transaction = e.transaction } @@ -145,6 +163,7 @@ func (e *Entry) UnmarshalJSON(data []byte) error { if aux.ID != nil { e.id = *aux.ID } + e.transactions = aux.Transactions if aux.Timestamp != nil { e.timestamp = *aux.Timestamp } @@ -166,6 +185,8 @@ func (e *Entry) UnmarshalJSON(data []byte) error { e.chainIDIn = aux.ChainIDIn e.transferType = aux.TransferType e.communityID = aux.CommunityID + e.interactedContractAddress = aux.InteractedContractAddress + e.approvalSpender = aux.ApprovalSpender e.isNew = aux.IsNew != nil && *aux.IsNew @@ -496,8 +517,8 @@ func getActivityEntries(ctx context.Context, deps FilterDependencies, addresses includeAllNetworks, transactions.Pending, deps.currentTimestamp(), - 648000, // 7.5 days in seconds for layer 2 finalization. 0.5 day is buffer to not create false positive. - 960, // A block on layer 1 is every 12s, finalization require 64 blocks. A buffer of 16 blocks is added to not create false positives. + L2FinalizationDuration, + L1FinalizationDuration, limit, offset) if err != nil { return nil, err diff --git a/services/wallet/activity/activity_v2.go b/services/wallet/activity/activity_v2.go new file mode 100644 index 000000000..8273693a1 --- /dev/null +++ b/services/wallet/activity/activity_v2.go @@ -0,0 +1,323 @@ +package activity + +import ( + "context" + "database/sql" + "errors" + "time" + + sq "github.com/Masterminds/squirrel" + + eth "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + ethTypes "github.com/ethereum/go-ethereum/core/types" + + "go.uber.org/zap" + + "github.com/status-im/status-go/logutils" + wCommon "github.com/status-im/status-go/services/wallet/common" + "github.com/status-im/status-go/services/wallet/requests" + "github.com/status-im/status-go/services/wallet/router/pathprocessor" + "github.com/status-im/status-go/services/wallet/router/routes" + "github.com/status-im/status-go/services/wallet/token" + "github.com/status-im/status-go/services/wallet/transfer" + "github.com/status-im/status-go/sqlite" + "github.com/status-im/status-go/transactions" +) + +// getActivityEntriesV2 queries the route_* and tracked_transactions based on filter parameters and arguments +// it returns metadata for all entries ordered by timestamp column +func getActivityEntriesV2(ctx context.Context, deps FilterDependencies, addresses []eth.Address, allAddresses bool, chainIDs []wCommon.ChainID, filter Filter, offset int, limit int) ([]Entry, error) { + if len(addresses) == 0 { + return nil, errors.New("no addresses provided") + } + if len(chainIDs) == 0 { + return nil, errors.New("no chainIDs provided") + } + + q := sq.Select(` + st.tx_json, + rpt.tx_args_json, + rpt.is_approval, + rp.path_json, + rip.route_input_params_json, + rbtp.route_build_tx_params_json, + tt.tx_status, + tt.timestamp + `).Distinct() + q = q.From("sent_transactions st"). + LeftJoin(`route_path_transactions rpt ON + st.chain_id = rpt.chain_id AND + st.tx_hash = rpt.tx_hash`). + LeftJoin(`tracked_transactions tt ON + st.chain_id = tt.chain_id AND + st.tx_hash = tt.tx_hash`). + LeftJoin(`route_paths rp ON + rpt.uuid = rp.uuid AND + rpt.path_idx = rp.path_idx`). + LeftJoin(`route_build_tx_parameters rbtp ON + rpt.uuid = rbtp.uuid`). + LeftJoin(`route_input_parameters rip ON + rpt.uuid = rip.uuid`) + q = q.OrderBy("tt.timestamp DESC") + + qConditions := sq.And{} + + qConditions = append(qConditions, sq.Eq{"rpt.chain_id": chainIDs}) + qConditions = append(qConditions, sq.Eq{"rip.from_address": addresses}) + + q = q.Where(qConditions) + + if limit != NoLimit { + q = q.Limit(uint64(limit)) + q = q.Offset(uint64(offset)) + } + + query, args, err := q.ToSql() + if err != nil { + return nil, err + } + + stmt, err := deps.db.Prepare(query) + if err != nil { + return nil, err + } + defer stmt.Close() + + rows, err := stmt.Query(args...) + if err != nil { + return nil, err + } + defer rows.Close() + + data, err := rowsToDataV2(rows) + if err != nil { + return nil, err + } + + return dataToEntriesV2(deps, data) +} + +type entryDataV2 struct { + TxArgs *transactions.SendTxArgs + Tx *ethTypes.Transaction + IsApproval bool + Status transactions.TxStatus + Timestamp int64 + Path *routes.Path + RouteInputParams *requests.RouteInputParams + BuildInputParams *requests.RouterBuildTransactionsParams +} + +func newEntryDataV2() *entryDataV2 { + return &entryDataV2{ + TxArgs: new(transactions.SendTxArgs), + Tx: new(ethTypes.Transaction), + Path: new(routes.Path), + RouteInputParams: new(requests.RouteInputParams), + BuildInputParams: new(requests.RouterBuildTransactionsParams), + } +} + +func rowsToDataV2(rows *sql.Rows) ([]*entryDataV2, error) { + var ret []*entryDataV2 + for rows.Next() { + data := newEntryDataV2() + + nullableTx := sqlite.JSONBlob{Data: data.Tx} + nullableTxArgs := sqlite.JSONBlob{Data: data.TxArgs} + nullableIsApproval := sql.NullBool{} + nullablePath := sqlite.JSONBlob{Data: data.Path} + nullableRouteInputParams := sqlite.JSONBlob{Data: data.RouteInputParams} + nullableBuildInputParams := sqlite.JSONBlob{Data: data.BuildInputParams} + nullableStatus := sql.NullString{} + nullableTimestamp := sql.NullInt64{} + + err := rows.Scan( + &nullableTx, + &nullableTxArgs, + &nullableIsApproval, + &nullablePath, + &nullableRouteInputParams, + &nullableBuildInputParams, + &nullableStatus, + &nullableTimestamp, + ) + if err != nil { + return nil, err + } + + // Check all necessary fields are not null + if !nullableTxArgs.Valid || + !nullableTx.Valid || + !nullableIsApproval.Valid || + !nullableStatus.Valid || + !nullableTimestamp.Valid || + !nullablePath.Valid || + !nullableRouteInputParams.Valid || + !nullableBuildInputParams.Valid { + logutils.ZapLogger().Warn("some fields missing in entryData") + continue + } + + data.IsApproval = nullableIsApproval.Bool + data.Status = nullableStatus.String + data.Timestamp = nullableTimestamp.Int64 + + ret = append(ret, data) + } + + return ret, nil +} + +func dataToEntriesV2(deps FilterDependencies, data []*entryDataV2) ([]Entry, error) { + var ret []Entry + + now := time.Now().Unix() + + for _, d := range data { + chainID := wCommon.ChainID(d.Path.FromChain.ChainID) + + entry := Entry{ + payloadType: MultiTransactionPT, // Temporary, to keep compatibility with clients + id: d.TxArgs.MultiTransactionID, + transactions: []*transfer.TransactionIdentity{ + { + ChainID: chainID, + Hash: d.Tx.Hash(), + Address: d.RouteInputParams.AddrFrom, + }, + }, + timestamp: d.Timestamp, + activityType: getActivityTypeV2(d.Path.ProcessorName, d.IsApproval), + activityStatus: getActivityStatusV2(d.Status, d.Timestamp, now, getFinalizationPeriod(chainID)), + amountOut: d.Path.AmountIn, // Path and Activity have inverse perspective for amountIn and amountOut + amountIn: d.Path.AmountOut, // Path has the Tx perspective, Activity has the Account perspective + tokenOut: getToken(d.Path.FromToken, d.Path.ProcessorName), + tokenIn: getToken(d.Path.ToToken, d.Path.ProcessorName), + sender: &d.RouteInputParams.AddrFrom, + recipient: &d.RouteInputParams.AddrTo, + transferType: getTransferType(d.Path.FromToken, d.Path.ProcessorName), + //contractAddress: // TODO: Handle community contract deployment + //communityID: + } + + if d.Path.FromChain != nil { + chainID := wCommon.ChainID(d.Path.FromChain.ChainID) + entry.chainIDOut = &chainID + } + if d.Path.ToChain != nil { + chainID := wCommon.ChainID(d.Path.ToChain.ChainID) + entry.chainIDIn = &chainID + } + + entry.symbolOut, entry.symbolIn = lookupAndFillInTokens(deps, entry.tokenOut, entry.tokenIn) + + if entry.transferType == nil || TokenType(*entry.transferType) != Native { + interactedAddress := eth.BytesToAddress(d.Tx.To().Bytes()) + entry.interactedContractAddress = &interactedAddress + } + + if entry.activityType == ApproveAT { + entry.approvalSpender = d.Path.ApprovalContractAddress + } + + ret = append(ret, entry) + } + + return ret, nil +} + +func getActivityTypeV2(processorName string, isApproval bool) Type { + if isApproval { + return ApproveAT + } + + switch processorName { + case pathprocessor.ProcessorTransferName, pathprocessor.ProcessorERC721Name, pathprocessor.ProcessorERC1155Name: + return SendAT + case pathprocessor.ProcessorBridgeHopName, pathprocessor.ProcessorBridgeCelerName: + return BridgeAT + case pathprocessor.ProcessorSwapParaswapName: + return SwapAT + } + return UnknownAT +} + +func getActivityStatusV2(status transactions.TxStatus, timestamp int64, now int64, finalizationDuration int64) Status { + switch status { + case transactions.Pending: + return PendingAS + case transactions.Success: + if timestamp+finalizationDuration > now { + return FinalizedAS + } + return CompleteAS + case transactions.Failed: + return FailedAS + } + + logutils.ZapLogger().Error("unhandled transaction status value") + return FailedAS +} + +func getFinalizationPeriod(chainID wCommon.ChainID) int64 { + switch uint64(chainID) { + case wCommon.EthereumMainnet, wCommon.EthereumSepolia: + return L1FinalizationDuration + } + + return L2FinalizationDuration +} + +func getTransferType(fromToken *token.Token, processorName string) *TransferType { + ret := new(TransferType) + + switch processorName { + case pathprocessor.ProcessorTransferName: + if fromToken.IsNative() { + *ret = TransferTypeEth + break + } + *ret = TransferTypeErc20 + case pathprocessor.ProcessorERC721Name: + *ret = TransferTypeErc721 + case pathprocessor.ProcessorERC1155Name: + *ret = TransferTypeErc1155 + default: + ret = nil + } + + return ret +} + +func getToken(token *token.Token, processorName string) *Token { + if token == nil { + return nil + } + + ret := new(Token) + ret.ChainID = wCommon.ChainID(token.ChainID) + if token.IsNative() { + ret.TokenType = Native + } else { + ret.Address = token.Address + switch processorName { + case pathprocessor.ProcessorERC721Name, pathprocessor.ProcessorERC1155Name: + id, err := wCommon.GetTokenIdFromSymbol(token.Symbol) + if err != nil { + logutils.ZapLogger().Warn("malformed token symbol", zap.Error(err)) + return nil + } + ret.TokenID = (*hexutil.Big)(id) + if processorName == pathprocessor.ProcessorERC721Name { + ret.TokenType = Erc721 + } else { + ret.TokenType = Erc1155 + } + default: + ret.TokenType = Erc20 + } + } + return ret +} diff --git a/services/wallet/activity/filter.go b/services/wallet/activity/filter.go index 5c0aebe55..3af9483d9 100644 --- a/services/wallet/activity/filter.go +++ b/services/wallet/activity/filter.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "math" // used for embedding the sql query in the binary _ "embed" @@ -31,7 +32,7 @@ type Period struct { EndTimestamp int64 `json:"endTimestamp"` } -type Type int +type Type uint64 const ( SendAT Type = iota @@ -42,6 +43,7 @@ const ( ContractDeploymentAT MintAT ApproveAT + UnknownAT = math.MaxUint64 ) func allActivityTypesFilter() []Type { diff --git a/services/wallet/activity/service.go b/services/wallet/activity/service.go index ac0dbb6cd..ea73d7886 100644 --- a/services/wallet/activity/service.go +++ b/services/wallet/activity/service.go @@ -67,10 +67,11 @@ type Service struct { scheduler *async.MultiClientScheduler - sessions map[SessionID]*Session - lastSessionID atomic.Int32 - subscriptions event.Subscription - ch chan walletevent.Event + sessions map[SessionID]*Session + lastSessionID atomic.Int32 + subscriptions event.Subscription + subscriptionsCancelFn context.CancelFunc + ch chan walletevent.Event // sessionsRWMutex is used to protect all sessions related members sessionsRWMutex sync.RWMutex debounceDuration time.Duration diff --git a/services/wallet/activity/service_test.go b/services/wallet/activity/service_test.go index 9b9651a59..7f280616b 100644 --- a/services/wallet/activity/service_test.go +++ b/services/wallet/activity/service_test.go @@ -422,7 +422,7 @@ func TestService_IncrementalUpdateOnTop(t *testing.T) { allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}}) defer cleanup() - sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 5) + sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 5, V1) require.Greater(t, sessionID, SessionID(0)) defer state.service.StopFilterSession(sessionID) @@ -448,7 +448,7 @@ func TestService_IncrementalUpdateOnTop(t *testing.T) { newTx := payload.Activities[0] require.Equal(t, PendingTransactionPT, newTx.payloadType) // We don't keep type in the DB - require.Equal(t, (*int)(nil), newTx.transferType) + require.Equal(t, (*int64)(nil), newTx.transferType) require.Equal(t, SendAT, newTx.activityType) require.Equal(t, PendingAS, newTx.activityStatus) require.Equal(t, exp.ChainID, newTx.transaction.ChainID) @@ -497,7 +497,7 @@ func TestService_IncrementalUpdateMixed(t *testing.T) { ) defer cleanup() - sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 5) + sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 5, V1) require.Greater(t, sessionID, SessionID(0)) defer state.service.StopFilterSession(sessionID) @@ -544,7 +544,7 @@ func TestService_IncrementalUpdateFetchWindow(t *testing.T) { allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}}) defer cleanup() - sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 2) + sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 2, V1) require.Greater(t, sessionID, SessionID(0)) defer state.service.StopFilterSession(sessionID) @@ -593,7 +593,7 @@ func TestService_IncrementalUpdateFetchWindowNoReset(t *testing.T) { allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}}) defer cleanup() - sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 2) + sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 2, V1) require.Greater(t, sessionID, SessionID(0)) defer state.service.StopFilterSession(sessionID) @@ -640,7 +640,7 @@ func TestService_FilteredIncrementalUpdateResetAndClear(t *testing.T) { allAddresses = append(append(allAddresses, newFromTrs...), newToTrs...) // 1. User visualizes transactions for the first time - sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 4) + sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 4, V1) require.Greater(t, sessionID, SessionID(0)) defer state.service.StopFilterSession(sessionID) diff --git a/services/wallet/activity/session.go b/services/wallet/activity/session.go index b2c43d1d4..913417222 100644 --- a/services/wallet/activity/session.go +++ b/services/wallet/activity/session.go @@ -2,6 +2,7 @@ package activity import ( "context" + "encoding/json" "errors" "strconv" "time" @@ -14,6 +15,8 @@ import ( "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/common" + "github.com/status-im/status-go/services/wallet/responses" + "github.com/status-im/status-go/services/wallet/routeexecution" "github.com/status-im/status-go/services/wallet/transfer" "github.com/status-im/status-go/services/wallet/walletevent" "github.com/status-im/status-go/transactions" @@ -21,6 +24,22 @@ import ( const nilStr = "nil" +type Version string + +const ( + V1 Version = "v1" + V2 Version = "v2" +) + +type TransactionID struct { + ChainID common.ChainID + Hash eth.Hash +} + +func (t TransactionID) key() string { + return strconv.FormatUint(uint64(t.ChainID), 10) + t.Hash.Hex() +} + type EntryIdentity struct { payloadType PayloadType transaction *transfer.TransactionIdentity @@ -54,7 +73,8 @@ type SessionID int32 // 4. ResetFilterSession in case client receives SessionUpdate with HasNewOnTop = true to get the latest state // 5. StopFilterSession to stop the session when no used (user changed from activity screens or changed addresses and chains) type Session struct { - id SessionID + id SessionID + version Version // Filter info // @@ -84,16 +104,23 @@ type SessionUpdate struct { type fullFilterParams struct { sessionID SessionID + version Version addresses []eth.Address chainIDs []common.ChainID filter Filter } +func (s *Service) getActivityEntries(ctx context.Context, f fullFilterParams, offset int, count int) ([]Entry, error) { + allAddresses := s.areAllAddresses(f.addresses) + if f.version == V1 { + return getActivityEntries(ctx, s.getDeps(), f.addresses, allAddresses, f.chainIDs, f.filter, offset, count) + } + return getActivityEntriesV2(ctx, s.getDeps(), f.addresses, allAddresses, f.chainIDs, f.filter, offset, count) +} + func (s *Service) internalFilter(f fullFilterParams, offset int, count int, processResults func(entries []Entry) (offsetOverride int)) { s.scheduler.Enqueue(int32(f.sessionID), filterTask, func(ctx context.Context) (interface{}, error) { - allAddresses := s.areAllAddresses(f.addresses) - activities, err := getActivityEntries(ctx, s.getDeps(), f.addresses, allAddresses, f.chainIDs, f.filter, offset, count) - return activities, err + return s.getActivityEntries(ctx, f, offset, count) }, func(result interface{}, taskType async.TaskType, err error) { res := FilterResponse{ ErrorCode: ErrorCodeFailed, @@ -134,6 +161,7 @@ func (s *Service) internalFilterForSession(session *Session, firstPageCount int) s.internalFilter( fullFilterParams{ sessionID: session.id, + version: session.version, addresses: session.addresses, chainIDs: session.chainIDs, filter: session.filter, @@ -151,11 +179,12 @@ func (s *Service) internalFilterForSession(session *Session, firstPageCount int) ) } -func (s *Service) StartFilterSession(addresses []eth.Address, chainIDs []common.ChainID, filter Filter, firstPageCount int) SessionID { +func (s *Service) StartFilterSession(addresses []eth.Address, chainIDs []common.ChainID, filter Filter, firstPageCount int, version Version) SessionID { sessionID := s.nextSessionID() session := &Session{ - id: sessionID, + id: sessionID, + version: version, addresses: addresses, chainIDs: chainIDs, @@ -215,6 +244,7 @@ func (s *Service) UpdateFilterForSession(id SessionID, filter Filter, firstPageC s.internalFilter( fullFilterParams{ sessionID: session.id, + version: session.version, addresses: session.addresses, chainIDs: session.chainIDs, filter: session.filter, @@ -257,6 +287,7 @@ func (s *Service) ResetFilterSession(id SessionID, firstPageCount int) error { s.internalFilter( fullFilterParams{ sessionID: id, + version: session.version, addresses: session.addresses, chainIDs: session.chainIDs, filter: session.filter, @@ -301,6 +332,7 @@ func (s *Service) GetMoreForFilterSession(id SessionID, pageCount int) error { s.internalFilter( fullFilterParams{ sessionID: id, + version: session.version, addresses: session.addresses, chainIDs: session.chainIDs, filter: session.filter, @@ -331,42 +363,114 @@ func (s *Service) GetMoreForFilterSession(id SessionID, pageCount int) error { func (s *Service) subscribeToEvents() { s.ch = make(chan walletevent.Event, 100) s.subscriptions = s.eventFeed.Subscribe(s.ch) - go s.processEvents() + ctx, cancel := context.WithCancel(context.Background()) + s.subscriptionsCancelFn = cancel + go s.processEvents(ctx) } // processEvents runs only if more than one session is active -func (s *Service) processEvents() { +func (s *Service) processEvents(ctx context.Context) { defer gocommon.LogOnPanic() eventCount := 0 - lastUpdate := time.Now().UnixMilli() - for event := range s.ch { - if event.Type == transactions.EventPendingTransactionUpdate || - event.Type == transactions.EventPendingTransactionStatusChanged || - event.Type == transfer.EventNewTransfers { - eventCount++ + changedTxs := make([]TransactionID, 0) + newTxs := false + + var debounceTimer *time.Timer + debouncerCh := make(chan struct{}) + debounceProcessChangesFn := func() { + if debounceTimer == nil { + debounceTimer = time.AfterFunc(s.debounceDuration, func() { + debouncerCh <- struct{}{} + }) } - // debounce events updates - if eventCount > 0 && - (time.Duration(time.Now().UnixMilli()-lastUpdate)*time.Millisecond) >= s.debounceDuration { - s.detectNew(eventCount) - eventCount = 0 - lastUpdate = time.Now().UnixMilli() + } + + for { + select { + case event := <-s.ch: + switch event.Type { + case transactions.EventPendingTransactionUpdate: + eventCount++ + var payload transactions.PendingTxUpdatePayload + if err := json.Unmarshal([]byte(event.Message), &payload); err != nil { + logutils.ZapLogger().Error("Error unmarshalling PendingTxUpdatePayload", zap.Error(err)) + continue + } + changedTxs = append(changedTxs, TransactionID{ + ChainID: payload.ChainID, + Hash: payload.Hash, + }) + debounceProcessChangesFn() + case transactions.EventPendingTransactionStatusChanged: + eventCount++ + var payload transactions.StatusChangedPayload + if err := json.Unmarshal([]byte(event.Message), &payload); err != nil { + logutils.ZapLogger().Error("Error unmarshalling StatusChangedPayload", zap.Error(err)) + continue + } + changedTxs = append(changedTxs, TransactionID{ + ChainID: payload.ChainID, + Hash: payload.Hash, + }) + debounceProcessChangesFn() + case transfer.EventNewTransfers: + eventCount++ + // No updates here, these are detected with their final state, just trigger + // the detection of new entries + newTxs = true + debounceProcessChangesFn() + case routeexecution.EventRouteExecutionTransactionSent: + sentTxs, ok := event.EventParams.(*responses.RouterSentTransactions) + if ok && sentTxs != nil { + for _, tx := range sentTxs.SentTransactions { + changedTxs = append(changedTxs, TransactionID{ + ChainID: common.ChainID(tx.FromChain), + Hash: eth.Hash(tx.Hash), + }) + } + } + debounceProcessChangesFn() + } + case <-debouncerCh: + if eventCount > 0 || newTxs || len(changedTxs) > 0 { + s.processChanges(eventCount, changedTxs) + eventCount = 0 + newTxs = false + changedTxs = nil + debounceTimer = nil + } + case <-ctx.Done(): + return } } } -func (s *Service) detectNew(changeCount int) { +func (s *Service) processChanges(eventCount int, changedTxs []TransactionID) { for sessionID := range s.sessions { session := s.sessions[sessionID] - fetchLen := len(session.model) + changeCount - allAddresses := s.areAllAddresses(session.addresses) - activities, err := getActivityEntries(context.Background(), s.getDeps(), session.addresses, allAddresses, session.chainIDs, session.filter, 0, fetchLen) + f := fullFilterParams{ + sessionID: session.id, + version: session.version, + addresses: session.addresses, + chainIDs: session.chainIDs, + filter: session.filter, + } + + limit := NoLimit + if session.version == V1 { + limit = len(session.model) + eventCount + } + activities, err := s.getActivityEntries(context.Background(), f, 0, limit) if err != nil { logutils.ZapLogger().Error("Error getting activity entries", zap.Error(err)) continue } + if session.version != V1 { + s.processEntryDataUpdates(sessionID, activities, changedTxs) + } + s.sessionsRWMutex.RLock() allData := append(session.new, session.model...) new, _ /*removed*/ := findUpdates(allData, activities) @@ -414,6 +518,55 @@ func (s *Service) detectNew(changeCount int) { } } +func (s *Service) processEntryDataUpdates(sessionID SessionID, entries []Entry, changedTxs []TransactionID) { + updateData := make([]*EntryData, 0, len(changedTxs)) + + entriesMap := make(map[string]Entry, len(entries)) + for _, e := range entries { + if e.payloadType == MultiTransactionPT { + if e.id != common.NoMultiTransactionID { + for _, tx := range e.transactions { + id := TransactionID{ + ChainID: tx.ChainID, + Hash: tx.Hash, + } + entriesMap[id.key()] = e + } + } + } else if e.transaction != nil { + id := TransactionID{ + ChainID: e.transaction.ChainID, + Hash: e.transaction.Hash, + } + entriesMap[id.key()] = e + } + } + + for _, tx := range changedTxs { + e, found := entriesMap[tx.key()] + if !found { + continue + } + + data := &EntryData{ + ActivityStatus: &e.activityStatus, + } + if e.payloadType == MultiTransactionPT { + data.ID = common.NewAndSet(e.id) + } else { + data.Transaction = e.transaction + } + data.PayloadType = e.payloadType + + updateData = append(updateData, data) + } + + if len(updateData) > 0 { + requestID := int32(sessionID) + sendResponseEvent(s.eventFeed, &requestID, EventActivityFilteringUpdate, updateData, nil) + } +} + func notify(eventFeed *event.Feed, id SessionID, hasNewOnTop bool, mixed []*EntryUpdate) { defer gocommon.LogOnPanic() payload := SessionUpdate{ @@ -429,6 +582,8 @@ func notify(eventFeed *event.Feed, id SessionID, hasNewOnTop bool, mixed []*Entr // unsubscribeFromEvents should be called with sessionsRWMutex locked for writing func (s *Service) unsubscribeFromEvents() { + s.subscriptionsCancelFn() + s.subscriptionsCancelFn = nil s.subscriptions.Unsubscribe() close(s.ch) s.ch = nil diff --git a/services/wallet/api.go b/services/wallet/api.go index 03c822c22..087b65268 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -809,7 +809,17 @@ func (api *API) StartActivityFilterSession(addresses []common.Address, chainIDs zap.Int("firstPageCount", firstPageCount), ) - return api.s.activity.StartFilterSession(addresses, chainIDs, filter, firstPageCount), nil + return api.s.activity.StartFilterSession(addresses, chainIDs, filter, firstPageCount, activity.V1), nil +} + +func (api *API) StartActivityFilterSessionV2(addresses []common.Address, chainIDs []wcommon.ChainID, filter activity.Filter, firstPageCount int) (activity.SessionID, error) { + logutils.ZapLogger().Debug("wallet.api.StartActivityFilterSessionV2", + zap.Int("addr.count", len(addresses)), + zap.Int("chainIDs.count", len(chainIDs)), + zap.Int("firstPageCount", firstPageCount), + ) + + return api.s.activity.StartFilterSession(addresses, chainIDs, filter, firstPageCount, activity.V2), nil } func (api *API) UpdateActivityFilterForSession(sessionID activity.SessionID, filter activity.Filter, firstPageCount int) error { diff --git a/services/wallet/routeexecution/manager.go b/services/wallet/routeexecution/manager.go index 6241bec5d..186eb7ff6 100644 --- a/services/wallet/routeexecution/manager.go +++ b/services/wallet/routeexecution/manager.go @@ -8,6 +8,7 @@ import ( "go.uber.org/zap" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/eth-node/types" @@ -21,25 +22,32 @@ import ( "github.com/status-im/status-go/services/wallet/router/pathprocessor" "github.com/status-im/status-go/services/wallet/router/sendtype" "github.com/status-im/status-go/services/wallet/transfer" + "github.com/status-im/status-go/services/wallet/walletevent" "github.com/status-im/status-go/signal" ) +const ( + EventRouteExecutionTransactionSent walletevent.EventType = walletevent.InternalEventTypePrefix + "wallet-route-execution-transaction-sent" +) + type Manager struct { router *router.Router transactionManager *transfer.TransactionManager transferController *transfer.Controller db *DB + eventFeed *event.Feed // Local data used for storage purposes buildInputParams *requests.RouterBuildTransactionsParams } -func NewManager(walletDB *sql.DB, router *router.Router, transactionManager *transfer.TransactionManager, transferController *transfer.Controller) *Manager { +func NewManager(walletDB *sql.DB, eventFeed *event.Feed, router *router.Router, transactionManager *transfer.TransactionManager, transferController *transfer.Controller) *Manager { return &Manager{ router: router, transactionManager: transactionManager, transferController: transferController, db: NewDB(walletDB), + eventFeed: eventFeed, } } @@ -133,6 +141,12 @@ func (m *Manager) SendRouterTransactionsWithSignatures(ctx context.Context, send response.SendDetails.ErrorResponse = err.(*statusErrors.ErrorResponse) } signal.SendWalletEvent(signal.RouterTransactionsSent, response) + + event := walletevent.Event{ + Type: EventRouteExecutionTransactionSent, + EventParams: response, + } + m.eventFeed.Send(event) }() _, routeInputParams = m.router.GetBestRouteAndAssociatedInputParams() diff --git a/services/wallet/service.go b/services/wallet/service.go index 18582ac78..21c1de19c 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -196,7 +196,7 @@ func NewService( router.AddPathProcessor(processor) } - routeExecutionManager := routeexecution.NewManager(db, router, transactionManager, transferController) + routeExecutionManager := routeexecution.NewManager(db, feed, router, transactionManager, transferController) return &Service{ db: db, diff --git a/services/wallet/transfer/transaction_manager_route.go b/services/wallet/transfer/transaction_manager_route.go index 321a5a5ad..c0442061a 100644 --- a/services/wallet/transfer/transaction_manager_route.go +++ b/services/wallet/transfer/transaction_manager_route.go @@ -347,6 +347,8 @@ func addSignatureAndSendTransaction( return nil, err } + txData.TxArgs.MultiTransactionID = multiTransactionID + return responses.NewRouterSentTransaction(txData.TxArgs, txData.SentHash, isApproval), nil } diff --git a/tests-functional/clients/signals.py b/tests-functional/clients/signals.py index 28b670af4..d6a5d4c6b 100644 --- a/tests-functional/clients/signals.py +++ b/tests-functional/clients/signals.py @@ -12,23 +12,58 @@ class SignalClient: self.await_signals = await_signals self.received_signals = { - signal: [] for signal in self.await_signals + # For each signal type, store: + # - list of received signals + # - expected received event delta count (resets to 1 after each wait_for_event call) + # - expected received event count + # - a function that takes the received signal as an argument and returns True if the signal is accepted (counted) or discarded + signal: { + "received": [], + "delta_count": 1, + "expected_count": 1, + "accept_fn": None + } for signal in self.await_signals } def on_message(self, ws, signal): signal = json.loads(signal) - if signal.get("type") in self.await_signals: - self.received_signals[signal["type"]].append(signal) + signal_type = signal.get("type") + if signal_type in self.await_signals: + accept_fn = self.received_signals[signal_type]["accept_fn"] + if not accept_fn or accept_fn(signal): + self.received_signals[signal_type]["received"].append(signal) + + def check_signal_type(self, signal_type): + if signal_type not in self.await_signals: + raise ValueError(f"Signal type {signal_type} is not in the list of awaited signals") + + # Used to set up how many instances of a signal to wait for, before triggering the actions + # that cause them to be emitted. + def prepare_wait_for_signal(self, signal_type, delta_count, accept_fn=None): + self.check_signal_type(signal_type) + + if delta_count < 1: + raise ValueError("delta_count must be greater than 0") + self.received_signals[signal_type]["delta_count"] = delta_count + self.received_signals[signal_type]["expected_count"] = len(self.received_signals[signal_type]["received"]) + delta_count + self.received_signals[signal_type]["accept_fn"] = accept_fn def wait_for_signal(self, signal_type, timeout=20): + self.check_signal_type(signal_type) + start_time = time.time() - while not self.received_signals.get(signal_type): + received_signals = self.received_signals.get(signal_type) + while (not received_signals) or len(received_signals["received"]) < received_signals["expected_count"]: if time.time() - start_time >= timeout: raise TimeoutError( f"Signal {signal_type} is not received in {timeout} seconds") time.sleep(0.2) logging.debug(f"Signal {signal_type} is received in {round(time.time() - start_time)} seconds") - return self.received_signals[signal_type][0] + delta_count = received_signals["delta_count"] + self.prepare_wait_for_signal(signal_type, 1) + if delta_count == 1: + return self.received_signals[signal_type]["received"][-1] + return self.received_signals[signal_type]["received"][-delta_count:] def _on_error(self, ws, error): logging.error(f"Error: {error}") diff --git a/tests-functional/tests/test_wallet_activity_session.py b/tests-functional/tests/test_wallet_activity_session.py new file mode 100644 index 000000000..a99a6bdec --- /dev/null +++ b/tests-functional/tests/test_wallet_activity_session.py @@ -0,0 +1,90 @@ +import json +import random + +import pytest + +from constants import user_1 +from test_cases import SignalTestCase + +import wallet_utils + +import logging + +EventActivityFilteringDone = "wallet-activity-filtering-done" +EventActivityFilteringUpdate = "wallet-activity-filtering-entries-updated" +EventActivitySessionUpdated = "wallet-activity-session-updated" + +def validate_entry(entry, tx_data): + assert entry["transactions"][0]["chainId"] == tx_data["tx_status"]["chainId"] + assert entry["transactions"][0]["hash"] == tx_data["tx_status"]["hash"] + +@pytest.mark.wallet +@pytest.mark.rpc +class TestWalletActivitySession(SignalTestCase): + await_signals = ["wallet", + "wallet.suggested.routes", + "wallet.router.sign-transactions", + "wallet.router.sending-transactions-started", + "wallet.transaction.status-changed", + "wallet.router.transactions-sent"] + + def setup_method(self): + super().setup_method() + self.request_id = str(random.randint(1, 8888)) + + def test_wallet_start_activity_filter_session(self): + tx_data = [] # (routes, build_tx, tx_signatures, tx_status) + # Set up a transactions for account before starting session + tx_data.append(wallet_utils.send_router_transaction(self.rpc_client, self.signal_client)) + + # Start activity session + method = "wallet_startActivityFilterSessionV2" + params = [[user_1.address], [self.network_id], + {"period": {"startTimestamp": 0, "endTimestamp": 0}, "types": [], "statuses": [], + "counterpartyAddresses": [], "assets": [], "collectibles": [], "filterOutAssets": False, + "filterOutCollectibles": False}, 10] + self.signal_client.prepare_wait_for_signal("wallet", 1, lambda signal : signal["event"]["type"] == EventActivityFilteringDone) + response = self.rpc_client.rpc_valid_request(method, params, self.request_id) + event_response = self.signal_client.wait_for_signal("wallet", timeout=10)['event'] + + # Check response + sessionID = int(response.json()["result"]) + assert sessionID > 0 + + # Check response event + assert int(event_response['requestId']) == sessionID + message = json.loads(event_response['message'].replace("'", "\"")) + assert int(message['errorCode']) == 1 + assert len(message['activities']) > 0 # Should have at least 1 entry + # First activity entry should match last sent transaction + validate_entry(message['activities'][0], tx_data[-1]) + + # Trigger new transaction + self.signal_client.prepare_wait_for_signal("wallet", 1, lambda signal : signal["event"]["type"] == EventActivitySessionUpdated and signal['event']['requestId'] == sessionID) + tx_data.append(wallet_utils.send_router_transaction(self.rpc_client, self.signal_client)) + print(tx_data[-1]) + event_response = self.signal_client.wait_for_signal("wallet", timeout=10)['event'] + + # Check response event + assert int(event_response['requestId']) == sessionID + message = json.loads(event_response['message'].replace("'", "\"")) + assert message['hasNewOnTop'] # New entries reported + + # Reset activity session + method = "wallet_resetActivityFilterSession" + params = [sessionID, 10] + self.signal_client.prepare_wait_for_signal("wallet", 1, lambda signal : signal["event"]["type"] == EventActivityFilteringDone and signal['event']['requestId'] == sessionID) + response = self.rpc_client.rpc_valid_request(method, params, self.request_id) + event_response = self.signal_client.wait_for_signal("wallet", timeout=10)['event'] + + # Check response event + assert int(event_response['requestId']) == sessionID + message = json.loads(event_response['message'].replace("'", "\"")) + assert int(message['errorCode']) == 1 + assert len(message['activities']) > 1 # Should have at least 2 entries + + # First activity entry should match last sent transaction + validate_entry(message['activities'][0], tx_data[-1]) + + # Second activity entry should match second to last sent transaction + validate_entry(message['activities'][1], tx_data[-2]) diff --git a/tests-functional/wallet_utils.py b/tests-functional/wallet_utils.py new file mode 100644 index 000000000..d61e8b1bc --- /dev/null +++ b/tests-functional/wallet_utils.py @@ -0,0 +1,131 @@ +import json +import logging +import jsonschema +import uuid +import threading +import time + +from conftest import option +from constants import user_1, user_2 + +from clients.signals import SignalClient + +def verify_json_schema(response, method): + with open(f"{option.base_dir}/schemas/{method}", "r") as schema: + jsonschema.validate(instance=response, + schema=json.load(schema)) + +def get_suggested_routes(rpc_client, signal_client, **kwargs): + _uuid = str(uuid.uuid4()) + amount_in = "0xde0b6b3a7640000" + + method = "wallet_getSuggestedRoutesAsync" + input_params = { + "uuid": _uuid, + "sendType": 0, + "addrFrom": user_1.address, + "addrTo": user_2.address, + "amountIn": amount_in, + "amountOut": "0x0", + "tokenID": "ETH", + "tokenIDIsOwnerToken": False, + "toTokenID": "", + "disabledFromChainIDs": [10, 42161], + "disabledToChainIDs": [10, 42161], + "gasFeeMode": 1, + "fromLockedAmount": {} + } + for key, new_value in kwargs.items(): + if key in input_params: + input_params[key] = new_value + else: + logging.info( + f"Warning: The key '{key}' does not exist in the input_params parameters and will be ignored.") + params = [input_params] + + signal_client.prepare_wait_for_signal("wallet.suggested.routes", 1) + _ = rpc_client.rpc_valid_request(method, params) + + routes = signal_client.wait_for_signal("wallet.suggested.routes") + assert routes['event']['Uuid'] == _uuid + + return routes['event'] + +def build_transactions_from_route(rpc_client, signal_client, uuid, **kwargs): + method = "wallet_buildTransactionsFromRoute" + build_tx_params = { + "uuid": uuid, + "slippagePercentage": 0 + } + for key, new_value in kwargs.items(): + if key in build_tx_params: + build_tx_params[key] = new_value + else: + logging.info( + f"Warning: The key '{key}' does not exist in the build_tx_params parameters and will be ignored.") + params = [build_tx_params] + _ = rpc_client.rpc_valid_request(method, params) + + wallet_router_sign_transactions = signal_client.wait_for_signal("wallet.router.sign-transactions") + + assert wallet_router_sign_transactions['event']['signingDetails']['signOnKeycard'] == False + transaction_hashes = wallet_router_sign_transactions['event']['signingDetails']['hashes'] + + assert transaction_hashes, "Transaction hashes are empty!" + + return wallet_router_sign_transactions['event'] + +def sign_messages(rpc_client, hashes): + tx_signatures = {} + + for hash in hashes: + + method = "wallet_signMessage" + params = [ + hash, + user_1.address, + option.password + ] + + response = rpc_client.rpc_valid_request(method, params) + + if response.json()["result"].startswith("0x"): + tx_signature = response.json()["result"][2:] + + signature = { + "r": tx_signature[:64], + "s": tx_signature[64:128], + "v": tx_signature[128:] + } + + tx_signatures[hash] = signature + return tx_signatures + +def send_router_transactions_with_signatures(rpc_client, signal_client, uuid, tx_signatures): + method = "wallet_sendRouterTransactionsWithSignatures" + params = [ + { + "uuid": uuid, + "Signatures": tx_signatures + } + ] + _ = rpc_client.rpc_valid_request(method, params) + + tx_status = signal_client.wait_for_signal( + "wallet.transaction.status-changed") + + assert tx_status["event"]["status"] == "Success" + + return tx_status["event"] + +def send_router_transaction(rpc_client, signal_client, **kwargs): + routes = get_suggested_routes(rpc_client, signal_client, **kwargs) + build_tx = build_transactions_from_route(rpc_client, signal_client, routes['Uuid']) + tx_signatures = sign_messages(rpc_client, build_tx['signingDetails']['hashes']) + tx_status = send_router_transactions_with_signatures(rpc_client, signal_client, routes['Uuid'], tx_signatures) + return { + "routes": routes, + "build_tx": build_tx, + "tx_signatures": tx_signatures, + "tx_status": tx_status + } diff --git a/walletdatabase/migrations/sql/1730807123_add_from_to_address_virtual_columns_to_route_input_data.up.sql b/walletdatabase/migrations/sql/1730807123_add_from_to_address_virtual_columns_to_route_input_data.up.sql new file mode 100644 index 000000000..60282ef68 --- /dev/null +++ b/walletdatabase/migrations/sql/1730807123_add_from_to_address_virtual_columns_to_route_input_data.up.sql @@ -0,0 +1,5 @@ +ALTER TABLE route_input_parameters ADD COLUMN from_address BLOB NOT NULL AS (unhex(substr(json_extract(route_input_params_json, '$.addrFrom'),3))); +ALTER TABLE route_input_parameters ADD COLUMN to_address BLOB NOT NULL AS (unhex(substr(json_extract(route_input_params_json, '$.addrTo'),3))); + +CREATE INDEX IF NOT EXISTS idx_route_input_parameters_per_from_address ON route_input_parameters (from_address); +CREATE INDEX IF NOT EXISTS idx_route_input_parameters_per_to_address ON route_input_parameters (to_address);