package collectibles import ( "context" "database/sql" "encoding/json" "errors" "math/big" "net/http" "strings" "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/circuitbreaker" "github.com/status-im/status-go/contracts/community-tokens/collectibles" "github.com/status-im/status-go/contracts/ierc1155" "github.com/status-im/status-go/rpc" "github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/server" "github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/bigint" walletCommon "github.com/status-im/status-go/services/wallet/common" "github.com/status-im/status-go/services/wallet/community" "github.com/status-im/status-go/services/wallet/connection" "github.com/status-im/status-go/services/wallet/thirdparty" "github.com/status-im/status-go/services/wallet/walletevent" ) const requestTimeout = 5 * time.Second const signalUpdatedCollectiblesDataPageSize = 10 const EventCollectiblesConnectionStatusChanged walletevent.EventType = "wallet-collectible-status-changed" // ERC721 does not support function "TokenURI" if call // returns error starting with one of these strings var noTokenURIErrorPrefixes = []string{ "execution reverted", "abi: attempting to unmarshall", } var ( ErrAllProvidersFailedForChainID = errors.New("all providers failed for chainID") ErrNoProvidersAvailableForChainID = errors.New("no providers available for chainID") ) type ManagerInterface interface { FetchAssetsByCollectibleUniqueID(ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID, asyncFetch bool) ([]thirdparty.FullCollectibleData, error) FetchCollectionSocialsAsync(contractID thirdparty.ContractID) error } type Manager struct { rpcClient rpc.ClientInterface providers thirdparty.CollectibleProviders httpClient *http.Client collectiblesDataDB CollectibleDataStorage collectionsDataDB CollectionDataStorage communityManager *community.Manager ownershipDB *OwnershipDB mediaServer *server.MediaServer statuses *sync.Map statusNotifier *connection.StatusNotifier feed *event.Feed circuitBreaker *circuitbreaker.CircuitBreaker } func NewManager( db *sql.DB, rpcClient rpc.ClientInterface, communityManager *community.Manager, providers thirdparty.CollectibleProviders, mediaServer *server.MediaServer, feed *event.Feed) *Manager { var ownershipDB *OwnershipDB var statuses *sync.Map var statusNotifier *connection.StatusNotifier if db != nil { ownershipDB = NewOwnershipDB(db) statuses = initStatuses(ownershipDB) statusNotifier = createStatusNotifier(statuses, feed) } cb := circuitbreaker.NewCircuitBreaker(circuitbreaker.Config{ Timeout: 10000, MaxConcurrentRequests: 100, RequestVolumeThreshold: 25, SleepWindow: 300000, ErrorPercentThreshold: 25, }) return &Manager{ rpcClient: rpcClient, providers: providers, httpClient: &http.Client{ Timeout: requestTimeout, }, collectiblesDataDB: NewCollectibleDataDB(db), collectionsDataDB: NewCollectionDataDB(db), communityManager: communityManager, ownershipDB: ownershipDB, mediaServer: mediaServer, statuses: statuses, statusNotifier: statusNotifier, feed: feed, circuitBreaker: cb, } } func mapToList[K comparable, T any](m map[K]T) []T { list := make([]T, 0, len(m)) for _, v := range m { list = append(list, v) } return list } func (o *Manager) doContentTypeRequest(ctx context.Context, url string) (string, error) { req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil) if err != nil { return "", err } resp, err := o.httpClient.Do(req) if err != nil { return "", err } defer func() { if err := resp.Body.Close(); err != nil { log.Error("failed to close head request body", "err", err) } }() return resp.Header.Get("Content-Type"), nil } func (o *Manager) getTokenBalancesByOwnerAddress(collectibles *thirdparty.CollectibleContractOwnership, ownerAddress common.Address) map[common.Address][]thirdparty.TokenBalance { ret := make(map[common.Address][]thirdparty.TokenBalance) for _, nftOwner := range collectibles.Owners { if nftOwner.OwnerAddress == ownerAddress { ret[collectibles.ContractAddress] = nftOwner.TokenBalances break } } return ret } func (o *Manager) FetchCachedBalancesByOwnerAndContractAddress(ctx context.Context, chainID walletCommon.ChainID, ownerAddress common.Address, contractAddresses []common.Address) (thirdparty.TokenBalancesPerContractAddress, error) { ret := make(map[common.Address][]thirdparty.TokenBalance) for _, contractAddress := range contractAddresses { ret[contractAddress] = make([]thirdparty.TokenBalance, 0) } for _, contractAddress := range contractAddresses { ownership, err := o.ownershipDB.FetchCachedCollectibleOwnersByContractAddress(chainID, contractAddress) if err != nil { return nil, err } t := o.getTokenBalancesByOwnerAddress(ownership, ownerAddress) for address, tokenBalances := range t { ret[address] = append(ret[address], tokenBalances...) } } return ret, nil } // Need to combine different providers to support all needed ChainIDs func (o *Manager) FetchBalancesByOwnerAndContractAddress(ctx context.Context, chainID walletCommon.ChainID, ownerAddress common.Address, contractAddresses []common.Address) (thirdparty.TokenBalancesPerContractAddress, error) { ret := make(thirdparty.TokenBalancesPerContractAddress) for _, contractAddress := range contractAddresses { ret[contractAddress] = make([]thirdparty.TokenBalance, 0) } // Try with account ownership providers first assetsContainer, err := o.FetchAllAssetsByOwnerAndContractAddress(ctx, chainID, ownerAddress, contractAddresses, thirdparty.FetchFromStartCursor, thirdparty.FetchNoLimit, thirdparty.FetchFromAnyProvider) if err == ErrNoProvidersAvailableForChainID { // Use contract ownership providers for _, contractAddress := range contractAddresses { ownership, err := o.FetchCollectibleOwnersByContractAddress(ctx, chainID, contractAddress) if err != nil { return nil, err } ret = o.getTokenBalancesByOwnerAddress(ownership, ownerAddress) } } else if err == nil { // Account ownership providers succeeded for _, fullData := range assetsContainer.Items { contractAddress := fullData.CollectibleData.ID.ContractID.Address balance := thirdparty.TokenBalance{ TokenID: fullData.CollectibleData.ID.TokenID, Balance: &bigint.BigInt{Int: big.NewInt(1)}, } ret[contractAddress] = append(ret[contractAddress], balance) } } else { // OpenSea could have provided, but returned error return nil, err } return ret, nil } func (o *Manager) FetchAllAssetsByOwnerAndContractAddress(ctx context.Context, chainID walletCommon.ChainID, owner common.Address, contractAddresses []common.Address, cursor string, limit int, providerID string) (*thirdparty.FullCollectibleDataContainer, error) { defer o.checkConnectionStatus(chainID) cmd := circuitbreaker.NewCommand(ctx, nil) for _, provider := range o.providers.AccountOwnershipProviders { if !provider.IsChainSupported(chainID) { continue } if providerID != thirdparty.FetchFromAnyProvider && providerID != provider.ID() { continue } provider := provider f := circuitbreaker.NewFunctor( func() ([]interface{}, error) { assetContainer, err := provider.FetchAllAssetsByOwnerAndContractAddress(ctx, chainID, owner, contractAddresses, cursor, limit) if err != nil { log.Error("FetchAllAssetsByOwnerAndContractAddress failed for", "provider", provider.ID(), "chainID", chainID, "err", err) } return []interface{}{assetContainer}, err }, getCircuitName(provider, chainID), ) cmd.Add(f) } if cmd.IsEmpty() { return nil, ErrNoProvidersAvailableForChainID } cmdRes := o.circuitBreaker.Execute(cmd) if cmdRes.Error() != nil { log.Error("FetchAllAssetsByOwnerAndContractAddress failed for", "chainID", chainID, "err", cmdRes.Error()) return nil, cmdRes.Error() } assetContainer := cmdRes.Result()[0].(*thirdparty.FullCollectibleDataContainer) _, err := o.processFullCollectibleData(ctx, assetContainer.Items, true) if err != nil { return nil, err } return assetContainer, nil } func (o *Manager) FetchAllAssetsByOwner(ctx context.Context, chainID walletCommon.ChainID, owner common.Address, cursor string, limit int, providerID string) (*thirdparty.FullCollectibleDataContainer, error) { defer o.checkConnectionStatus(chainID) cmd := circuitbreaker.NewCommand(ctx, nil) for _, provider := range o.providers.AccountOwnershipProviders { if !provider.IsChainSupported(chainID) { continue } if providerID != thirdparty.FetchFromAnyProvider && providerID != provider.ID() { continue } provider := provider f := circuitbreaker.NewFunctor( func() ([]interface{}, error) { assetContainer, err := provider.FetchAllAssetsByOwner(ctx, chainID, owner, cursor, limit) if err != nil { log.Error("FetchAllAssetsByOwner failed for", "provider", provider.ID(), "chainID", chainID, "err", err) } return []interface{}{assetContainer}, err }, getCircuitName(provider, chainID), ) cmd.Add(f) } if cmd.IsEmpty() { return nil, ErrNoProvidersAvailableForChainID } cmdRes := o.circuitBreaker.Execute(cmd) if cmdRes.Error() != nil { log.Error("FetchAllAssetsByOwner failed for", "chainID", chainID, "err", cmdRes.Error()) return nil, cmdRes.Error() } assetContainer := cmdRes.Result()[0].(*thirdparty.FullCollectibleDataContainer) _, err := o.processFullCollectibleData(ctx, assetContainer.Items, true) if err != nil { return nil, err } return assetContainer, nil } func (o *Manager) FetchERC1155Balances(ctx context.Context, owner common.Address, chainID walletCommon.ChainID, contractAddress common.Address, tokenIDs []*bigint.BigInt) ([]*bigint.BigInt, error) { if len(tokenIDs) == 0 { return nil, nil } backend, err := o.rpcClient.EthClient(uint64(chainID)) if err != nil { return nil, err } caller, err := ierc1155.NewIerc1155Caller(contractAddress, backend) if err != nil { return nil, err } owners := make([]common.Address, len(tokenIDs)) ids := make([]*big.Int, len(tokenIDs)) for i, tokenID := range tokenIDs { owners[i] = owner ids[i] = tokenID.Int } balances, err := caller.BalanceOfBatch(&bind.CallOpts{ Context: ctx, }, owners, ids) if err != nil { return nil, err } bigIntBalances := make([]*bigint.BigInt, len(balances)) for i, balance := range balances { bigIntBalances[i] = &bigint.BigInt{Int: balance} } return bigIntBalances, err } func (o *Manager) fillMissingBalances(ctx context.Context, owner common.Address, collectibles []*thirdparty.FullCollectibleData) { collectiblesByChainIDAndContractAddress := thirdparty.GroupCollectiblesByChainIDAndContractAddress(collectibles) for chainID, collectiblesByContract := range collectiblesByChainIDAndContractAddress { for contractAddress, contractCollectibles := range collectiblesByContract { collectiblesToFetchPerTokenID := make(map[string]*thirdparty.FullCollectibleData) for _, collectible := range contractCollectibles { if collectible.AccountBalance == nil { switch getContractType(*collectible) { case walletCommon.ContractTypeERC1155: collectiblesToFetchPerTokenID[collectible.CollectibleData.ID.TokenID.String()] = collectible default: // Any other type of collectible is non-fungible, balance is 1 collectible.AccountBalance = &bigint.BigInt{Int: big.NewInt(1)} } } } if len(collectiblesToFetchPerTokenID) == 0 { continue } tokenIDs := make([]*bigint.BigInt, 0, len(collectiblesToFetchPerTokenID)) for _, c := range collectiblesToFetchPerTokenID { tokenIDs = append(tokenIDs, c.CollectibleData.ID.TokenID) } balances, err := o.FetchERC1155Balances(ctx, owner, chainID, contractAddress, tokenIDs) if err != nil { log.Error("FetchERC1155Balances failed", "chainID", chainID, "contractAddress", contractAddress, "err", err) continue } for i := range balances { collectible := collectiblesToFetchPerTokenID[tokenIDs[i].String()] collectible.AccountBalance = balances[i] } } } } func (o *Manager) FetchCollectibleOwnershipByOwner(ctx context.Context, chainID walletCommon.ChainID, owner common.Address, cursor string, limit int, providerID string) (*thirdparty.CollectibleOwnershipContainer, error) { // We don't yet have an API that will return only Ownership data // Use the full Ownership + Metadata endpoint and use the data we need assetContainer, err := o.FetchAllAssetsByOwner(ctx, chainID, owner, cursor, limit, providerID) if err != nil { return nil, err } // Some providers do not give us the balances for ERC1155 tokens, so we need to fetch them separately. collectibles := make([]*thirdparty.FullCollectibleData, 0, len(assetContainer.Items)) for i := range assetContainer.Items { collectibles = append(collectibles, &assetContainer.Items[i]) } o.fillMissingBalances(ctx, owner, collectibles) ret := assetContainer.ToOwnershipContainer() return &ret, nil } // Returns collectible metadata for the given unique IDs. // If asyncFetch is true, empty metadata will be returned for any missing collectibles and an EventCollectiblesDataUpdated will be sent when the data is ready. // If asyncFetch is false, it will wait for all collectibles' metadata to be retrieved before returning. func (o *Manager) FetchAssetsByCollectibleUniqueID(ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID, asyncFetch bool) ([]thirdparty.FullCollectibleData, error) { err := o.FetchMissingAssetsByCollectibleUniqueID(ctx, uniqueIDs, asyncFetch) if err != nil { return nil, err } return o.getCacheFullCollectibleData(uniqueIDs) } func (o *Manager) FetchMissingAssetsByCollectibleUniqueID(ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID, asyncFetch bool) error { missingIDs, err := o.collectiblesDataDB.GetIDsNotInDB(uniqueIDs) if err != nil { return err } missingIDsPerChainID := thirdparty.GroupCollectibleUIDsByChainID(missingIDs) // Atomic group stores the error from the first failed command and stops other commands on error group := async.NewAtomicGroup(ctx) for chainID, idsToFetch := range missingIDsPerChainID { group.Add(func(ctx context.Context) error { defer o.checkConnectionStatus(chainID) fetchedAssets, err := o.fetchMissingAssetsForChainByCollectibleUniqueID(ctx, chainID, idsToFetch) if err != nil { log.Error("FetchMissingAssetsByCollectibleUniqueID failed for", "chainID", chainID, "ids", idsToFetch, "err", err) return err } updatedCollectibles, err := o.processFullCollectibleData(ctx, fetchedAssets, asyncFetch) if err != nil { log.Error("processFullCollectibleData failed for", "chainID", chainID, "len(fetchedAssets)", len(fetchedAssets), "err", err) return err } o.signalUpdatedCollectiblesData(updatedCollectibles) return nil }) } if asyncFetch { group.Wait() return group.Error() } return nil } func (o *Manager) fetchMissingAssetsForChainByCollectibleUniqueID(ctx context.Context, chainID walletCommon.ChainID, idsToFetch []thirdparty.CollectibleUniqueID) ([]thirdparty.FullCollectibleData, error) { cmd := circuitbreaker.NewCommand(ctx, nil) for _, provider := range o.providers.CollectibleDataProviders { if !provider.IsChainSupported(chainID) { continue } provider := provider cmd.Add(circuitbreaker.NewFunctor(func() ([]any, error) { fetchedAssets, err := provider.FetchAssetsByCollectibleUniqueID(ctx, idsToFetch) if err != nil { log.Error("fetchMissingAssetsForChainByCollectibleUniqueID failed for", "provider", provider.ID(), "chainID", chainID, "err", err) } return []any{fetchedAssets}, err }, getCircuitName(provider, chainID))) } if cmd.IsEmpty() { return nil, ErrNoProvidersAvailableForChainID // lets not stop the group if no providers are available for the chain } cmdRes := o.circuitBreaker.Execute(cmd) if cmdRes.Error() != nil { log.Error("fetchMissingAssetsForChainByCollectibleUniqueID failed for", "chainID", chainID, "err", cmdRes.Error()) return nil, cmdRes.Error() } return cmdRes.Result()[0].([]thirdparty.FullCollectibleData), cmdRes.Error() } func (o *Manager) FetchCollectionsDataByContractID(ctx context.Context, ids []thirdparty.ContractID) ([]thirdparty.CollectionData, error) { missingIDs, err := o.collectionsDataDB.GetIDsNotInDB(ids) if err != nil { return nil, err } missingIDsPerChainID := thirdparty.GroupContractIDsByChainID(missingIDs) // Atomic group stores the error from the first failed command and stops other commands on error group := async.NewAtomicGroup(ctx) for chainID, idsToFetch := range missingIDsPerChainID { group.Add(func(ctx context.Context) error { defer o.checkConnectionStatus(chainID) cmd := circuitbreaker.NewCommand(ctx, nil) for _, provider := range o.providers.CollectionDataProviders { if !provider.IsChainSupported(chainID) { continue } provider := provider cmd.Add(circuitbreaker.NewFunctor(func() ([]any, error) { fetchedCollections, err := provider.FetchCollectionsDataByContractID(ctx, idsToFetch) return []any{fetchedCollections}, err }, getCircuitName(provider, chainID))) } if cmd.IsEmpty() { return nil } cmdRes := o.circuitBreaker.Execute(cmd) if cmdRes.Error() != nil { log.Error("FetchCollectionsDataByContractID failed for", "chainID", chainID, "err", cmdRes.Error()) return cmdRes.Error() } fetchedCollections := cmdRes.Result()[0].([]thirdparty.CollectionData) err = o.processCollectionData(ctx, fetchedCollections) if err != nil { return err } return err }) } group.Wait() if group.Error() != nil { return nil, group.Error() } data, err := o.collectionsDataDB.GetData(ids) if err != nil { return nil, err } return mapToList(data), nil } func (o *Manager) GetCollectibleOwnership(id thirdparty.CollectibleUniqueID) ([]thirdparty.AccountBalance, error) { return o.ownershipDB.GetOwnership(id) } func (o *Manager) FetchCollectibleOwnersByContractAddress(ctx context.Context, chainID walletCommon.ChainID, contractAddress common.Address) (*thirdparty.CollectibleContractOwnership, error) { defer o.checkConnectionStatus(chainID) cmd := circuitbreaker.NewCommand(ctx, nil) for _, provider := range o.providers.ContractOwnershipProviders { if !provider.IsChainSupported(chainID) { continue } provider := provider cmd.Add(circuitbreaker.NewFunctor(func() ([]any, error) { res, err := provider.FetchCollectibleOwnersByContractAddress(ctx, chainID, contractAddress) if err != nil { log.Error("FetchCollectibleOwnersByContractAddress failed for", "provider", provider.ID(), "chainID", chainID, "err", err) } return []any{res}, err }, getCircuitName(provider, chainID))) } if cmd.IsEmpty() { return nil, ErrNoProvidersAvailableForChainID } cmdRes := o.circuitBreaker.Execute(cmd) if cmdRes.Error() != nil { log.Error("FetchCollectibleOwnersByContractAddress failed for", "chainID", chainID, "err", cmdRes.Error()) return nil, cmdRes.Error() } return cmdRes.Result()[0].(*thirdparty.CollectibleContractOwnership), cmdRes.Error() } func (o *Manager) fetchTokenURI(ctx context.Context, id thirdparty.CollectibleUniqueID) (string, error) { if id.TokenID == nil { return "", errors.New("empty token ID") } backend, err := o.rpcClient.EthClient(uint64(id.ContractID.ChainID)) if err != nil { return "", err } backend = getClientWithNoCircuitTripping(backend) caller, err := collectibles.NewCollectiblesCaller(id.ContractID.Address, backend) if err != nil { return "", err } tokenURI, err := caller.TokenURI(&bind.CallOpts{ Context: ctx, }, id.TokenID.Int) if err != nil { for _, errorPrefix := range noTokenURIErrorPrefixes { if strings.Contains(err.Error(), errorPrefix) { // Contract doesn't support "TokenURI" method return "", nil } } return "", err } return tokenURI, err } func isMetadataEmpty(asset thirdparty.CollectibleData) bool { return asset.Description == "" && asset.ImageURL == "" } // Processes collectible metadata obtained from a provider and ensures any missing data is fetched. // If asyncFetch is true, community collectibles metadata will be fetched async and an EventCollectiblesDataUpdated will be sent when the data is ready. // If asyncFetch is false, it will wait for all community collectibles' metadata to be retrieved before returning. func (o *Manager) processFullCollectibleData(ctx context.Context, assets []thirdparty.FullCollectibleData, asyncFetch bool) ([]thirdparty.CollectibleUniqueID, error) { fullyFetchedAssets := make(map[string]*thirdparty.FullCollectibleData) communityCollectibles := make(map[string][]*thirdparty.FullCollectibleData) processedIDs := make([]thirdparty.CollectibleUniqueID, 0, len(assets)) // Start with all assets, remove if any of the fetch steps fail for idx := range assets { asset := &assets[idx] id := asset.CollectibleData.ID fullyFetchedAssets[id.HashKey()] = asset } // Detect community collectibles for _, asset := range fullyFetchedAssets { // Only check community ownership if metadata is empty if isMetadataEmpty(asset.CollectibleData) { // Get TokenURI if not given by provider err := o.fillTokenURI(ctx, asset) if err != nil { log.Error("fillTokenURI failed", "err", err) delete(fullyFetchedAssets, asset.CollectibleData.ID.HashKey()) continue } // Get CommunityID if obtainable from TokenURI err = o.fillCommunityID(asset) if err != nil { log.Error("fillCommunityID failed", "err", err) delete(fullyFetchedAssets, asset.CollectibleData.ID.HashKey()) continue } // Get metadata from community if community collectible communityID := asset.CollectibleData.CommunityID if communityID != "" { if _, ok := communityCollectibles[communityID]; !ok { communityCollectibles[communityID] = make([]*thirdparty.FullCollectibleData, 0) } communityCollectibles[communityID] = append(communityCollectibles[communityID], asset) // Community collectibles are handled separately, remove from list delete(fullyFetchedAssets, asset.CollectibleData.ID.HashKey()) } } } // Community collectibles are grouped by community ID for communityID, communityAssets := range communityCollectibles { if asyncFetch { o.fetchCommunityAssetsAsync(ctx, communityID, communityAssets) } else { err := o.fetchCommunityAssets(communityID, communityAssets) if err != nil { log.Error("fetchCommunityAssets failed", "communityID", communityID, "err", err) continue } for _, asset := range communityAssets { processedIDs = append(processedIDs, asset.CollectibleData.ID) } } } for _, asset := range fullyFetchedAssets { err := o.fillAnimationMediatype(ctx, asset) if err != nil { log.Error("fillAnimationMediatype failed", "err", err) delete(fullyFetchedAssets, asset.CollectibleData.ID.HashKey()) continue } } // Save successfully fetched data to DB collectiblesData := make([]thirdparty.CollectibleData, 0, len(assets)) collectionsData := make([]thirdparty.CollectionData, 0, len(assets)) missingCollectionIDs := make([]thirdparty.ContractID, 0) for _, asset := range fullyFetchedAssets { id := asset.CollectibleData.ID processedIDs = append(processedIDs, id) collectiblesData = append(collectiblesData, asset.CollectibleData) if asset.CollectionData != nil { collectionsData = append(collectionsData, *asset.CollectionData) } else { missingCollectionIDs = append(missingCollectionIDs, id.ContractID) } } err := o.collectiblesDataDB.SetData(collectiblesData, true) if err != nil { return nil, err } err = o.collectionsDataDB.SetData(collectionsData, true) if err != nil { return nil, err } if len(missingCollectionIDs) > 0 { // Calling this ensures collection data is fetched and cached (if not already available) _, err := o.FetchCollectionsDataByContractID(ctx, missingCollectionIDs) if err != nil { return nil, err } } return processedIDs, nil } func (o *Manager) fillTokenURI(ctx context.Context, asset *thirdparty.FullCollectibleData) error { id := asset.CollectibleData.ID tokenURI := asset.CollectibleData.TokenURI // Only need to fetch it from contract if it was empty if tokenURI == "" { tokenURI, err := o.fetchTokenURI(ctx, id) if err != nil { return err } asset.CollectibleData.TokenURI = tokenURI } return nil } func (o *Manager) fillCommunityID(asset *thirdparty.FullCollectibleData) error { tokenURI := asset.CollectibleData.TokenURI communityID := "" if tokenURI != "" { communityID = o.communityManager.GetCommunityID(tokenURI) } asset.CollectibleData.CommunityID = communityID return nil } func (o *Manager) fetchCommunityAssets(communityID string, communityAssets []*thirdparty.FullCollectibleData) error { communityFound, err := o.communityManager.FillCollectiblesMetadata(communityID, communityAssets) if err != nil { log.Error("FillCollectiblesMetadata failed", "communityID", communityID, "err", err) } else if !communityFound { log.Warn("fetchCommunityAssets community not found", "communityID", communityID) } // If the community is found, we update the DB. // If the community is not found, we only insert new entries to the DB (don't replace what is already there). allowUpdate := communityFound collectiblesData := make([]thirdparty.CollectibleData, 0, len(communityAssets)) collectionsData := make([]thirdparty.CollectionData, 0, len(communityAssets)) for _, asset := range communityAssets { collectiblesData = append(collectiblesData, asset.CollectibleData) if asset.CollectionData != nil { collectionsData = append(collectionsData, *asset.CollectionData) } } err = o.collectiblesDataDB.SetData(collectiblesData, allowUpdate) if err != nil { log.Error("collectiblesDataDB SetData failed", "communityID", communityID, "err", err) return err } err = o.collectionsDataDB.SetData(collectionsData, allowUpdate) if err != nil { log.Error("collectionsDataDB SetData failed", "communityID", communityID, "err", err) return err } for _, asset := range communityAssets { if asset.CollectibleCommunityInfo != nil { err = o.collectiblesDataDB.SetCommunityInfo(asset.CollectibleData.ID, *asset.CollectibleCommunityInfo) if err != nil { log.Error("collectiblesDataDB SetCommunityInfo failed", "communityID", communityID, "err", err) return err } } } return nil } func (o *Manager) fetchCommunityAssetsAsync(_ context.Context, communityID string, communityAssets []*thirdparty.FullCollectibleData) { if len(communityAssets) == 0 { return } go func() { err := o.fetchCommunityAssets(communityID, communityAssets) if err != nil { log.Error("fetchCommunityAssets failed", "communityID", communityID, "err", err) return } // Metadata is up to date in db at this point, fetch and send Event. ids := make([]thirdparty.CollectibleUniqueID, 0, len(communityAssets)) for _, asset := range communityAssets { ids = append(ids, asset.CollectibleData.ID) } o.signalUpdatedCollectiblesData(ids) }() } func (o *Manager) fillAnimationMediatype(ctx context.Context, asset *thirdparty.FullCollectibleData) error { if len(asset.CollectibleData.AnimationURL) > 0 { contentType, err := o.doContentTypeRequest(ctx, asset.CollectibleData.AnimationURL) if err != nil { asset.CollectibleData.AnimationURL = "" } asset.CollectibleData.AnimationMediaType = contentType } return nil } func (o *Manager) processCollectionData(_ context.Context, collections []thirdparty.CollectionData) error { return o.collectionsDataDB.SetData(collections, true) } func (o *Manager) getCacheFullCollectibleData(uniqueIDs []thirdparty.CollectibleUniqueID) ([]thirdparty.FullCollectibleData, error) { ret := make([]thirdparty.FullCollectibleData, 0, len(uniqueIDs)) collectiblesData, err := o.collectiblesDataDB.GetData(uniqueIDs) if err != nil { return nil, err } contractIDs := make([]thirdparty.ContractID, 0, len(uniqueIDs)) for _, id := range uniqueIDs { contractIDs = append(contractIDs, id.ContractID) } collectionsData, err := o.collectionsDataDB.GetData(contractIDs) if err != nil { return nil, err } for _, id := range uniqueIDs { collectibleData, ok := collectiblesData[id.HashKey()] if !ok { // Use empty data, set only ID collectibleData = thirdparty.CollectibleData{ ID: id, } } if o.mediaServer != nil && len(collectibleData.ImagePayload) > 0 { collectibleData.ImageURL = o.mediaServer.MakeWalletCollectibleImagesURL(collectibleData.ID) } collectionData, ok := collectionsData[id.ContractID.HashKey()] if !ok { // Use empty data, set only ID collectionData = thirdparty.CollectionData{ ID: id.ContractID, } } if o.mediaServer != nil && len(collectionData.ImagePayload) > 0 { collectionData.ImageURL = o.mediaServer.MakeWalletCollectionImagesURL(collectionData.ID) } communityInfo, _, err := o.communityManager.GetCommunityInfo(collectibleData.CommunityID) if err != nil { return nil, err } collectibleCommunityInfo, err := o.collectiblesDataDB.GetCommunityInfo(id) if err != nil { return nil, err } ownership, err := o.ownershipDB.GetOwnership(id) if err != nil { return nil, err } fullData := thirdparty.FullCollectibleData{ CollectibleData: collectibleData, CollectionData: &collectionData, CommunityInfo: communityInfo, CollectibleCommunityInfo: collectibleCommunityInfo, Ownership: ownership, } ret = append(ret, fullData) } return ret, nil } func (o *Manager) SetCollectibleTransferID(ownerAddress common.Address, id thirdparty.CollectibleUniqueID, transferID common.Hash, notify bool) error { changed, err := o.ownershipDB.SetTransferID(ownerAddress, id, transferID) if err != nil { return err } if changed && notify { o.signalUpdatedCollectiblesData([]thirdparty.CollectibleUniqueID{id}) } return nil } // Reset connection status to trigger notifications // on the next status update func (o *Manager) ResetConnectionStatus() { o.statuses.Range(func(key, value interface{}) bool { value.(*connection.Status).ResetStateValue() return true }) } func (o *Manager) checkConnectionStatus(chainID walletCommon.ChainID) { for _, provider := range o.providers.GetProviderList() { if provider.IsChainSupported(chainID) && provider.IsConnected() { if status, ok := o.statuses.Load(chainID.String()); ok { status.(*connection.Status).SetIsConnected(true) } return } } // If no chain in statuses, add it statusVal, ok := o.statuses.Load(chainID.String()) if !ok { status := connection.NewStatus() status.SetIsConnected(false) o.statuses.Store(chainID.String(), status) o.updateStatusNotifier() } else { statusVal.(*connection.Status).SetIsConnected(false) } } func (o *Manager) signalUpdatedCollectiblesData(ids []thirdparty.CollectibleUniqueID) { // We limit how much collectibles data we send in each event to avoid problems on the client side for startIdx := 0; startIdx < len(ids); startIdx += signalUpdatedCollectiblesDataPageSize { endIdx := startIdx + signalUpdatedCollectiblesDataPageSize if endIdx > len(ids) { endIdx = len(ids) } pageIDs := ids[startIdx:endIdx] collectibles, err := o.getCacheFullCollectibleData(pageIDs) if err != nil { log.Error("Error getting FullCollectibleData from cache: %v", err) return } // Send update event with most complete data type available details := fullCollectiblesDataToDetails(collectibles) payload, err := json.Marshal(details) if err != nil { log.Error("Error marshaling response: %v", err) return } event := walletevent.Event{ Type: EventCollectiblesDataUpdated, Message: string(payload), } o.feed.Send(event) } } func (o *Manager) SearchCollectibles(ctx context.Context, chainID walletCommon.ChainID, text string, cursor string, limit int, providerID string) (*thirdparty.FullCollectibleDataContainer, error) { defer o.checkConnectionStatus(chainID) anyProviderAvailable := false for _, provider := range o.providers.SearchProviders { if !provider.IsChainSupported(chainID) { continue } anyProviderAvailable = true if providerID != thirdparty.FetchFromAnyProvider && providerID != provider.ID() { continue } // TODO (#13951): Be smarter about how we handle the user-entered string collections := []common.Address{} container, err := provider.SearchCollectibles(ctx, chainID, collections, text, cursor, limit) if err != nil { log.Error("FetchAllAssetsByOwner failed for", "provider", provider.ID(), "chainID", chainID, "err", err) continue } _, err = o.processFullCollectibleData(ctx, container.Items, true) if err != nil { return nil, err } return container, nil } if anyProviderAvailable { return nil, ErrAllProvidersFailedForChainID } return nil, ErrNoProvidersAvailableForChainID } func (o *Manager) SearchCollections(ctx context.Context, chainID walletCommon.ChainID, query string, cursor string, limit int, providerID string) (*thirdparty.CollectionDataContainer, error) { defer o.checkConnectionStatus(chainID) anyProviderAvailable := false for _, provider := range o.providers.SearchProviders { if !provider.IsChainSupported(chainID) { continue } anyProviderAvailable = true if providerID != thirdparty.FetchFromAnyProvider && providerID != provider.ID() { continue } // TODO (#13951): Be smarter about how we handle the user-entered string container, err := provider.SearchCollections(ctx, chainID, query, cursor, limit) if err != nil { log.Error("FetchAllAssetsByOwner failed for", "provider", provider.ID(), "chainID", chainID, "err", err) continue } err = o.processCollectionData(ctx, container.Items) if err != nil { return nil, err } return container, nil } if anyProviderAvailable { return nil, ErrAllProvidersFailedForChainID } return nil, ErrNoProvidersAvailableForChainID } func (o *Manager) FetchCollectionSocialsAsync(contractID thirdparty.ContractID) error { go func() { defer o.checkConnectionStatus(contractID.ChainID) socials, err := o.getOrFetchSocialsForCollection(context.Background(), contractID) if err != nil || socials == nil { log.Debug("FetchCollectionSocialsAsync failed for", "chainID", contractID.ChainID, "address", contractID.Address, "err", err) return } socialsMessage := CollectionSocialsMessage{ ID: contractID, Socials: socials, } payload, err := json.Marshal(socialsMessage) if err != nil { log.Error("Error marshaling response: %v", err) return } event := walletevent.Event{ Type: EventGetCollectionSocialsDone, Message: string(payload), } o.feed.Send(event) }() return nil } func (o *Manager) getOrFetchSocialsForCollection(_ context.Context, contractID thirdparty.ContractID) (*thirdparty.CollectionSocials, error) { socials, err := o.collectionsDataDB.GetSocialsForID(contractID) if err != nil { log.Debug("getOrFetchSocialsForCollection failed for", "chainID", contractID.ChainID, "address", contractID.Address, "err", err) return nil, err } if socials == nil { return o.fetchSocialsForCollection(context.Background(), contractID) } return socials, nil } func (o *Manager) fetchSocialsForCollection(ctx context.Context, contractID thirdparty.ContractID) (*thirdparty.CollectionSocials, error) { cmd := circuitbreaker.NewCommand(ctx, nil) for _, provider := range o.providers.CollectibleDataProviders { if !provider.IsChainSupported(contractID.ChainID) { continue } provider := provider cmd.Add(circuitbreaker.NewFunctor(func() ([]interface{}, error) { socials, err := provider.FetchCollectionSocials(ctx, contractID) if err != nil { log.Error("FetchCollectionSocials failed for", "provider", provider.ID(), "chainID", contractID.ChainID, "err", err) } return []interface{}{socials}, err }, getCircuitName(provider, contractID.ChainID))) } if cmd.IsEmpty() { return nil, ErrNoProvidersAvailableForChainID // lets not stop the group if no providers are available for the chain } cmdRes := o.circuitBreaker.Execute(cmd) if cmdRes.Error() != nil { log.Error("fetchSocialsForCollection failed for", "chainID", contractID.ChainID, "err", cmdRes.Error()) return nil, cmdRes.Error() } socials := cmdRes.Result()[0].(*thirdparty.CollectionSocials) err := o.collectionsDataDB.SetCollectionSocialsData(contractID, socials) if err != nil { log.Error("Error saving socials to DB: %v", err) return nil, err } return socials, cmdRes.Error() } func (o *Manager) updateStatusNotifier() { o.statusNotifier = createStatusNotifier(o.statuses, o.feed) } func initStatuses(ownershipDB *OwnershipDB) *sync.Map { statuses := &sync.Map{} for _, chainID := range walletCommon.AllChainIDs() { status := connection.NewStatus() state := status.GetState() latestUpdateTimestamp, err := ownershipDB.GetLatestOwnershipUpdateTimestamp(chainID) if err == nil { state.LastSuccessAt = latestUpdateTimestamp status.SetState(state) } statuses.Store(chainID.String(), status) } return statuses } func createStatusNotifier(statuses *sync.Map, feed *event.Feed) *connection.StatusNotifier { return connection.NewStatusNotifier( statuses, EventCollectiblesConnectionStatusChanged, feed, ) } // Different providers have API keys per chain or per testnet/mainnet. // Proper implementation should respect that. For now, the safest solution is to use the provider ID and chain ID as the key. func getCircuitName(provider thirdparty.CollectibleProvider, chainID walletCommon.ChainID) string { return provider.ID() + chainID.String() } func getCircuitNameForTokenURI(mainCircuitName string) string { return mainCircuitName + "_tokenURI" } // As we don't use hystrix internal way of switching to another circuit, just its metrics, // we still can switch to another provider without tripping the circuit. func getClientWithNoCircuitTripping(backend chain.ClientInterface) chain.ClientInterface { copyable := backend.(chain.Copyable) if copyable != nil { backendCopy := copyable.Copy().(chain.ClientInterface) hm := backendCopy.(chain.HealthMonitor) if hm != nil { cb := circuitbreaker.NewCircuitBreaker(circuitbreaker.Config{ Timeout: 20000, MaxConcurrentRequests: 100, SleepWindow: 300000, ErrorPercentThreshold: 101, // Always healthy }) cb.SetOverrideCircuitNameHandler(func(circuitName string) string { return getCircuitNameForTokenURI(circuitName) }) hm.SetCircuitBreaker(cb) backend = backendCopy } } return backend }