diff --git a/multiaccounts/accounts/testutils.go b/multiaccounts/accounts/testutils.go deleted file mode 100644 index 165ff62ee..000000000 --- a/multiaccounts/accounts/testutils.go +++ /dev/null @@ -1,15 +0,0 @@ -package accounts - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func MockTestAccounts(tb testing.TB, d *Database, accounts []*Account) { - err := d.SaveOrUpdateAccounts(accounts, false) - require.NoError(tb, err) - res, err := d.GetActiveAccounts() - require.NoError(tb, err) - require.Equal(tb, accounts[0].Address, res[0].Address) -} diff --git a/services/wallet/activity/activity.go b/services/wallet/activity/activity.go index deb45c030..729e3647d 100644 --- a/services/wallet/activity/activity.go +++ b/services/wallet/activity/activity.go @@ -18,7 +18,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" - "github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/services/wallet/bigint" "github.com/status-im/status-go/services/wallet/common" "github.com/status-im/status-go/services/wallet/thirdparty" @@ -37,8 +36,6 @@ const ( PendingTransactionPT ) -const keypairAccountsTable = "keypairs_accounts" - var ( ZeroAddress = eth.Address{} ) @@ -322,8 +319,7 @@ const ( var queryFormatString string type FilterDependencies struct { - db *sql.DB - accountsDb *accounts.Database + db *sql.DB // use token.TokenType, token.ChainID and token.Address to find the available symbol tokenSymbol func(token Token) string // use the chainID and symbol to look up token.TokenType and token.Address. Return nil if not found @@ -333,10 +329,18 @@ type FilterDependencies struct { // getActivityEntries queries the transfers, pending_transactions, and multi_transactions tables based on filter parameters and arguments // it returns metadata for all entries ordered by timestamp column // +// addresses are mandatory and used to detect activity types SendAT and ReceiveAT for transfers entries +// +// allAddresses optimization indicates if the passed addresses include all the owners in the wallet DB +// // Adding a no-limit option was never considered or required. // // TODO: optimization: consider implementing nullable []byte instead of using strings for addresses or insert binary (X'...' syntax) directly into the query -func getActivityEntries(ctx context.Context, deps FilterDependencies, addresses []eth.Address, chainIDs []common.ChainID, filter Filter, offset int, limit int) ([]Entry, error) { +func getActivityEntries(ctx context.Context, deps FilterDependencies, addresses []eth.Address, allAddresses bool, chainIDs []common.ChainID, filter Filter, offset int, limit int) ([]Entry, error) { + if len(addresses) == 0 { + return nil, errors.New("no addresses provided") + } + includeAllTokenTypeAssets := len(filter.Assets) == 0 && !filter.FilterOutAssets // Used for symbol bearing tables multi_transactions and pending_transactions @@ -378,7 +382,6 @@ func getActivityEntries(ctx context.Context, deps FilterDependencies, addresses startFilterDisabled := !(filter.Period.StartTimestamp > 0) endFilterDisabled := !(filter.Period.EndTimestamp > 0) filterActivityTypeAll := len(filter.Types) == 0 - filterAllAddresses := len(addresses) == 0 filterAllToAddresses := len(filter.CounterpartyAddresses) == 0 includeAllStatuses := len(filter.Statuses) == 0 @@ -393,10 +396,7 @@ func getActivityEntries(ctx context.Context, deps FilterDependencies, addresses filterStatusFinalized = sliceContains(filter.Statuses, FinalizedAS) } - involvedAddresses := noEntriesInTmpTableSQLValues - if !filterAllAddresses { - involvedAddresses = joinAddresses(addresses) - } + involvedAddresses := joinAddresses(addresses) toAddresses := noEntriesInTmpTableSQLValues if !filterAllToAddresses { toAddresses = joinAddresses(filter.CounterpartyAddresses) @@ -407,23 +407,24 @@ func getActivityEntries(ctx context.Context, deps FilterDependencies, addresses return strconv.Itoa(int(t)) }) - // Since the filter query needs addresses which are in a different database, we need to update the - // keypairs_accounts table in the current database with the latest addresses from the accounts database - err := updateKeypairsAccountsTable(deps.accountsDb, deps.db) + queryString := fmt.Sprintf(queryFormatString, involvedAddresses, toAddresses, assetsTokenCodes, assetsERC20, networks, + joinedMTTypes) + + // The duplicated temporary table UNION with CTE acts as an optimization + // As soon as we use filter_addresses CTE or filter_addresses_table temp table + // or switch them alternatively for JOIN or IN clauses the performance drops significantly + _, err := deps.db.Exec(fmt.Sprintf("DROP TABLE IF EXISTS filter_addresses_table; CREATE TEMP TABLE filter_addresses_table (address VARCHAR PRIMARY KEY); INSERT OR IGNORE INTO filter_addresses_table (address) VALUES %s;\n", involvedAddresses)) if err != nil { return nil, err } - queryString := fmt.Sprintf(queryFormatString, keypairAccountsTable, involvedAddresses, toAddresses, assetsTokenCodes, assetsERC20, networks, - joinedMTTypes) - rows, err := deps.db.QueryContext(ctx, queryString, startFilterDisabled, filter.Period.StartTimestamp, endFilterDisabled, filter.Period.EndTimestamp, filterActivityTypeAll, sliceContains(filter.Types, SendAT), sliceContains(filter.Types, ReceiveAT), sliceContains(filter.Types, ContractDeploymentAT), sliceContains(filter.Types, MintAT), transfer.MultiTransactionSend, fromTrType, toTrType, - filterAllAddresses, filterAllToAddresses, + allAddresses, filterAllToAddresses, includeAllStatuses, filterStatusCompleted, filterStatusFailed, filterStatusFinalized, filterStatusPending, FailedAS, CompleteAS, PendingAS, includeAllTokenTypeAssets, @@ -689,48 +690,6 @@ func contractTypeFromDBType(dbType string) (transferType *TransferType) { return transferType } -func updateKeypairsAccountsTable(accountsDb *accounts.Database, db *sql.DB) error { - _, err := db.Exec(fmt.Sprintf("CREATE TEMP TABLE IF NOT EXISTS %s (address VARCHAR PRIMARY KEY)", - keypairAccountsTable)) - if err != nil { - log.Error("failed to create 'keypairs_accounts' table", "err", err) - return err - } - - // TODO: remove dependency on accounts table by removing"all accounts filter" optimization; see #11980 - if accountsDb == nil { - return nil - } - addresses, err := accountsDb.GetWalletAddresses() - if err != nil { - log.Error("failed to get wallet addresses", "err", err) - return err - } - - tx, err := db.Begin() - if err != nil { - return err - } - - defer func() { - if err == nil { - err = tx.Commit() - return - } - _ = tx.Rollback() - }() - - for _, address := range addresses { - _, err = tx.Exec(fmt.Sprintf("INSERT OR IGNORE INTO %s (address) VALUES (?)", keypairAccountsTable), address) - if err != nil { - log.Error("failed to insert wallet addresses", "err", err) - return err - } - } - - return nil -} - // lookupAndFillInTokens ignores NFTs func lookupAndFillInTokens(deps FilterDependencies, tokenOut *Token, tokenIn *Token) (symbolOut *string, symbolIn *string) { if tokenOut != nil && tokenOut.TokenID == nil { diff --git a/services/wallet/activity/activity_test.go b/services/wallet/activity/activity_test.go index ea201712a..186d96685 100644 --- a/services/wallet/activity/activity_test.go +++ b/services/wallet/activity/activity_test.go @@ -3,13 +3,9 @@ package activity import ( "context" "database/sql" - "fmt" "math/big" "testing" - "github.com/status-im/status-go/appdatabase" - "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/services/wallet/common" "github.com/status-im/status-go/services/wallet/testutils" "github.com/status-im/status-go/services/wallet/transfer" @@ -40,28 +36,20 @@ func tokenFromSymbol(chainID *common.ChainID, symbol string) *Token { } func setupTestActivityDBStorageChoice(tb testing.TB, inMemory bool) (deps FilterDependencies, close func()) { - var db, appDb *sql.DB + var db *sql.DB var err error cleanupDB := func() error { return nil } cleanupWalletDB := func() error { return nil } if inMemory { db, err = helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{}) require.NoError(tb, err) - appDb, err = helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) - require.NoError(tb, err) } else { db, cleanupWalletDB, err = helpers.SetupTestSQLDB(walletdatabase.DbInitializer{}, "wallet-activity-tests") require.NoError(tb, err) - appDb, cleanupDB, err = helpers.SetupTestSQLDB(appdatabase.DbInitializer{}, "wallet-activity-tests") - require.NoError(tb, err) } - accountsDb, err := accounts.NewDB(appDb) - require.NoError(tb, err) - deps = FilterDependencies{ - db: db, - accountsDb: accountsDb, + db: db, tokenSymbol: func(token Token) string { switch token.TokenType { case Native: @@ -112,17 +100,6 @@ type testData struct { nextIndex int } -func mockTestAccountsWithAddresses(tb testing.TB, db *accounts.Database, addresses []eth.Address) { - mockedAccounts := []*accounts.Account{} - for _, address := range addresses { - mockedAccounts = append(mockedAccounts, &accounts.Account{ - Address: types.Address(address), - Type: accounts.AccountTypeWatch, - }) - } - accounts.MockTestAccounts(tb, db, mockedAccounts) -} - // Generates and adds to the DB 7 transfers and 2 multitransactions. // There are only 4 extractable activity entries (transactions + multi-transactions) with timestamps 1-4. The others are associated with a multi-transaction func fillTestData(t *testing.T, db *sql.DB) (td testData, fromAddresses, toAddresses []eth.Address) { @@ -203,7 +180,7 @@ func TestGetActivityEntriesAll(t *testing.T) { td, fromAddresses, toAddresses := fillTestData(t, deps.db) var filter Filter - entries, err := getActivityEntries(context.Background(), deps, append(toAddresses, fromAddresses...), []common.ChainID{}, filter, 0, 10) + entries, err := getActivityEntries(context.Background(), deps, append(toAddresses, fromAddresses...), true, []common.ChainID{}, filter, 0, 10) require.NoError(t, err) require.Equal(t, 4, len(entries)) @@ -295,9 +272,7 @@ func TestGetActivityEntriesWithSameTransactionForSenderAndReceiverInDB(t *testin defer close() // Add 4 extractable transactions with timestamps 1-4 - td, fromAddresses, toAddresses := fillTestData(t, deps.db) - - mockTestAccountsWithAddresses(t, deps.accountsDb, append(fromAddresses, toAddresses...)) + td, _, _ := fillTestData(t, deps.db) // Add another transaction with owner reversed senderTr := td.tr1 @@ -307,7 +282,7 @@ func TestGetActivityEntriesWithSameTransactionForSenderAndReceiverInDB(t *testin transfer.InsertTestTransfer(t, deps.db, senderTr.From, &senderTr) var filter Filter - entries, err := getActivityEntries(context.Background(), deps, []eth.Address{td.tr1.To, senderTr.From}, []common.ChainID{}, filter, 0, 10) + entries, err := getActivityEntries(context.Background(), deps, []eth.Address{td.tr1.To, senderTr.From}, false, []common.ChainID{}, filter, 0, 10) require.NoError(t, err) require.Equal(t, 2, len(entries)) @@ -335,13 +310,13 @@ func TestGetActivityEntriesFilterByTime(t *testing.T) { transfer.InsertTestTransfer(t, deps.db, trs[i].To, &trs[i]) } - mockTestAccountsWithAddresses(t, deps.accountsDb, append(append(append(fromTds, toTds...), fromTrs...), toTrs...)) + allAddresses := append(append(append(fromTds, toTds...), fromTrs...), toTrs...) // Test start only var filter Filter filter.Period.StartTimestamp = td.multiTx1.Timestamp filter.Period.EndTimestamp = NoLimitTimestampForPeriod - entries, err := getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err := getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 8, len(entries)) @@ -387,7 +362,7 @@ func TestGetActivityEntriesFilterByTime(t *testing.T) { // Test complete interval filter.Period.EndTimestamp = trs[2].Timestamp - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 5, len(entries)) @@ -433,7 +408,7 @@ func TestGetActivityEntriesFilterByTime(t *testing.T) { // Test end only filter.Period.StartTimestamp = NoLimitTimestampForPeriod - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 7, len(entries)) // Check start and end content @@ -487,18 +462,18 @@ func TestGetActivityEntriesCheckOffsetAndLimit(t *testing.T) { transfer.InsertTestTransfer(t, deps.db, trs[i].To, &trs[i]) } - mockTestAccountsWithAddresses(t, deps.accountsDb, append(fromTrs, toTrs...)) + allAddresses := append(fromTrs, toTrs...) var filter Filter // Get all - entries, err := getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 5) + entries, err := getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 5) require.NoError(t, err) require.Equal(t, 5, len(entries)) // Get time based interval filter.Period.StartTimestamp = trs[2].Timestamp filter.Period.EndTimestamp = trs[8].Timestamp - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 3) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 3) require.NoError(t, err) require.Equal(t, 3, len(entries)) // Check start and end content @@ -542,7 +517,7 @@ func TestGetActivityEntriesCheckOffsetAndLimit(t *testing.T) { }, entries[2]) // Move window 2 entries forward - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 2, 3) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 2, 3) require.NoError(t, err) require.Equal(t, 3, len(entries)) // Check start and end content @@ -586,7 +561,7 @@ func TestGetActivityEntriesCheckOffsetAndLimit(t *testing.T) { }, entries[2]) // Move window 4 more entries to test filter cap - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 6, 3) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 6, 3) require.NoError(t, err) require.Equal(t, 1, len(entries)) // Check start and end content @@ -638,16 +613,18 @@ func TestGetActivityEntriesFilterByType(t *testing.T) { defer close() // Adds 4 extractable transactions - td, _, _ := fillTestData(t, deps.db) + td, tdFromAdds, tdToAddrs := fillTestData(t, deps.db) // Add 5 extractable transactions: one MultiTransactionSwap, two MultiTransactionBridge and two MultiTransactionSend multiTxs := make([]transfer.TestMultiTransaction, 5) - trs, _, _ := transfer.GenerateTestTransfers(t, deps.db, td.nextIndex, len(multiTxs)*2) + trs, fromAddrs, toAddrs := transfer.GenerateTestTransfers(t, deps.db, td.nextIndex, len(multiTxs)*2) multiTxs[0] = transfer.GenerateTestBridgeMultiTransaction(trs[0], trs[1]) multiTxs[1] = transfer.GenerateTestSwapMultiTransaction(trs[2], testutils.SntSymbol, 100) // trs[3] multiTxs[2] = transfer.GenerateTestSendMultiTransaction(trs[4]) // trs[5] multiTxs[3] = transfer.GenerateTestBridgeMultiTransaction(trs[6], trs[7]) multiTxs[4] = transfer.GenerateTestSendMultiTransaction(trs[8]) // trs[9] + allAddresses := append(append(append(tdFromAdds, tdToAddrs...), fromAddrs...), toAddrs...) + var lastMT transfer.MultiTransactionIDType for i := range trs { if i%2 == 0 { @@ -677,12 +654,12 @@ func TestGetActivityEntriesFilterByType(t *testing.T) { filter.Types = allActivityTypesFilter() // Set tr1 to Receive and pendingTr to Send; rest of two MT remain default Send addresses := []eth.Address{td.tr1.To, td.pendingTr.From, td.multiTx1.FromAddress, td.multiTx2.FromAddress, trs[0].From, trs[2].From, trs[4].From, trs[6].From, trs[8].From, trsSpecial[0].To, trsSpecial[1].From} - entries, err := getActivityEntries(context.Background(), deps, addresses, []common.ChainID{}, filter, 0, 15) + entries, err := getActivityEntries(context.Background(), deps, addresses, false, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 11, len(entries)) filter.Types = []Type{SendAT, SwapAT} - entries, err = getActivityEntries(context.Background(), deps, addresses, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, addresses, false, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) // 3 from td Send + 2 trs MT Send + 1 (swap) require.Equal(t, 6, len(entries)) @@ -697,7 +674,7 @@ func TestGetActivityEntriesFilterByType(t *testing.T) { require.Equal(t, 0, bridgeCount) filter.Types = []Type{BridgeAT, ReceiveAT} - entries, err = getActivityEntries(context.Background(), deps, addresses, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, addresses, false, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 3, len(entries)) @@ -710,7 +687,7 @@ func TestGetActivityEntriesFilterByType(t *testing.T) { require.Equal(t, 2, bridgeCount) filter.Types = []Type{MintAT} - entries, err = getActivityEntries(context.Background(), deps, addresses, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, addresses, false, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 1, len(entries)) @@ -723,7 +700,7 @@ func TestGetActivityEntriesFilterByType(t *testing.T) { require.Equal(t, 0, bridgeCount) filter.Types = []Type{ContractDeploymentAT} - entries, err = getActivityEntries(context.Background(), deps, addresses, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, addresses, false, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 1, len(entries)) @@ -737,7 +714,7 @@ func TestGetActivityEntriesFilterByType(t *testing.T) { // Filter with all addresses regression filter.Types = []Type{SendAT} - entries, err = getActivityEntries(context.Background(), deps, allAddressesFilter(), []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 5, len(entries)) } @@ -753,18 +730,17 @@ func TestGetActivityEntriesFilterByAddresses(t *testing.T) { transfer.InsertTestTransfer(t, deps.db, trs[i].From, &trs[i]) } - mockTestAccountsWithAddresses(t, deps.accountsDb, append(append(append(fromTds, toTds...), fromTrs...), toTrs...)) + allAddresses := append(append(append(fromTds, toTds...), fromTrs...), toTrs...) var filter Filter - addressesFilter := allAddressesFilter() - entries, err := getActivityEntries(context.Background(), deps, addressesFilter, []common.ChainID{}, filter, 0, 15) + entries, err := getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 10, len(entries)) - addressesFilter = []eth.Address{td.multiTx1.ToAddress, td.multiTx2.FromAddress, trs[1].From, trs[4].From, trs[3].To} + addressesFilter := []eth.Address{td.multiTx1.ToAddress, td.multiTx2.FromAddress, trs[1].From, trs[4].From, trs[3].To} // The td.multiTx1.ToAddress and trs[3].To are missing not having them as owner address - entries, err = getActivityEntries(context.Background(), deps, addressesFilter, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, addressesFilter, false, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 3, len(entries)) require.Equal(t, Entry{ @@ -845,16 +821,16 @@ func TestGetActivityEntriesFilterByStatus(t *testing.T) { } } - mockTestAccountsWithAddresses(t, deps.accountsDb, append(append(append(fromTds, toTds...), fromTrs...), toTrs...)) + allAddresses := append(append(append(fromTds, toTds...), fromTrs...), toTrs...) var filter Filter filter.Statuses = allActivityStatusesFilter() - entries, err := getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err := getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 11, len(entries)) filter.Statuses = []Status{PendingAS} - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 3, len(entries)) require.Equal(t, td.pendingTr.Hash, entries[2].transaction.Hash) @@ -862,24 +838,24 @@ func TestGetActivityEntriesFilterByStatus(t *testing.T) { require.Equal(t, trs[1].Hash, entries[0].transaction.Hash) filter.Statuses = []Status{FailedAS} - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 2, len(entries)) filter.Statuses = []Status{CompleteAS} - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 6, len(entries)) // Finalized is treated as Complete, would need dynamic blockchain status to track the Finalized level filter.Statuses = []Status{FinalizedAS} - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 6, len(entries)) // Combined filter filter.Statuses = []Status{FailedAS, PendingAS} - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 5, len(entries)) } @@ -900,29 +876,29 @@ func TestGetActivityEntriesFilterByTokenType(t *testing.T) { }) } - mockTestAccountsWithAddresses(t, deps.accountsDb, append(append(append(fromTds, toTds...), fromTrs...), toTrs...)) + allAddresses := append(append(append(fromTds, toTds...), fromTrs...), toTrs...) var filter Filter filter.FilterOutAssets = true - entries, err := getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err := getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 0, len(entries)) filter.FilterOutAssets = false filter.Assets = allTokensFilter() - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 13, len(entries)) // Native tokens are network agnostic, hence all are returned filter.Assets = []Token{{TokenType: Native, ChainID: common.ChainID(transfer.EthMainnet.ChainID)}} - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 5, len(entries)) // Test that it doesn't break the filter filter.Assets = []Token{{TokenType: Erc1155}} - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 0, len(entries)) @@ -931,7 +907,7 @@ func TestGetActivityEntriesFilterByTokenType(t *testing.T) { ChainID: common.ChainID(transfer.UsdcMainnet.ChainID), Address: transfer.UsdcMainnet.Address, }} - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) // Two MT for which ChainID is ignored and one transfer on the main net and the Goerli is ignored require.Equal(t, 3, len(entries)) @@ -957,7 +933,7 @@ func TestGetActivityEntriesFilterByTokenType(t *testing.T) { ChainID: common.ChainID(transfer.UsdcGoerli.ChainID), Address: transfer.UsdcGoerli.Address, }} - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) // Two MT for which ChainID is ignored and two transfers on the main net and Goerli require.Equal(t, 4, len(entries)) @@ -978,26 +954,26 @@ func TestGetActivityEntriesFilterByToAddresses(t *testing.T) { transfer.InsertTestTransfer(t, deps.db, trs[i].To, &trs[i]) } - mockTestAccountsWithAddresses(t, deps.accountsDb, append(append(append(fromTds, toTds...), fromTrs...), toTrs...)) + allAddresses := append(append(append(fromTds, toTds...), fromTrs...), toTrs...) var filter Filter - filter.CounterpartyAddresses = allAddressesFilter() - entries, err := getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + filter.CounterpartyAddresses = allAddresses + entries, err := getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 10, len(entries)) filter.CounterpartyAddresses = []eth.Address{eth.HexToAddress("0x567890")} - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 0, len(entries)) filter.CounterpartyAddresses = []eth.Address{td.pendingTr.To, td.multiTx2.ToAddress, trs[3].To} - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 3, len(entries)) filter.CounterpartyAddresses = []eth.Address{td.tr1.To, td.pendingTr.From, trs[3].From, trs[5].To} - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, []common.ChainID{}, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 2, len(entries)) } @@ -1042,21 +1018,21 @@ func TestGetActivityEntriesFilterByNetworks(t *testing.T) { recordPresence(trs[i].ChainID, 4+i) transfer.InsertTestTransfer(t, deps.db, trs[i].To, &trs[i]) } - mockTestAccountsWithAddresses(t, deps.accountsDb, append(append(append(fromTds, toTds...), fromTrs...), toTrs...)) + allAddresses := append(append(append(fromTds, toTds...), fromTrs...), toTrs...) var filter Filter chainIDs := allNetworksFilter() - entries, err := getActivityEntries(context.Background(), deps, []eth.Address{}, chainIDs, filter, 0, 15) + entries, err := getActivityEntries(context.Background(), deps, allAddresses, true, chainIDs, filter, 0, 15) require.NoError(t, err) require.Equal(t, 10, len(entries)) chainIDs = []common.ChainID{5674839210} - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, chainIDs, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, chainIDs, filter, 0, 15) require.NoError(t, err) require.Equal(t, 0, len(entries)) chainIDs = []common.ChainID{td.pendingTr.ChainID, td.multiTx2Tr1.ChainID, trs[3].ChainID} - entries, err = getActivityEntries(context.Background(), deps, []eth.Address{}, chainIDs, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, allAddresses, true, chainIDs, filter, 0, 15) require.NoError(t, err) expectedResults := make(map[int]int) for _, chainID := range chainIDs { @@ -1100,25 +1076,25 @@ func TestGetActivityEntriesFilterByNetworksOfSubTransactions(t *testing.T) { var filter Filter chainIDs := allNetworksFilter() - entries, err := getActivityEntries(context.Background(), deps, toTrs, chainIDs, filter, 0, 15) + entries, err := getActivityEntries(context.Background(), deps, toTrs, false, chainIDs, filter, 0, 15) require.NoError(t, err) require.Equal(t, 3, len(entries)) chainIDs = []common.ChainID{trs[0].ChainID, trs[1].ChainID} - entries, err = getActivityEntries(context.Background(), deps, toTrs, chainIDs, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, toTrs, false, chainIDs, filter, 0, 15) require.NoError(t, err) require.Equal(t, 1, len(entries)) require.Equal(t, entries[0].id, mt1.MultiTransactionID) // Filter by pending_transactions sub-transacitons chainIDs = []common.ChainID{trs[2].ChainID} - entries, err = getActivityEntries(context.Background(), deps, toTrs, chainIDs, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, toTrs, false, chainIDs, filter, 0, 15) require.NoError(t, err) require.Equal(t, 1, len(entries)) require.Equal(t, entries[0].id, mt1.MultiTransactionID) chainIDs = []common.ChainID{trs[3].ChainID} - entries, err = getActivityEntries(context.Background(), deps, toTrs, chainIDs, filter, 0, 15) + entries, err = getActivityEntries(context.Background(), deps, toTrs, false, chainIDs, filter, 0, 15) require.NoError(t, err) require.Equal(t, 1, len(entries)) require.Equal(t, entries[0].id, mt2.MultiTransactionID) @@ -1140,7 +1116,7 @@ func TestGetActivityEntriesCheckToAndFrom(t *testing.T) { td.multiTx1.FromAddress, td.multiTx2.FromAddress, trs[0].To, trs[1].To} var filter Filter - entries, err := getActivityEntries(context.Background(), deps, addresses, []common.ChainID{}, filter, 0, 15) + entries, err := getActivityEntries(context.Background(), deps, addresses, false, []common.ChainID{}, filter, 0, 15) require.NoError(t, err) require.Equal(t, 6, len(entries)) @@ -1165,12 +1141,12 @@ func TestGetActivityEntriesCheckContextCancellation(t *testing.T) { deps, close := setupTestActivityDB(t) defer close() - _, _, _ = fillTestData(t, deps.db) + _, fromAddresses, toAddresses := fillTestData(t, deps.db) cancellableCtx, cancelFn := context.WithCancel(context.Background()) cancelFn() - activities, err := getActivityEntries(cancellableCtx, deps, []eth.Address{}, []common.ChainID{}, Filter{}, 0, 10) + activities, err := getActivityEntries(cancellableCtx, deps, append(fromAddresses, toAddresses...), true, []common.ChainID{}, Filter{}, 0, 10) require.ErrorIs(t, err, context.Canceled) require.Equal(t, 0, len(activities)) } @@ -1187,7 +1163,7 @@ func TestGetActivityEntriesNullAddresses(t *testing.T) { trs[1].MultiTransactionID = trs[0].MultiTransactionID for i := 0; i < 3; i++ { - transfer.InsertTestTransferWithOptions(t, deps.db, trs[i].To, &trs[i], &transfer.TestTransferOptions{ + transfer.InsertTestTransferWithOptions(t, deps.db, trs[i].From, &trs[i], &transfer.TestTransferOptions{ NullifyAddresses: []eth.Address{trs[i].To}, }) } @@ -1195,13 +1171,18 @@ func TestGetActivityEntriesNullAddresses(t *testing.T) { trs[3].To = eth.Address{} transfer.InsertTestPendingTransaction(t, deps.db, &trs[3]) - mockTestAccountsWithAddresses(t, deps.accountsDb, []eth.Address{trs[0].From, trs[1].From, trs[2].From, trs[3].From}) + addresses := []eth.Address{trs[0].From, trs[1].From, trs[2].From, trs[3].From} - activities, err := getActivityEntries(context.Background(), deps, allAddressesFilter(), allNetworksFilter(), Filter{}, 0, 10) + activities, err := getActivityEntries(context.Background(), deps, addresses, false, allNetworksFilter(), Filter{}, 0, 10) require.NoError(t, err) require.Equal(t, 3, len(activities)) } +func TestGetActivityEntries_ErrorIfNoAddress(t *testing.T) { + _, err := getActivityEntries(context.Background(), FilterDependencies{}, []eth.Address{}, true, []common.ChainID{}, Filter{}, 0, 10) + require.EqualError(t, err, "no addresses provided") +} + func TestGetTxDetails(t *testing.T) { deps, close := setupTestActivityDB(t) defer close() @@ -1296,7 +1277,7 @@ func setupBenchmark(b *testing.B, inMemory bool, resultCount int) (deps FilterDe for i = 0; i < transactionCount-pendingCount; i++ { trs[i].From = accounts[i%len(accounts)] - transfer.InsertTestTransfer(b, deps.db, trs[i].To, &trs[i]) + transfer.InsertTestTransfer(b, deps.db, trs[i].From, &trs[i]) } for ; i < transactionCount; i++ { @@ -1304,7 +1285,6 @@ func setupBenchmark(b *testing.B, inMemory bool, resultCount int) (deps FilterDe transfer.InsertTestPendingTransaction(b, deps.db, &trs[i]) } - mockTestAccountsWithAddresses(b, deps.accountsDb, accounts) return } @@ -1312,7 +1292,7 @@ func BenchmarkGetActivityEntries(bArg *testing.B) { type params struct { inMemory bool resultCount int - generateTestParameters func([]eth.Address) (addresses []eth.Address, filter *Filter, startIndex int) + generateTestParameters func([]eth.Address) (addresses []eth.Address, allAddresses bool, filter *Filter, startIndex int) } testCases := []struct { name string @@ -1323,8 +1303,8 @@ func BenchmarkGetActivityEntries(bArg *testing.B) { params{ true, 10, - func([]eth.Address) ([]eth.Address, *Filter, int) { - return allAddressesFilter(), &Filter{}, 0 + func(addresses []eth.Address) ([]eth.Address, bool, *Filter, int) { + return addresses, true, &Filter{}, 0 }, }, }, @@ -1333,8 +1313,8 @@ func BenchmarkGetActivityEntries(bArg *testing.B) { params{ false, 10, - func([]eth.Address) ([]eth.Address, *Filter, int) { - return allAddressesFilter(), &Filter{}, 0 + func(addresses []eth.Address) ([]eth.Address, bool, *Filter, int) { + return addresses, true, &Filter{}, 0 }, }, }, @@ -1343,18 +1323,8 @@ func BenchmarkGetActivityEntries(bArg *testing.B) { params{ false, 10, - func(addresses []eth.Address) ([]eth.Address, *Filter, int) { - return allAddressesFilter(), &Filter{}, 200 - }, - }, - }, - { - "SSD_AllAddresses", - params{ - false, - 10, - func(addresses []eth.Address) ([]eth.Address, *Filter, int) { - return addresses, &Filter{}, 0 + func(addresses []eth.Address) ([]eth.Address, bool, *Filter, int) { + return addresses, true, &Filter{}, 200 }, }, }, @@ -1363,8 +1333,8 @@ func BenchmarkGetActivityEntries(bArg *testing.B) { params{ false, 10, - func(addresses []eth.Address) ([]eth.Address, *Filter, int) { - return addresses, &Filter{CounterpartyAddresses: addresses[3:]}, 0 + func(addresses []eth.Address) ([]eth.Address, bool, *Filter, int) { + return addresses, true, &Filter{CounterpartyAddresses: addresses[3:]}, 0 }, }, }, @@ -1373,8 +1343,8 @@ func BenchmarkGetActivityEntries(bArg *testing.B) { params{ false, 10, - func(addresses []eth.Address) ([]eth.Address, *Filter, int) { - return addresses[0:1], &Filter{}, 0 + func(addresses []eth.Address) ([]eth.Address, bool, *Filter, int) { + return addresses[0:1], false, &Filter{}, 0 }, }, }, @@ -1385,14 +1355,14 @@ func BenchmarkGetActivityEntries(bArg *testing.B) { const resultCount = 10 for _, tc := range testCases { - addresses, filter, startIndex := tc.params.generateTestParameters(accounts) + addresses, allAddresses, filter, startIndex := tc.params.generateTestParameters(accounts) bArg.Run(tc.name, func(b *testing.B) { // Reset timer after setup b.ResetTimer() // Run benchmark for i := 0; i < b.N; i++ { - res, err := getActivityEntries(context.Background(), deps, addresses, allNetworksFilter(), *filter, startIndex, resultCount) + res, err := getActivityEntries(context.Background(), deps, addresses, allAddresses, allNetworksFilter(), *filter, startIndex, resultCount) if err != nil || len(res) != resultCount { b.Error(err) } @@ -1400,59 +1370,3 @@ func BenchmarkGetActivityEntries(bArg *testing.B) { }) } } - -func TestUpdateWalletKeypairsAccountsTable(t *testing.T) { - // initialize - appDb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) - require.NoError(t, err) - walletDb, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{}) - require.NoError(t, err) - accountsAppDb, err := accounts.NewDB(appDb) - require.NoError(t, err) - accountsWalletDb, err := accounts.NewDB(walletDb) - require.NoError(t, err) - - // Check initially empty - addressesApp, err := accountsAppDb.GetWalletAddresses() - require.NoError(t, err) - require.Empty(t, addressesApp) - addressesWallet, err := accountsWalletDb.GetWalletAddresses() - require.Error(t, err) // no such table error - require.Empty(t, addressesWallet) - - // Insert 2 addresses in app db, but only 1 is a wallet - addresses := []types.Address{{0x01}, {0x02}, {0x03}} - accounts := []*accounts.Account{ - {Address: addresses[0], Chat: true, Wallet: true}, - {Address: addresses[1], Wallet: true}, - {Address: addresses[2]}, - } - err = accountsAppDb.SaveOrUpdateAccounts(accounts, false) - require.NoError(t, err) - - // Check only 2 wallet accs is returned in app db - addressesApp, err = accountsAppDb.GetWalletAddresses() - require.NoError(t, err) - require.Len(t, addressesApp, 2) - - // update wallet DB - err = updateKeypairsAccountsTable(accountsAppDb, walletDb) - require.NoError(t, err) - - // Check only 2 wallet acc is returned in wallet db - var count int - err = walletDb.QueryRow(fmt.Sprintf("SELECT count(address) FROM %s", keypairAccountsTable)).Scan(&count) - require.NoError(t, err) - require.Equal(t, count, 2) - - // Compare addresses between app and wallet db - rows, err := walletDb.Query(fmt.Sprintf("SELECT address FROM %s", keypairAccountsTable)) - require.NoError(t, err) - for rows.Next() { - var address types.Address - err = rows.Scan(&address) - require.NoError(t, err) - require.Contains(t, addresses, address) - } - defer rows.Close() -} diff --git a/services/wallet/activity/filter.go b/services/wallet/activity/filter.go index e2254407f..ee5dc0619 100644 --- a/services/wallet/activity/filter.go +++ b/services/wallet/activity/filter.go @@ -68,10 +68,6 @@ func allTokensFilter() []Token { return []Token{} } -func allAddressesFilter() []eth.Address { - return []eth.Address{} -} - func allNetworksFilter() []common.ChainID { return []common.ChainID{} } diff --git a/services/wallet/activity/filter.sql b/services/wallet/activity/filter.sql index e15a0afab..efaccf072 100644 --- a/services/wallet/activity/filter.sql +++ b/services/wallet/activity/filter.sql @@ -1,9 +1,5 @@ -- Query includes duplicates, will return multiple rows for the same transaction if both to and from addresses are in the address list. -- --- The addresses list will have priority in deciding the source of the duplicate transaction; see filter_addresses temp table --- TODO: #11980 --- However, if the addresses list is empty, and all addresses should be included, the accounts table will be used --- -- The switch for tr_type is used to de-conflict the source for the two entries for the same transaction -- -- UNION ALL is used to avoid the overhead of DISTINCT given that we don't expect to have duplicate entries outside the sender and receiver addresses being in the list which is handled separately @@ -43,13 +39,14 @@ WITH filter_conditions AS ( ? AS includeAllTokenTypeAssets, ? AS includeAllNetworks, ? AS pendingStatus, - "0000000000000000000000000000000000000000" AS zeroAddress + '0000000000000000000000000000000000000000' AS zeroAddress ), +-- This UNION between CTE and TEMP TABLE acts as an optimization. As soon as we drop one or use them interchangeably the performance drops significantly. filter_addresses(address) AS ( SELECT - HEX(address) + address FROM - %s + filter_addresses_table WHERE ( SELECT @@ -209,7 +206,7 @@ WHERE AND NOT ( tr_type = fromTrType and transfers.tx_to_address IS NULL - AND transfers.type = "eth" + AND transfers.type = 'eth' AND transfers.contract_address IS NOT NULL AND HEX(transfers.contract_address) != zeroAddress ) @@ -219,7 +216,7 @@ WHERE AND tr_type = toTrType AND NOT ( tr_type = toTrType - AND transfers.type = "erc721" + AND transfers.type = 'erc721' AND ( transfers.tx_from_address IS NULL OR HEX(transfers.tx_from_address) = zeroAddress @@ -230,7 +227,7 @@ WHERE filterActivityTypeContractDeployment AND tr_type = fromTrType AND transfers.tx_to_address IS NULL - AND transfers.type = "eth" + AND transfers.type = 'eth' AND transfers.contract_address IS NOT NULL AND HEX(transfers.contract_address) != zeroAddress AND ( @@ -241,7 +238,7 @@ WHERE OR ( filterActivityTypeMint AND tr_type = toTrType - AND transfers.type = "erc721" + AND transfers.type = 'erc721' AND ( transfers.tx_from_address IS NULL OR HEX(transfers.tx_from_address) = zeroAddress @@ -265,11 +262,11 @@ WHERE AND ( includeAllTokenTypeAssets OR ( - transfers.type = "eth" - AND ("ETH" IN assets_token_codes) + transfers.type = 'eth' + AND ('ETH' IN assets_token_codes) ) OR ( - transfers.type = "erc20" + transfers.type = 'erc20' AND ( ( transfers.network_id, diff --git a/services/wallet/activity/service.go b/services/wallet/activity/service.go index cefe69d6c..8e54ba0b7 100644 --- a/services/wallet/activity/service.go +++ b/services/wallet/activity/service.go @@ -11,7 +11,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" - "github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/collectibles" w_common "github.com/status-im/status-go/services/wallet/common" @@ -43,9 +42,9 @@ var ( } ) +// Service provides an async interface, ensuring only one filter request, of each type, is running at a time. It also provides lazy load of NFT info and token mapping type Service struct { db *sql.DB - accountsDB *accounts.Database tokenManager token.ManagerInterface collectibles collectibles.ManagerInterface eventFeed *event.Feed @@ -53,10 +52,9 @@ type Service struct { scheduler *async.MultiClientScheduler } -func NewService(db *sql.DB, tokenManager token.ManagerInterface, collectibles collectibles.ManagerInterface, eventFeed *event.Feed, accountsDb *accounts.Database) *Service { +func NewService(db *sql.DB, tokenManager token.ManagerInterface, collectibles collectibles.ManagerInterface, eventFeed *event.Feed) *Service { return &Service{ db: db, - accountsDB: accountsDb, tokenManager: tokenManager, collectibles: collectibles, eventFeed: eventFeed, @@ -82,11 +80,13 @@ type FilterResponse struct { } // FilterActivityAsync allows only one filter task to run at a time -// and it cancels the current one if a new one is started +// it cancels the current one if a new one is started +// and should not expect other owners to have data in one of the queried tables +// // All calls will trigger an EventActivityFilteringDone event with the result of the filtering -func (s *Service) FilterActivityAsync(requestID int32, addresses []common.Address, chainIDs []w_common.ChainID, filter Filter, offset int, limit int) { +func (s *Service) FilterActivityAsync(requestID int32, addresses []common.Address, allAddresses bool, chainIDs []w_common.ChainID, filter Filter, offset int, limit int) { s.scheduler.Enqueue(requestID, filterTask, func(ctx context.Context) (interface{}, error) { - activities, err := getActivityEntries(ctx, s.getDeps(), addresses, chainIDs, filter, offset, limit) + activities, err := getActivityEntries(ctx, s.getDeps(), addresses, allAddresses, chainIDs, filter, offset, limit) return activities, err }, func(result interface{}, taskType async.TaskType, err error) { res := FilterResponse{ @@ -241,8 +241,7 @@ func (s *Service) Stop() { func (s *Service) getDeps() FilterDependencies { return FilterDependencies{ - db: s.db, - accountsDb: s.accountsDB, + db: s.db, tokenSymbol: func(t Token) string { info := s.tokenManager.LookupTokenIdentity(uint64(t.ChainID), t.Address, t.TokenType == Native) if info == nil { diff --git a/services/wallet/activity/service_test.go b/services/wallet/activity/service_test.go index ac212aadb..1dde48c8a 100644 --- a/services/wallet/activity/service_test.go +++ b/services/wallet/activity/service_test.go @@ -63,7 +63,7 @@ func setupTestService(tb testing.TB) (service *Service, eventFeed *event.Feed, t eventFeed = new(event.Feed) tokenMock = &mockTokenManager{} collectiblesMock = &mockCollectiblesManager{} - service = NewService(db, tokenMock, collectiblesMock, eventFeed, nil) + service = NewService(db, tokenMock, collectiblesMock, eventFeed) return service, eventFeed, tokenMock, collectiblesMock, func() { require.NoError(tb, db.Close()) @@ -79,8 +79,8 @@ type arg struct { } // insertStubTransfersWithCollectibles will insert nil if tokenIDStr is empty -func insertStubTransfersWithCollectibles(t *testing.T, db *sql.DB, args []arg) { - trs, _, _ := transfer.GenerateTestTransfers(t, db, 0, len(args)) +func insertStubTransfersWithCollectibles(t *testing.T, db *sql.DB, args []arg) (fromAddresses, toAddresses []eth.Address) { + trs, fromAddresses, toAddresses := transfer.GenerateTestTransfers(t, db, 0, len(args)) for i := range args { trs[i].ChainID = args[i].chainID if args[i].tokenIDStr == "" { @@ -96,6 +96,7 @@ func insertStubTransfersWithCollectibles(t *testing.T, db *sql.DB, args []arg) { TokenID: args[i].tokenID, }) } + return fromAddresses, toAddresses } func TestService_UpdateCollectibleInfo(t *testing.T) { @@ -108,7 +109,7 @@ func TestService_UpdateCollectibleInfo(t *testing.T) { {5, "0x3d6afaa395c31fcd391fe3d562e75fe9e8ec7e6a", "", nil, nil}, {5, "0xA2838FDA19EB6EED3F8B9EFF411D4CD7D2DE0313", "0x0F", nil, nil}, } - insertStubTransfersWithCollectibles(t, s.db, args) + fromAddresses, toAddresses := insertStubTransfersWithCollectibles(t, s.db, args) ch := make(chan walletevent.Event) sub := e.Subscribe(ch) @@ -147,7 +148,7 @@ func TestService_UpdateCollectibleInfo(t *testing.T) { }, }, nil).Once() - s.FilterActivityAsync(0, allAddressesFilter(), allNetworksFilter(), Filter{}, 0, 3) + s.FilterActivityAsync(0, append(fromAddresses, toAddresses...), true, allNetworksFilter(), Filter{}, 0, 3) filterResponseCount := 0 var updates []EntryData @@ -190,43 +191,36 @@ func TestService_UpdateCollectibleInfo_Error(t *testing.T) { {5, "0xA2838FDA19EB6EED3F8B9EFF411D4CD7D2DE0313", "0x762AD3E4934E687F8701F24C7274E5209213FD6208FF952ACEB325D028866949", nil, nil}, {5, "0xA2838FDA19EB6EED3F8B9EFF411D4CD7D2DE0313", "0x0D", nil, nil}, } - insertStubTransfersWithCollectibles(t, s.db, args) - ch := make(chan walletevent.Event) + ch := make(chan walletevent.Event, 4) sub := e.Subscribe(ch) + fromAddresses, toAddresses := insertStubTransfersWithCollectibles(t, s.db, args) + c.On("FetchAssetsByCollectibleUniqueID", mock.Anything).Return(nil, thirdparty.ErrChainIDNotSupported).Once() - s.FilterActivityAsync(0, allAddressesFilter(), allNetworksFilter(), Filter{}, 0, 5) + s.FilterActivityAsync(0, append(fromAddresses, toAddresses...), true, allNetworksFilter(), Filter{}, 0, 5) filterResponseCount := 0 updatesCount := 0 - select { - case res := <-ch: - switch res.Type { - case EventActivityFilteringDone: - var payload FilterResponse - err := json.Unmarshal([]byte(res.Message), &payload) - require.NoError(t, err) - require.Equal(t, ErrorCodeSuccess, payload.ErrorCode) - require.Equal(t, 2, len(payload.Activities)) - filterResponseCount++ - case EventActivityFilteringUpdate: - updatesCount++ + for i := 0; i < 2; i++ { + select { + case res := <-ch: + switch res.Type { + case EventActivityFilteringDone: + var payload FilterResponse + err := json.Unmarshal([]byte(res.Message), &payload) + require.NoError(t, err) + require.Equal(t, ErrorCodeSuccess, payload.ErrorCode) + require.Equal(t, 2, len(payload.Activities)) + filterResponseCount++ + case EventActivityFilteringUpdate: + updatesCount++ + } + case <-time.NewTimer(20 * time.Millisecond).C: + // We wait to ensure the EventActivityFilteringUpdate is never sent } - case <-time.NewTimer(100 * time.Millisecond).C: - } - - select { - case res := <-ch: - switch res.Type { - case EventActivityFilteringDone: - filterResponseCount++ - case EventActivityFilteringUpdate: - updatesCount++ - } - case <-time.NewTimer(100 * time.Microsecond).C: } require.Equal(t, 1, filterResponseCount) diff --git a/services/wallet/api.go b/services/wallet/api.go index bf956c581..5fd802de1 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -570,10 +570,10 @@ func (api *API) FetchAllCurrencyFormats() (currency.FormatPerSymbol, error) { return api.s.currency.FetchAllCurrencyFormats() } -func (api *API) FilterActivityAsync(requestID int32, addresses []common.Address, chainIDs []wcommon.ChainID, filter activity.Filter, offset int, limit int) error { - log.Debug("wallet.api.FilterActivityAsync", "requestID", requestID, "addr.count", len(addresses), "chainIDs.count", len(chainIDs), "offset", offset, "limit", limit) +func (api *API) FilterActivityAsync(requestID int32, addresses []common.Address, allAddresses bool, chainIDs []wcommon.ChainID, filter activity.Filter, offset int, limit int) error { + log.Debug("wallet.api.FilterActivityAsync", "requestID", requestID, "addr.count", len(addresses), "allAddresses", allAddresses, "chainIDs.count", len(chainIDs), "offset", offset, "limit", limit) - api.s.activity.FilterActivityAsync(requestID, addresses, chainIDs, filter, offset, limit) + api.s.activity.FilterActivityAsync(requestID, addresses, allAddresses, chainIDs, filter, offset, limit) return nil } diff --git a/services/wallet/service.go b/services/wallet/service.go index b00b19f00..9f358ddd7 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -143,7 +143,7 @@ func NewService( collectiblesManager := collectibles.NewManager(db, rpcClient, contractOwnershipProviders, accountOwnershipProviders, collectibleDataProviders, collectionDataProviders, openseaClient) collectibles := collectibles.NewService(db, feed, accountsDB, accountFeed, rpcClient.NetworkManager, collectiblesManager) - activity := activity.NewService(db, tokenManager, collectiblesManager, feed, accountsDB) + activity := activity.NewService(db, tokenManager, collectiblesManager, feed) return &Service{ db: db,