// Package collectibles contains the wallet collectibles Service together with
// the wallet event types and message payloads it uses.
package collectibles

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"math/big"
	"time"

	"go.uber.org/zap"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/event"

	"github.com/status-im/status-go/logutils"
	"github.com/status-im/status-go/multiaccounts/accounts"
	"github.com/status-im/status-go/rpc/network"
	"github.com/status-im/status-go/services/wallet/async"
	"github.com/status-im/status-go/services/wallet/bigint"
	walletCommon "github.com/status-im/status-go/services/wallet/common"
	"github.com/status-im/status-go/services/wallet/community"
	"github.com/status-im/status-go/services/wallet/thirdparty"
	"github.com/status-im/status-go/services/wallet/transfer"
	"github.com/status-im/status-go/services/wallet/walletevent"
)

// These events are used to notify the UI of state changes
const (
	// Ownership update lifecycle events.
	EventCollectiblesOwnershipUpdateStarted           walletevent.EventType = "wallet-collectibles-ownership-update-started"
	EventCollectiblesOwnershipUpdatePartial           walletevent.EventType = "wallet-collectibles-ownership-update-partial"
	EventCollectiblesOwnershipUpdateFinished          walletevent.EventType = "wallet-collectibles-ownership-update-finished"
	EventCollectiblesOwnershipUpdateFinishedWithError walletevent.EventType = "wallet-collectibles-ownership-update-finished-with-error"
	// EventCommunityCollectiblesReceived is sent by
	// Service.notifyCommunityCollectiblesReceived with a JSON-encoded list of
	// community collectibles as its message.
	EventCommunityCollectiblesReceived walletevent.EventType = "wallet-collectibles-community-collectibles-received"

	EventCollectiblesDataUpdated walletevent.EventType = "wallet-collectibles-data-updated"

	// EventOwnedCollectiblesFilteringDone is sent when a
	// Service.GetOwnedCollectiblesAsync request completes.
	EventOwnedCollectiblesFilteringDone walletevent.EventType = "wallet-owned-collectibles-filtering-done"
	// EventGetCollectiblesDetailsDone is sent when a
	// Service.GetCollectiblesByUniqueIDAsync request completes.
	EventGetCollectiblesDetailsDone walletevent.EventType = "wallet-get-collectibles-details-done"

	EventGetCollectionSocialsDone walletevent.EventType = "wallet-get-collection-socials-done"
)

// OwnershipUpdateMessage lists collectible IDs split into three sets:
// added, updated and removed.
type OwnershipUpdateMessage struct {
	Added   []thirdparty.CollectibleUniqueID `json:"added"`
	Updated []thirdparty.CollectibleUniqueID `json:"updated"`
	Removed []thirdparty.CollectibleUniqueID `json:"removed"`
}
type CollectionSocialsMessage struct { ID thirdparty.ContractID `json:"id"` Socials *thirdparty.CollectionSocials `json:"socials"` } type CollectibleDataType byte const ( CollectibleDataTypeUniqueID CollectibleDataType = iota CollectibleDataTypeHeader CollectibleDataTypeDetails CollectibleDataTypeCommunityHeader ) type FetchType byte const ( FetchTypeNeverFetch FetchType = iota FetchTypeAlwaysFetch FetchTypeFetchIfNotCached FetchTypeFetchIfCacheOld ) type TxHashData struct { Hash common.Hash TxID common.Hash } type FetchCriteria struct { FetchType FetchType `json:"fetch_type"` MaxCacheAgeSeconds int64 `json:"max_cache_age_seconds"` } var ( filterOwnedCollectiblesTask = async.TaskType{ ID: 1, Policy: async.ReplacementPolicyCancelOld, } getCollectiblesDataTask = async.TaskType{ ID: 2, Policy: async.ReplacementPolicyCancelOld, } ) type Service struct { manager *Manager controller *Controller db *sql.DB ownershipDB *OwnershipDB transferDB *transfer.Database communityManager *community.Manager walletFeed *event.Feed scheduler *async.MultiClientScheduler group *async.Group } func NewService( db *sql.DB, walletFeed *event.Feed, accountsDB *accounts.Database, accountsFeed *event.Feed, settingsFeed *event.Feed, communityManager *community.Manager, networkManager *network.Manager, manager *Manager) *Service { s := &Service{ manager: manager, controller: NewController(db, walletFeed, accountsDB, accountsFeed, settingsFeed, networkManager, manager), db: db, ownershipDB: NewOwnershipDB(db), transferDB: transfer.NewDB(db), communityManager: communityManager, walletFeed: walletFeed, scheduler: async.NewMultiClientScheduler(), group: async.NewGroup(context.Background()), } s.controller.SetOwnedCollectiblesChangeCb(s.onOwnedCollectiblesChange) s.controller.SetCollectiblesTransferCb(s.onCollectiblesTransfer) return s } type ErrorCode = int const ( ErrorCodeSuccess ErrorCode = iota + 1 ErrorCodeTaskCanceled ErrorCodeFailed ) type OwnershipStatus struct { State OwnershipState 
`json:"state"` Timestamp int64 `json:"timestamp"` } type OwnershipStatusPerChainID = map[walletCommon.ChainID]OwnershipStatus type OwnershipStatusPerAddressAndChainID = map[common.Address]OwnershipStatusPerChainID type GetOwnedCollectiblesResponse struct { Collectibles []Collectible `json:"collectibles"` Offset int `json:"offset"` // Used to indicate that there might be more collectibles that were not returned // based on a simple heuristic HasMore bool `json:"hasMore"` OwnershipStatus OwnershipStatusPerAddressAndChainID `json:"ownershipStatus"` ErrorCode ErrorCode `json:"errorCode"` } type GetCollectiblesByUniqueIDResponse struct { Collectibles []Collectible `json:"collectibles"` ErrorCode ErrorCode `json:"errorCode"` } type GetOwnedCollectiblesReturnType struct { collectibles []Collectible hasMore bool ownershipStatus OwnershipStatusPerAddressAndChainID } type GetCollectiblesByUniqueIDReturnType struct { collectibles []Collectible } func (s *Service) GetOwnedCollectibles( ctx context.Context, chainIDs []walletCommon.ChainID, addresses []common.Address, filter Filter, offset int, limit int, dataType CollectibleDataType, fetchCriteria FetchCriteria) (*GetOwnedCollectiblesReturnType, error) { err := s.fetchOwnedCollectiblesIfNeeded(ctx, chainIDs, addresses, fetchCriteria) if err != nil { return nil, err } ids, hasMore, err := s.FilterOwnedCollectibles(chainIDs, addresses, filter, offset, limit) if err != nil { return nil, err } collectibles, err := s.collectibleIDsToDataType(ctx, ids, dataType) if err != nil { return nil, err } ownershipStatus, err := s.GetOwnershipStatus(chainIDs, addresses) if err != nil { return nil, err } return &GetOwnedCollectiblesReturnType{ collectibles: collectibles, hasMore: hasMore, ownershipStatus: ownershipStatus, }, err } func (s *Service) needsToFetch(chainID walletCommon.ChainID, address common.Address, fetchCriteria FetchCriteria) (bool, error) { mustFetch := false switch fetchCriteria.FetchType { case FetchTypeAlwaysFetch: 
mustFetch = true case FetchTypeNeverFetch: mustFetch = false case FetchTypeFetchIfNotCached, FetchTypeFetchIfCacheOld: timestamp, err := s.ownershipDB.GetOwnershipUpdateTimestamp(address, chainID) if err != nil { return false, err } if timestamp == InvalidTimestamp || (fetchCriteria.FetchType == FetchTypeFetchIfCacheOld && timestamp+fetchCriteria.MaxCacheAgeSeconds < time.Now().Unix()) { mustFetch = true } } return mustFetch, nil } func (s *Service) fetchOwnedCollectiblesIfNeeded(ctx context.Context, chainIDs []walletCommon.ChainID, addresses []common.Address, fetchCriteria FetchCriteria) error { if fetchCriteria.FetchType == FetchTypeNeverFetch { return nil } group := async.NewGroup(ctx) for _, address := range addresses { for _, chainID := range chainIDs { mustFetch, err := s.needsToFetch(chainID, address, fetchCriteria) if err != nil { return err } if mustFetch { command := newLoadOwnedCollectiblesCommand(s.manager, s.ownershipDB, s.walletFeed, chainID, address, nil) group.Add(command.Command()) } } } select { case <-ctx.Done(): return ctx.Err() case <-group.WaitAsync(): return nil } } // GetOwnedCollectiblesAsync allows only one filter task to run at a time // and it cancels the current one if a new one is started // All calls will trigger an EventOwnedCollectiblesFilteringDone event with the result of the filtering func (s *Service) GetOwnedCollectiblesAsync( requestID int32, chainIDs []walletCommon.ChainID, addresses []common.Address, filter Filter, offset int, limit int, dataType CollectibleDataType, fetchCriteria FetchCriteria) { s.scheduler.Enqueue(requestID, filterOwnedCollectiblesTask, func(ctx context.Context) (interface{}, error) { return s.GetOwnedCollectibles(ctx, chainIDs, addresses, filter, offset, limit, dataType, fetchCriteria) }, func(result interface{}, taskType async.TaskType, err error) { res := GetOwnedCollectiblesResponse{ ErrorCode: ErrorCodeFailed, } if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) { 
res.ErrorCode = ErrorCodeTaskCanceled } else if err == nil { fnRet := result.(*GetOwnedCollectiblesReturnType) if err == nil { res.Collectibles = fnRet.collectibles res.Offset = offset res.HasMore = fnRet.hasMore res.OwnershipStatus = fnRet.ownershipStatus res.ErrorCode = ErrorCodeSuccess } } s.sendResponseEvent(&requestID, EventOwnedCollectiblesFilteringDone, res, err) }) } func (s *Service) GetCollectiblesByUniqueID( ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID, dataType CollectibleDataType) (*GetCollectiblesByUniqueIDReturnType, error) { collectibles, err := s.collectibleIDsToDataType(ctx, uniqueIDs, dataType) if err != nil { return nil, err } return &GetCollectiblesByUniqueIDReturnType{ collectibles: collectibles, }, err } func (s *Service) GetCollectiblesByUniqueIDAsync( requestID int32, uniqueIDs []thirdparty.CollectibleUniqueID, dataType CollectibleDataType) { s.scheduler.Enqueue(requestID, getCollectiblesDataTask, func(ctx context.Context) (interface{}, error) { return s.GetCollectiblesByUniqueID(ctx, uniqueIDs, dataType) }, func(result interface{}, taskType async.TaskType, err error) { res := GetCollectiblesByUniqueIDResponse{ ErrorCode: ErrorCodeFailed, } if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) { res.ErrorCode = ErrorCodeTaskCanceled } else if err == nil { fnRet := result.(*GetCollectiblesByUniqueIDReturnType) if err == nil { res.Collectibles = fnRet.collectibles res.ErrorCode = ErrorCodeSuccess } } s.sendResponseEvent(&requestID, EventGetCollectiblesDetailsDone, res, err) }) } func (s *Service) RefetchOwnedCollectibles() { s.controller.RefetchOwnedCollectibles() } func (s *Service) Start() { s.controller.Start() } func (s *Service) Stop() { s.controller.Stop() s.scheduler.Stop() } func (s *Service) sendResponseEvent(requestID *int32, eventType walletevent.EventType, payloadObj interface{}, resErr error) { payload, err := json.Marshal(payloadObj) if err != nil { logutils.ZapLogger().Error("Error 
marshaling", zap.NamedError("response", err), zap.NamedError("result", resErr)) } else { err = resErr } logutils.ZapLogger().Debug("wallet.api.collectibles.Service RESPONSE", zap.Any("requestID", requestID), zap.String("eventType", string(eventType)), zap.Int("payload.len", len(payload)), zap.Error(err), ) event := walletevent.Event{ Type: eventType, Message: string(payload), } if requestID != nil { event.RequestID = new(int) *event.RequestID = int(*requestID) } s.walletFeed.Send(event) } func (s *Service) FilterOwnedCollectibles(chainIDs []walletCommon.ChainID, owners []common.Address, filter Filter, offset int, limit int) ([]thirdparty.CollectibleUniqueID, bool, error) { ctx := context.Background() // Request one more than limit, to check if DB has more available ids, err := filterOwnedCollectibles(ctx, s.db, chainIDs, owners, filter, offset, limit+1) if err != nil { return nil, false, err } hasMore := len(ids) > limit if hasMore { ids = ids[:limit] } return ids, hasMore, nil } func (s *Service) GetOwnedCollectible(chainID walletCommon.ChainID, owner common.Address, contractAddress common.Address, tokenID *big.Int) (*thirdparty.CollectibleUniqueID, error) { return s.ownershipDB.GetOwnedCollectible(chainID, owner, contractAddress, tokenID) } func (s *Service) GetOwnershipStatus(chainIDs []walletCommon.ChainID, owners []common.Address) (OwnershipStatusPerAddressAndChainID, error) { ret := make(OwnershipStatusPerAddressAndChainID) for _, address := range owners { ret[address] = make(OwnershipStatusPerChainID) for _, chainID := range chainIDs { timestamp, err := s.ownershipDB.GetOwnershipUpdateTimestamp(address, chainID) if err != nil { return nil, err } ret[address][chainID] = OwnershipStatus{ State: s.controller.GetCommandState(chainID, address), Timestamp: timestamp, } } } return ret, nil } func (s *Service) collectibleIDsToDataType(ctx context.Context, ids []thirdparty.CollectibleUniqueID, dataType CollectibleDataType) ([]Collectible, error) { switch dataType { 
case CollectibleDataTypeUniqueID: return idsToCollectibles(ids), nil case CollectibleDataTypeHeader, CollectibleDataTypeDetails, CollectibleDataTypeCommunityHeader: collectibles, err := s.manager.FetchAssetsByCollectibleUniqueID(ctx, ids, true) if err != nil { return nil, err } switch dataType { case CollectibleDataTypeHeader: return fullCollectiblesDataToHeaders(collectibles), nil case CollectibleDataTypeDetails: return fullCollectiblesDataToDetails(collectibles), nil case CollectibleDataTypeCommunityHeader: return fullCollectiblesDataToCommunityHeader(collectibles), nil } } return nil, errors.New("unknown data type") } func (s *Service) onOwnedCollectiblesChange(ownedCollectiblesChange OwnedCollectiblesChange) { // Try to find a matching transfer for newly added/updated collectibles switch ownedCollectiblesChange.changeType { case OwnedCollectiblesChangeTypeAdded, OwnedCollectiblesChangeTypeUpdated: // For recently added/updated collectibles, try to find a matching transfer hashMap := s.lookupTransferForCollectibles(ownedCollectiblesChange.ownedCollectibles) s.notifyCommunityCollectiblesReceived(ownedCollectiblesChange.ownedCollectibles, hashMap) } } func (s *Service) onCollectiblesTransfer(account common.Address, chainID walletCommon.ChainID, transfers []transfer.Transfer) { for _, transfer := range transfers { // If Collectible is already in the DB, update transfer ID with the latest detected transfer id := thirdparty.CollectibleUniqueID{ ContractID: thirdparty.ContractID{ ChainID: chainID, Address: transfer.Log.Address, }, TokenID: &bigint.BigInt{Int: transfer.TokenID}, } err := s.manager.SetCollectibleTransferID(account, id, transfer.ID, true) if err != nil { logutils.ZapLogger().Error("Error setting transfer ID for collectible", zap.Error(err)) } } } func (s *Service) lookupTransferForCollectibles(ownedCollectibles OwnedCollectibles) map[thirdparty.CollectibleUniqueID]TxHashData { // There are some limitations to this approach: // - Collectibles ownership 
and transfers are not in sync and might represent the state at different moments. // - We have no way of knowing if the latest collectible transfer we've detected is actually the latest one, so the timestamp we // use might be older than the real one. // - There might be detected transfers that are temporarily not reflected in the collectibles ownership. // - For ERC721 tokens we should only look for incoming transfers. For ERC1155 tokens we should look for both incoming and outgoing transfers. // We need to get the contract standard for each collectible to know which approach to take. result := make(map[thirdparty.CollectibleUniqueID]TxHashData) for _, id := range ownedCollectibles.ids { transfer, err := s.transferDB.GetLatestCollectibleTransfer(ownedCollectibles.account, id) if err != nil { logutils.ZapLogger().Error("Error fetching latest collectible transfer", zap.Error(err)) continue } if transfer != nil { result[id] = TxHashData{ Hash: transfer.Transaction.Hash(), TxID: transfer.ID, } err = s.manager.SetCollectibleTransferID(ownedCollectibles.account, id, transfer.ID, false) if err != nil { logutils.ZapLogger().Error("Error setting transfer ID for collectible", zap.Error(err)) } } } return result } func (s *Service) notifyCommunityCollectiblesReceived(ownedCollectibles OwnedCollectibles, hashMap map[thirdparty.CollectibleUniqueID]TxHashData) { ctx := context.Background() firstCollectibles, err := s.ownershipDB.GetIsFirstOfCollection(ownedCollectibles.account, ownedCollectibles.ids) if err != nil { return } collectiblesData, err := s.manager.FetchAssetsByCollectibleUniqueID(ctx, ownedCollectibles.ids, false) if err != nil { logutils.ZapLogger().Error("Error fetching collectibles data", zap.Error(err)) return } communityCollectibles := fullCollectiblesDataToCommunityHeader(collectiblesData) if len(communityCollectibles) == 0 { return } type CollectibleGroup struct { contractID thirdparty.ContractID txHash string } groups := 
make(map[CollectibleGroup]Collectible) for _, localCollectible := range communityCollectibles { // to satisfy gosec: C601 checks collectible := localCollectible txHash := "" for key, value := range hashMap { if key.Same(&collectible.ID) { collectible.LatestTxHash = value.TxID.Hex() txHash = value.Hash.Hex() break } } for id, value := range firstCollectibles { if value && id.Same(&collectible.ID) { collectible.IsFirst = true break } } group := CollectibleGroup{ contractID: collectible.ID.ContractID, txHash: txHash, } _, ok := groups[group] if !ok { collectible.ReceivedAmount = float64(0) } collectible.ReceivedAmount = collectible.ReceivedAmount + 1 groups[group] = collectible } groupedCommunityCollectibles := make([]Collectible, 0, len(groups)) for _, collectible := range groups { groupedCommunityCollectibles = append(groupedCommunityCollectibles, collectible) } encodedMessage, err := json.Marshal(groupedCommunityCollectibles) if err != nil { return } s.walletFeed.Send(walletevent.Event{ Type: EventCommunityCollectiblesReceived, ChainID: uint64(ownedCollectibles.chainID), Accounts: []common.Address{ ownedCollectibles.account, }, Message: string(encodedMessage), }) }