feat(wallet) lazy load activity info optimization

Trigger async fetching of extra information on each activity filtering
request. Only emit the update event for incomplete entries.

Other changes:

- Make DataEntry light as event payload by making all the fields
  optional
- Add new required fields to the activity DataEntry
- Add collectibles.ManagerInterface to aid testing

Note: this PR keeps compatibility with current master by always
providing non-optional multi-transaction ID. The TODO will be executed
before merging the status-desktop PR.

Experienced a hang on FetchAssetsByCollectibleUniqueID call with:
[{{5 0x21263a042aFE4bAE34F08Bb318056C181bD96D3b} 1209},
{{5 0x9A95631794a42d30C47f214fBe02A72585df35e1} 237},
{{5 0x9A95631794a42d30C47f214fBe02A72585df35e1} 236},
{{5 0x9A95631794a42d30C47f214fBe02A72585df35e1} 832},
{{5 0x9A95631794a42d30C47f214fBe02A72585df35e1} 830},
{{5 0x9A95631794a42d30C47f214fBe02A72585df35e1} 853}]

Updates status-desktop #11597
This commit is contained in:
Stefan 2023-08-11 18:28:46 +01:00 committed by Stefan Dunca
parent d61677403a
commit c0f32748b4
11 changed files with 505 additions and 77 deletions

View File

@ -16,7 +16,9 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/services/wallet/bigint"
"github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/thirdparty"
"github.com/status-im/status-go/services/wallet/transfer"
"github.com/status-im/status-go/transactions"
@ -48,80 +50,99 @@ const (
)
type Entry struct {
payloadType PayloadType
transaction *transfer.TransactionIdentity
id transfer.MultiTransactionIDType
timestamp int64
activityType Type
activityStatus Status
amountOut *hexutil.Big // Used for activityType SendAT, SwapAT, BridgeAT
amountIn *hexutil.Big // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT
tokenOut *Token // Used for activityType SendAT, SwapAT, BridgeAT
tokenIn *Token // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT
symbolOut *string
symbolIn *string
sender *eth.Address
recipient *eth.Address
chainIDOut *common.ChainID
chainIDIn *common.ChainID
transferType *TransferType
payloadType PayloadType
transaction *transfer.TransactionIdentity
id transfer.MultiTransactionIDType
timestamp int64
activityType Type
activityStatus Status
amountOut *hexutil.Big // Used for activityType SendAT, SwapAT, BridgeAT
amountIn *hexutil.Big // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT
tokenOut *Token // Used for activityType SendAT, SwapAT, BridgeAT
tokenIn *Token // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT
symbolOut *string
symbolIn *string
sender *eth.Address
recipient *eth.Address
chainIDOut *common.ChainID
chainIDIn *common.ChainID
transferType *TransferType
contractAddress *eth.Address
}
type jsonSerializationTemplate struct {
PayloadType PayloadType `json:"payloadType"`
Transaction *transfer.TransactionIdentity `json:"transaction"`
ID transfer.MultiTransactionIDType `json:"id"`
Timestamp int64 `json:"timestamp"`
ActivityType Type `json:"activityType"`
ActivityStatus Status `json:"activityStatus"`
AmountOut *hexutil.Big `json:"amountOut"`
AmountIn *hexutil.Big `json:"amountIn"`
TokenOut *Token `json:"tokenOut,omitempty"`
TokenIn *Token `json:"tokenIn,omitempty"`
SymbolOut *string `json:"symbolOut,omitempty"`
SymbolIn *string `json:"symbolIn,omitempty"`
Sender *eth.Address `json:"sender,omitempty"`
Recipient *eth.Address `json:"recipient,omitempty"`
ChainIDOut *common.ChainID `json:"chainIdOut,omitempty"`
ChainIDIn *common.ChainID `json:"chainIdIn,omitempty"`
TransferType *TransferType `json:"transferType,omitempty"`
type EntryData struct {
PayloadType PayloadType `json:"payloadType"`
Transaction *transfer.TransactionIdentity `json:"transaction,omitempty"`
ID *transfer.MultiTransactionIDType `json:"id,omitempty"`
Timestamp *int64 `json:"timestamp,omitempty"`
ActivityType *Type `json:"activityType,omitempty"`
ActivityStatus *Status `json:"activityStatus,omitempty"`
AmountOut *hexutil.Big `json:"amountOut,omitempty"`
AmountIn *hexutil.Big `json:"amountIn,omitempty"`
TokenOut *Token `json:"tokenOut,omitempty"`
TokenIn *Token `json:"tokenIn,omitempty"`
SymbolOut *string `json:"symbolOut,omitempty"`
SymbolIn *string `json:"symbolIn,omitempty"`
Sender *eth.Address `json:"sender,omitempty"`
Recipient *eth.Address `json:"recipient,omitempty"`
ChainIDOut *common.ChainID `json:"chainIdOut,omitempty"`
ChainIDIn *common.ChainID `json:"chainIdIn,omitempty"`
TransferType *TransferType `json:"transferType,omitempty"`
ContractAddress *eth.Address `json:"contractAddress,omitempty"`
NftName *string `json:"nftName,omitempty"`
NftURL *string `json:"nftUrl,omitempty"`
}
func (e *Entry) MarshalJSON() ([]byte, error) {
return json.Marshal(jsonSerializationTemplate{
PayloadType: e.payloadType,
Transaction: e.transaction,
ID: e.id,
Timestamp: e.timestamp,
ActivityType: e.activityType,
ActivityStatus: e.activityStatus,
AmountOut: e.amountOut,
AmountIn: e.amountIn,
TokenOut: e.tokenOut,
TokenIn: e.tokenIn,
SymbolOut: e.symbolOut,
SymbolIn: e.symbolIn,
Sender: e.sender,
Recipient: e.recipient,
ChainIDOut: e.chainIDOut,
ChainIDIn: e.chainIDIn,
TransferType: e.transferType,
})
data := EntryData{
Timestamp: &e.timestamp,
ActivityType: &e.activityType,
ActivityStatus: &e.activityStatus,
AmountOut: e.amountOut,
AmountIn: e.amountIn,
TokenOut: e.tokenOut,
TokenIn: e.tokenIn,
SymbolOut: e.symbolOut,
SymbolIn: e.symbolIn,
Sender: e.sender,
Recipient: e.recipient,
ChainIDOut: e.chainIDOut,
ChainIDIn: e.chainIDIn,
TransferType: e.transferType,
ContractAddress: e.contractAddress,
}
if e.payloadType == MultiTransactionPT {
data.ID = common.NewAndSet(e.id)
} else {
data.Transaction = e.transaction
}
data.PayloadType = e.payloadType
return json.Marshal(data)
}
func (e *Entry) UnmarshalJSON(data []byte) error {
aux := jsonSerializationTemplate{}
aux := EntryData{}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
e.payloadType = aux.PayloadType
e.transaction = aux.Transaction
e.id = aux.ID
e.timestamp = aux.Timestamp
e.activityType = aux.ActivityType
e.activityStatus = aux.ActivityStatus
if aux.ID != nil {
e.id = *aux.ID
}
if aux.Timestamp != nil {
e.timestamp = *aux.Timestamp
}
if aux.ActivityType != nil {
e.activityType = *aux.ActivityType
}
if aux.ActivityStatus != nil {
e.activityStatus = *aux.ActivityStatus
}
e.amountOut = aux.AmountOut
e.amountIn = aux.AmountIn
e.tokenOut = aux.TokenOut
@ -174,6 +195,42 @@ func (e *Entry) PayloadType() PayloadType {
return e.payloadType
}
func (e *Entry) isNFT() bool {
tt := e.transferType
return tt != nil && (*tt == TransferTypeErc721 || *tt == TransferTypeErc1155) && ((e.tokenIn != nil && e.tokenIn.TokenID != nil) || (e.tokenOut != nil && e.tokenOut.TokenID != nil))
}
// TODO - #11952: use only one of (big.Int, bigint.BigInt and hexutil.Big)
func tokenIDToWalletBigInt(tokenID *hexutil.Big) *bigint.BigInt {
if tokenID == nil {
return nil
}
bi := new(big.Int).Set((*big.Int)(tokenID))
return &bigint.BigInt{Int: bi}
}
func (e *Entry) anyIdentity() *thirdparty.CollectibleUniqueID {
if e.tokenIn != nil {
return &thirdparty.CollectibleUniqueID{
ContractID: thirdparty.ContractID{
ChainID: e.tokenIn.ChainID,
Address: e.tokenIn.Address,
},
TokenID: tokenIDToWalletBigInt(e.tokenIn.TokenID),
}
} else if e.tokenOut != nil {
return &thirdparty.CollectibleUniqueID{
ContractID: thirdparty.ContractID{
ChainID: e.tokenOut.ChainID,
Address: e.tokenOut.Address,
},
TokenID: tokenIDToWalletBigInt(e.tokenOut.TokenID),
}
}
return nil
}
func multiTransactionTypeToActivityType(mtType transfer.MultiTransactionType) Type {
if mtType == transfer.MultiTransactionSend {
return SendAT
@ -749,13 +806,14 @@ func getActivityEntries(ctx context.Context, deps FilterDependencies, addresses
activityStatus := Status(aggregatedStatus)
var outChainID, inChainID *common.ChainID
var entry Entry
var tokenID TokenID
var tokenID *hexutil.Big
if len(dbTokenID) > 0 {
t := new(big.Int).SetBytes(dbTokenID)
tokenID = (*hexutil.Big)(t)
tokenID = (*hexutil.Big)(new(big.Int).SetBytes(dbTokenID))
}
if transferHash != nil && chainID.Valid {
// Process `transfers` row
// Extract activity type: SendAT/ReceiveAT
activityType, _ := getActivityType(dbTrType)
@ -795,6 +853,8 @@ func getActivityEntries(ctx context.Context, deps FilterDependencies, addresses
entry.amountOut = outAmount
entry.amountIn = inAmount
} else if pendingHash != nil && chainID.Valid {
// Process `pending_transactions` row
// Extract activity type: SendAT/ReceiveAT
activityType, _ := getActivityType(dbTrType)
@ -822,6 +882,8 @@ func getActivityEntries(ctx context.Context, deps FilterDependencies, addresses
entry.amountIn = inAmount
} else if multiTxID.Valid {
// Process `multi_transactions` row
mtInAmount, mtOutAmount := getMtInAndOutAmounts(dbMtFromAmount, dbMtToAmount)
// Extract activity type: SendAT/SwapAT/BridgeAT
@ -947,6 +1009,10 @@ func updateKeypairsAccountsTable(accountsDb *accounts.Database, db *sql.DB) erro
return err
}
// TODO: remove dependency on accounts table by removing"all accounts filter" optimization; see #11980
if accountsDb == nil {
return nil
}
addresses, err := accountsDb.GetWalletAddresses()
if err != nil {
log.Error("failed to get wallet addresses", "err", err)
@ -977,14 +1043,15 @@ func updateKeypairsAccountsTable(accountsDb *accounts.Database, db *sql.DB) erro
return nil
}
// lookupAndFillInTokens ignores NFTs
func lookupAndFillInTokens(deps FilterDependencies, tokenOut *Token, tokenIn *Token) (symbolOut *string, symbolIn *string) {
if tokenOut != nil {
if tokenOut != nil && tokenOut.TokenID == nil {
symbol := deps.tokenSymbol(*tokenOut)
if len(symbol) > 0 {
symbolOut = common.NewAndSet(symbol)
}
}
if tokenIn != nil {
if tokenIn != nil && tokenIn.TokenID == nil {
symbol := deps.tokenSymbol(*tokenIn)
if len(symbol) > 0 {
symbolIn = common.NewAndSet(symbol)

View File

@ -55,15 +55,13 @@ const (
Erc1155
)
type TokenID *hexutil.Big
// Token supports all tokens. Some fields might be optional, depending on the TokenType
type Token struct {
TokenType TokenType `json:"tokenType"`
// ChainID is used for TokenType.Native only to lookup the symbol, all chains will be included in the token filter
ChainID common.ChainID `json:"chainId"`
Address eth.Address `json:"address,omitempty"`
TokenID TokenID `json:"tokenId,omitempty"`
TokenID *hexutil.Big `json:"tokenId,omitempty"`
}
func allTokensFilter() []Token {

View File

@ -5,6 +5,7 @@ import (
"database/sql"
"encoding/json"
"errors"
"strconv"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
@ -12,7 +13,9 @@ import (
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/collectibles"
w_common "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/thirdparty"
"github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/walletevent"
)
@ -20,6 +23,7 @@ import (
const (
// FilterResponse json is sent as a message in the EventActivityFilteringDone event
EventActivityFilteringDone walletevent.EventType = "wallet-activity-filtering-done"
EventActivityFilteringUpdate walletevent.EventType = "wallet-activity-filtering-entries-updated"
EventActivityGetRecipientsDone walletevent.EventType = "wallet-activity-get-recipients-result"
EventActivityGetOldestTimestampDone walletevent.EventType = "wallet-activity-get-oldest-timestamp-result"
)
@ -42,17 +46,19 @@ var (
type Service struct {
db *sql.DB
accountsDB *accounts.Database
tokenManager *token.Manager
tokenManager token.ManagerInterface
collectibles collectibles.ManagerInterface
eventFeed *event.Feed
scheduler *async.MultiClientScheduler
}
func NewService(db *sql.DB, tokenManager *token.Manager, eventFeed *event.Feed, accountsDb *accounts.Database) *Service {
func NewService(db *sql.DB, tokenManager token.ManagerInterface, collectibles collectibles.ManagerInterface, eventFeed *event.Feed, accountsDb *accounts.Database) *Service {
return &Service{
db: db,
accountsDB: accountsDb,
tokenManager: tokenManager,
collectibles: collectibles,
eventFeed: eventFeed,
scheduler: async.NewMultiClientScheduler(),
}
@ -98,6 +104,11 @@ func (s *Service) FilterActivityAsync(requestID int32, addresses []common.Addres
}
s.sendResponseEvent(&requestID, EventActivityFilteringDone, res, err)
// Report details post-response to ensure updates have a match
if res.Activities != nil {
go s.lazyLoadDetails(requestID, res.Activities)
}
})
}
@ -109,6 +120,63 @@ func (s *Service) GetTxDetails(ctx context.Context, id string) (*EntryDetails, e
return getTxDetails(ctx, s.db, id)
}
// lazyLoadDetails check if any of the entries have details that are not loaded then fetch and emit result
func (s *Service) lazyLoadDetails(requestID int32, entries []Entry) {
res := make([]*EntryData, 0)
var err error
ids := make([]thirdparty.CollectibleUniqueID, 0)
entriesForIds := make([]*Entry, 0)
for i := range entries {
if !entries[i].isNFT() {
continue
}
id := entries[i].anyIdentity()
if id == nil {
continue
}
ids = append(ids, *id)
entriesForIds = append(entriesForIds, &entries[i])
}
if len(ids) == 0 {
return
}
log.Debug("wallet.activity.Service lazyLoadDetails", "requestID", requestID, "entries.len", len(entries), "ids.len", len(ids))
colData, err := s.collectibles.FetchAssetsByCollectibleUniqueID(ids)
if err != nil {
log.Error("Error fetching collectible details", "error", err)
return
}
for _, col := range colData {
data := &EntryData{
NftName: w_common.NewAndSet(col.CollectibleData.Name),
NftURL: w_common.NewAndSet(col.CollectibleData.ImageURL),
}
for i := range ids {
if col.CollectibleData.ID.Same(&ids[i]) {
if entriesForIds[i].payloadType == MultiTransactionPT {
data.ID = w_common.NewAndSet(entriesForIds[i].id)
} else {
data.Transaction = entriesForIds[i].transaction
}
data.PayloadType = entriesForIds[i].payloadType
}
}
res = append(res, data)
}
if len(res) > 0 {
s.sendResponseEvent(&requestID, EventActivityFilteringUpdate, res, err)
}
}
type GetRecipientsResponse struct {
Addresses []common.Address `json:"addresses"`
Offset int `json:"offset"`
@ -213,7 +281,11 @@ func (s *Service) sendResponseEvent(requestID *int32, eventType walletevent.Even
err = resErr
}
log.Debug("wallet.api.activity.Service RESPONSE", "requestID", requestID, "eventType", eventType, "error", err, "payload.len", len(payload))
requestIDStr := "nil"
if requestID != nil {
requestIDStr = strconv.Itoa(int(*requestID))
}
log.Debug("wallet.api.activity.Service RESPONSE", "requestID", requestIDStr, "eventType", eventType, "error", err, "payload.len", len(payload))
event := walletevent.Event{
Type: eventType,

View File

@ -0,0 +1,236 @@
package activity
import (
"database/sql"
"encoding/json"
"math/big"
"testing"
"time"
eth "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/services/wallet/bigint"
"github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/thirdparty"
"github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/transfer"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/walletdatabase"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
// mockCollectiblesManager implements the collectibles.ManagerInterface
type mockCollectiblesManager struct {
mock.Mock
}
func (m *mockCollectiblesManager) FetchAssetsByCollectibleUniqueID(uniqueIDs []thirdparty.CollectibleUniqueID) ([]thirdparty.FullCollectibleData, error) {
args := m.Called(uniqueIDs)
res := args.Get(0)
if res == nil {
return nil, args.Error(1)
}
return res.([]thirdparty.FullCollectibleData), args.Error(1)
}
// mockTokenManager implements the token.ManagerInterface
type mockTokenManager struct {
mock.Mock
}
func (m *mockTokenManager) LookupTokenIdentity(chainID uint64, address eth.Address, native bool) *token.Token {
args := m.Called(chainID, address, native)
res := args.Get(0)
if res == nil {
return nil
}
return res.(*token.Token)
}
func (m *mockTokenManager) LookupToken(chainID *uint64, tokenSymbol string) (tkn *token.Token, isNative bool) {
args := m.Called(chainID, tokenSymbol)
return args.Get(0).(*token.Token), args.Bool(1)
}
func setupTestService(tb testing.TB) (service *Service, eventFeed *event.Feed, tokenMock *mockTokenManager, collectiblesMock *mockCollectiblesManager, close func()) {
db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{})
require.NoError(tb, err)
eventFeed = new(event.Feed)
tokenMock = &mockTokenManager{}
collectiblesMock = &mockCollectiblesManager{}
service = NewService(db, tokenMock, collectiblesMock, eventFeed, nil)
return service, eventFeed, tokenMock, collectiblesMock, func() {
require.NoError(tb, db.Close())
}
}
type arg struct {
chainID common.ChainID
tokenAddressStr string
tokenIDStr string
tokenID *big.Int
tokenAddress *eth.Address
}
// insertStubTransfersWithCollectibles will insert nil if tokenIDStr is empty
func insertStubTransfersWithCollectibles(t *testing.T, db *sql.DB, args []arg) {
trs, _, _ := transfer.GenerateTestTransfers(t, db, 0, len(args))
for i := range args {
trs[i].ChainID = args[i].chainID
if args[i].tokenIDStr == "" {
args[i].tokenID = nil
} else {
args[i].tokenID = new(big.Int)
args[i].tokenID.SetString(args[i].tokenIDStr, 0)
}
args[i].tokenAddress = new(eth.Address)
*args[i].tokenAddress = eth.HexToAddress(args[i].tokenAddressStr)
transfer.InsertTestTransferWithOptions(t, db, trs[i].To, &trs[i], &transfer.TestTransferOptions{
TokenAddress: *args[i].tokenAddress,
TokenID: args[i].tokenID,
})
}
}
func TestService_UpdateCollectibleInfo(t *testing.T) {
s, e, tM, c, close := setupTestService(t)
defer close()
args := []arg{
{5, "0xA2838FDA19EB6EED3F8B9EFF411D4CD7D2DE0313", "0x0D", nil, nil},
{5, "0xA2838FDA19EB6EED3F8B9EFF411D4CD7D2DE0313", "0x762AD3E4934E687F8701F24C7274E5209213FD6208FF952ACEB325D028866949", nil, nil},
{5, "0x3d6afaa395c31fcd391fe3d562e75fe9e8ec7e6a", "", nil, nil},
{5, "0xA2838FDA19EB6EED3F8B9EFF411D4CD7D2DE0313", "0x0F", nil, nil},
}
insertStubTransfersWithCollectibles(t, s.db, args)
ch := make(chan walletevent.Event)
sub := e.Subscribe(ch)
// Expect one call for the fungible token
tM.On("LookupTokenIdentity", uint64(5), eth.HexToAddress("0x3d6afaa395c31fcd391fe3d562e75fe9e8ec7e6a"), false).Return(
&token.Token{
ChainID: 5,
Address: eth.HexToAddress("0x3d6afaa395c31fcd391fe3d562e75fe9e8ec7e6a"),
Symbol: "STT",
}, false,
).Once()
c.On("FetchAssetsByCollectibleUniqueID", []thirdparty.CollectibleUniqueID{
{
ContractID: thirdparty.ContractID{
ChainID: args[3].chainID,
Address: *args[3].tokenAddress},
TokenID: &bigint.BigInt{Int: args[3].tokenID},
}, {
ContractID: thirdparty.ContractID{
ChainID: args[1].chainID,
Address: *args[1].tokenAddress},
TokenID: &bigint.BigInt{Int: args[1].tokenID},
},
}).Return([]thirdparty.FullCollectibleData{
{
CollectibleData: thirdparty.CollectibleData{
Name: "Test 2",
ImageURL: "test://url/2"},
CollectionData: nil,
}, {
CollectibleData: thirdparty.CollectibleData{
Name: "Test 1",
ImageURL: "test://url/1"},
CollectionData: nil,
},
}, nil).Once()
s.FilterActivityAsync(0, allAddressesFilter(), allNetworksFilter(), Filter{}, 0, 3)
filterResponseCount := 0
var updates []EntryData
for i := 0; i < 2; i++ {
select {
case res := <-ch:
switch res.Type {
case EventActivityFilteringDone:
var payload FilterResponse
err := json.Unmarshal([]byte(res.Message), &payload)
require.NoError(t, err)
require.Equal(t, ErrorCodeSuccess, payload.ErrorCode)
require.Equal(t, 3, len(payload.Activities))
filterResponseCount++
case EventActivityFilteringUpdate:
err := json.Unmarshal([]byte(res.Message), &updates)
require.NoError(t, err)
}
case <-time.NewTimer(1 * time.Second).C:
require.Fail(t, "timeout while waiting for event")
}
}
require.Equal(t, 1, filterResponseCount)
require.Equal(t, 2, len(updates))
require.Equal(t, "Test 2", *updates[0].NftName)
require.Equal(t, "test://url/2", *updates[0].NftURL)
require.Equal(t, "Test 1", *updates[1].NftName)
require.Equal(t, "test://url/1", *updates[1].NftURL)
sub.Unsubscribe()
}
func TestService_UpdateCollectibleInfo_Error(t *testing.T) {
s, e, _, c, close := setupTestService(t)
defer close()
args := []arg{
{5, "0xA2838FDA19EB6EED3F8B9EFF411D4CD7D2DE0313", "0x762AD3E4934E687F8701F24C7274E5209213FD6208FF952ACEB325D028866949", nil, nil},
{5, "0xA2838FDA19EB6EED3F8B9EFF411D4CD7D2DE0313", "0x0D", nil, nil},
}
insertStubTransfersWithCollectibles(t, s.db, args)
ch := make(chan walletevent.Event)
sub := e.Subscribe(ch)
c.On("FetchAssetsByCollectibleUniqueID", mock.Anything).Return(nil, thirdparty.ErrChainIDNotSupported).Once()
s.FilterActivityAsync(0, allAddressesFilter(), allNetworksFilter(), Filter{}, 0, 5)
filterResponseCount := 0
updatesCount := 0
select {
case res := <-ch:
switch res.Type {
case EventActivityFilteringDone:
var payload FilterResponse
err := json.Unmarshal([]byte(res.Message), &payload)
require.NoError(t, err)
require.Equal(t, ErrorCodeSuccess, payload.ErrorCode)
require.Equal(t, 2, len(payload.Activities))
filterResponseCount++
case EventActivityFilteringUpdate:
updatesCount++
}
case <-time.NewTimer(100 * time.Millisecond).C:
}
select {
case res := <-ch:
switch res.Type {
case EventActivityFilteringDone:
filterResponseCount++
case EventActivityFilteringUpdate:
updatesCount++
}
case <-time.NewTimer(100 * time.Microsecond).C:
}
require.Equal(t, 1, filterResponseCount)
require.Equal(t, 0, updatesCount)
sub.Unsubscribe()
}

View File

@ -150,8 +150,8 @@ func (s *Scheduler) runTask(tc *taskContext, taskFn taskFunction, resFn func(int
}
// finishedTask is the only one that can remove a task from the queue
// if the current running task is
func (s *Scheduler) finishedTask(finishedRes interface{}, finishedTask *taskContext, finishedResFn resultFunction, finishedErr error) {
// if the current running task completed (doNotDeleteCurrentTask is true)
func (s *Scheduler) finishedTask(finishedRes interface{}, doneTask *taskContext, finishedResFn resultFunction, finishedErr error) {
s.queueMutex.Lock()
// We always have a running task
@ -175,7 +175,7 @@ func (s *Scheduler) finishedTask(finishedRes interface{}, finishedTask *taskCont
s.queueMutex.Unlock()
// Report result
finishedResFn(finishedRes, finishedTask.taskType, finishedErr)
finishedResFn(finishedRes, doneTask.taskType, finishedErr)
}
func (s *Scheduler) Stop() {

View File

@ -358,3 +358,37 @@ func TestScheduler_Enqueue_ValidateOrder(t *testing.T) {
require.Equal(t, 2, resChanCount[testTask2], "expected tow task call for type: %d had %d", 2, taskSuccessCount[testTask2])
require.Equal(t, 1, resChanCount[testTask3], "expected one task call for type: %d had %d", 3, taskSuccessCount[testTask3])
}
func TestScheduler_Enqueue_InResult(t *testing.T) {
s := NewScheduler()
callChan := make(chan int, 6)
s.Enqueue(TaskType{ID: 1, Policy: ReplacementPolicyCancelOld},
func(ctx context.Context) (interface{}, error) {
callChan <- 0
return nil, nil
}, func(res interface{}, taskType TaskType, err error) {
callChan <- 1
s.Enqueue(TaskType{1, ReplacementPolicyCancelOld}, func(ctx context.Context) (interface{}, error) {
callChan <- 2
return nil, nil
}, func(res interface{}, taskType TaskType, err error) {
callChan <- 3
s.Enqueue(TaskType{1, ReplacementPolicyCancelOld}, func(ctx context.Context) (interface{}, error) {
callChan <- 4
return nil, nil
}, func(res interface{}, taskType TaskType, err error) {
callChan <- 5
})
})
},
)
for i := 0; i < 6; i++ {
select {
case res := <-callChan:
require.Equal(t, i, res)
case <-time.After(1 * time.Second):
require.Fail(t, "test not completed in time")
}
}
}

View File

@ -38,6 +38,10 @@ var (
ErrNoProvidersAvailableForChainID = errors.New("no providers available for chainID")
)
type ManagerInterface interface {
FetchAssetsByCollectibleUniqueID(uniqueIDs []thirdparty.CollectibleUniqueID) ([]thirdparty.FullCollectibleData, error)
}
type Manager struct {
rpcClient *rpc.Client
contractOwnershipProviders []thirdparty.CollectibleContractOwnershipProvider

View File

@ -102,7 +102,6 @@ func NewService(
reader := NewReader(rpcClient, tokenManager, marketManager, accountsDB, NewPersistence(db), feed)
history := history.NewService(db, accountsDB, feed, rpcClient, tokenManager, marketManager)
currency := currency.NewService(db, feed, tokenManager, marketManager)
activity := activity.NewService(db, tokenManager, feed, accountsDB)
openseaHTTPClient := opensea.NewHTTPClient()
openseaClient := opensea.NewClient(config.WalletConfig.OpenseaAPIKey, openseaHTTPClient, feed)
@ -138,6 +137,9 @@ func NewService(
collectiblesManager := collectibles.NewManager(db, rpcClient, contractOwnershipProviders, accountOwnershipProviders, collectibleDataProviders, collectionDataProviders, openseaClient)
collectibles := collectibles.NewService(db, feed, accountsDB, accountFeed, rpcClient.NetworkManager, collectiblesManager)
activity := activity.NewService(db, tokenManager, collectiblesManager, feed, accountsDB)
return &Service{
db: db,
accountsDB: accountsDB,

View File

@ -39,6 +39,10 @@ func (k *CollectibleUniqueID) HashKey() string {
return fmt.Sprintf("%s+%s", k.ContractID.HashKey(), k.TokenID.String())
}
func (k *CollectibleUniqueID) Same(other *CollectibleUniqueID) bool {
return k.ContractID.ChainID == other.ContractID.ChainID && k.ContractID.Address == other.ContractID.Address && k.TokenID.Cmp(other.TokenID.Int) == 0
}
func GroupCollectibleUIDsByChainID(uids []CollectibleUniqueID) map[w_common.ChainID][]CollectibleUniqueID {
ret := make(map[w_common.ChainID][]CollectibleUniqueID)

View File

@ -45,6 +45,11 @@ func (t *Token) IsNative() bool {
return t.Address == nativeChainAddress
}
type ManagerInterface interface {
LookupTokenIdentity(chainID uint64, address common.Address, native bool) *Token
LookupToken(chainID *uint64, tokenSymbol string) (token *Token, isNative bool)
}
// Manager is used for accessing token store. It changes the token store based on overridden tokens
type Manager struct {
db *sql.DB

View File

@ -218,6 +218,7 @@ func InsertTestTransfer(tb testing.TB, db *sql.DB, address eth_common.Address, t
type TestTransferOptions struct {
TokenAddress eth_common.Address
TokenID *big.Int
NullifyAddresses []eth_common.Address
}
@ -255,7 +256,11 @@ func InsertTestTransferWithOptions(tb testing.TB, db *sql.DB, address eth_common
tokenType := "eth"
if (opt.TokenAddress != eth_common.Address{}) {
tokenType = "erc20"
if opt.TokenID == nil {
tokenType = "erc20"
} else {
tokenType = "erc721"
}
}
// Workaround to simulate writing of NULL values for addresses
@ -288,6 +293,7 @@ func InsertTestTransferWithOptions(tb testing.TB, db *sql.DB, address eth_common
txNonce: &tr.Nonce,
tokenAddress: &opt.TokenAddress,
contractAddress: &tr.Contract,
tokenID: opt.TokenID,
}
err = updateOrInsertTransfersDBFields(tx, []transferDBFields{transfer})
require.NoError(tb, err)