From b0a0f078c43b38e555d2683ef56b58e412928e13 Mon Sep 17 00:00:00 2001 From: Ivan Belyakov Date: Mon, 11 Mar 2024 12:09:50 +0100 Subject: [PATCH] feat(wallet): refactored collectibles manager using circuit breaker Removed goerli from rarible and opt-goerli from alchemy clients as not supported any more --- services/wallet/collectibles/manager.go | 368 ++++++++++--------- services/wallet/thirdparty/alchemy/client.go | 2 - services/wallet/thirdparty/rarible/client.go | 2 +- 3 files changed, 203 insertions(+), 169 deletions(-) diff --git a/services/wallet/collectibles/manager.go b/services/wallet/collectibles/manager.go index d91423f4c..e35bd8de6 100644 --- a/services/wallet/collectibles/manager.go +++ b/services/wallet/collectibles/manager.go @@ -8,14 +8,14 @@ import ( "math/big" "net/http" "strings" + "sync" "time" - "github.com/afex/hystrix-go/hystrix" - "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/status-im/status-go/circuitbreaker" "github.com/status-im/status-go/contracts/community-tokens/collectibles" "github.com/status-im/status-go/contracts/ierc1155" "github.com/status-im/status-go/rpc" @@ -32,8 +32,6 @@ import ( const requestTimeout = 5 * time.Second const signalUpdatedCollectiblesDataPageSize = 10 -const hystrixContractOwnershipClientName = "contractOwnershipClient" - const EventCollectiblesConnectionStatusChanged walletevent.EventType = "wallet-collectible-status-changed" // ERC721 does not support function "TokenURI" if call @@ -69,9 +67,10 @@ type Manager struct { mediaServer *server.MediaServer - statuses map[string]*connection.Status - statusNotifier *connection.StatusNotifier - feed *event.Feed + statuses map[string]*connection.Status + statusNotifier *connection.StatusNotifier + feed *event.Feed + circuitBreakers sync.Map } func NewManager( @@ -84,12 +83,6 @@ func NewManager( collectionDataProviders []thirdparty.CollectionDataProvider, mediaServer *server.MediaServer, feed *event.Feed) *Manager { - hystrix.ConfigureCommand(hystrixContractOwnershipClientName, hystrix.CommandConfig{ - Timeout: 10000, - MaxConcurrentRequests: 100, - SleepWindow: 300000, - ErrorPercentThreshold: 25, - }) ownershipDB := NewOwnershipDB(db) @@ -161,35 +154,6 @@ func mapToList[K comparable, T any](m map[K]T) []T { return list } -func makeContractOwnershipCall(main func() (any, error), fallback func() (any, error)) (any, error) { - resultChan := make(chan any, 1) - errChan := hystrix.Go(hystrixContractOwnershipClientName, func() error { - res, err := main() - if err != nil { - return err - } - resultChan <- res - return nil - }, func(err error) error { - if fallback == nil { - return err - } - - res, err := fallback() - if err != nil { - return err - } - resultChan <- res - return nil - }) - select { - case result := <-resultChan: - return result, nil - case err := <-errChan: - return nil, err - } -} - func (o *Manager) doContentTypeRequest(ctx context.Context, url string) (string, error) { req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil) if err != nil { @@ -254,68 +218,91 @@ func (o *Manager) FetchBalancesByOwnerAndContractAddress(ctx context.Context, ch func (o *Manager) FetchAllAssetsByOwnerAndContractAddress(ctx context.Context, chainID walletCommon.ChainID, owner common.Address, contractAddresses []common.Address, cursor string, limit int, providerID string) (*thirdparty.FullCollectibleDataContainer, error) { defer o.checkConnectionStatus(chainID) - anyProviderAvailable := false + cmd := circuitbreaker.Command{} for _, provider := range o.accountOwnershipProviders { if !provider.IsChainSupported(chainID) { continue } - anyProviderAvailable = true if providerID != thirdparty.FetchFromAnyProvider && providerID != provider.ID() { continue } - assetContainer, err := provider.FetchAllAssetsByOwnerAndContractAddress(ctx, chainID, owner, contractAddresses, cursor, limit) - if err != nil { - log.Error("FetchAllAssetsByOwnerAndContractAddress failed for", "provider", provider.ID(), "chainID", chainID, "err", err) - continue - } - - _, err = o.processFullCollectibleData(ctx, assetContainer.Items, true) - if err != nil { - return nil, err - } - - return assetContainer, nil + provider := provider + f := circuitbreaker.NewFunctor( + func() ([]interface{}, error) { + assetContainer, err := provider.FetchAllAssetsByOwnerAndContractAddress(ctx, chainID, owner, contractAddresses, cursor, limit) + if err != nil { + log.Error("FetchAllAssetsByOwnerAndContractAddress failed for", "provider", provider.ID(), "chainID", chainID, "err", err) + } + return []interface{}{assetContainer}, err + }, + ) + cmd.Add(f) } - if anyProviderAvailable { - return nil, ErrAllProvidersFailedForChainID + if cmd.IsEmpty() { + return nil, ErrNoProvidersAvailableForChainID } - return nil, ErrNoProvidersAvailableForChainID + + cmdRes := o.getCircuitBreaker(chainID).Execute(cmd) + if cmdRes.Error() != nil { + log.Error("FetchAllAssetsByOwnerAndContractAddress failed for", "chainID", chainID, "err", cmdRes.Error()) + return nil, cmdRes.Error() + } + + assetContainer := cmdRes.Result()[0].(*thirdparty.FullCollectibleDataContainer) + _, err := o.processFullCollectibleData(ctx, assetContainer.Items, true) + if err != nil { + return nil, err + } + + return assetContainer, nil } func (o *Manager) FetchAllAssetsByOwner(ctx context.Context, chainID walletCommon.ChainID, owner common.Address, cursor string, limit int, providerID string) (*thirdparty.FullCollectibleDataContainer, error) { defer o.checkConnectionStatus(chainID) - anyProviderAvailable := false + cmd := circuitbreaker.Command{} for _, provider := range o.accountOwnershipProviders { if !provider.IsChainSupported(chainID) { continue } - anyProviderAvailable = true if providerID != thirdparty.FetchFromAnyProvider && providerID != provider.ID() { continue } - assetContainer, err := provider.FetchAllAssetsByOwner(ctx, chainID, owner, cursor, limit) - if err != nil { - log.Error("FetchAllAssetsByOwner failed for", "provider", provider.ID(), "chainID", chainID, "err", err) - continue - } - - _, err = o.processFullCollectibleData(ctx, assetContainer.Items, true) - if err != nil { - return nil, err - } - - return assetContainer, nil + provider := provider + f := circuitbreaker.NewFunctor( + func() ([]interface{}, error) { + assetContainer, err := provider.FetchAllAssetsByOwner(ctx, chainID, owner, cursor, limit) + if err != nil { + log.Error("FetchAllAssetsByOwner failed for", "provider", provider.ID(), "chainID", chainID, "err", err) + } + return []interface{}{assetContainer}, err + }, + ) + cmd.Add(f) } - if anyProviderAvailable { - return nil, ErrAllProvidersFailedForChainID + if cmd.IsEmpty() { + return nil, ErrNoProvidersAvailableForChainID } - return nil, ErrNoProvidersAvailableForChainID + + cmdRes := o.getCircuitBreaker(chainID).Execute(cmd) + if cmdRes.Error() != nil { + log.Error("FetchAllAssetsByOwner failed for", "chainID", chainID, "err", cmdRes.Error()) + return nil, cmdRes.Error() + } + + assetContainer := cmdRes.Result()[0].(*thirdparty.FullCollectibleDataContainer) + _, err := o.processFullCollectibleData(ctx, assetContainer.Items, true) + if err != nil { + return nil, err + } + + return assetContainer, nil } + func (o *Manager) FetchERC1155Balances(ctx context.Context, owner common.Address, chainID walletCommon.ChainID, contractAddress common.Address, tokenIDs []*bigint.BigInt) ([]*bigint.BigInt, error) { if len(tokenIDs) == 0 { return nil, nil @@ -420,50 +407,81 @@ func (o *Manager) FetchCollectibleOwnershipByOwner(ctx context.Context, chainID // If asyncFetch is true, empty metadata will be returned for any missing collectibles and an EventCollectiblesDataUpdated will be sent when the data is ready. // If asyncFetch is false, it will wait for all collectibles' metadata to be retrieved before returning. func (o *Manager) FetchAssetsByCollectibleUniqueID(ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID, asyncFetch bool) ([]thirdparty.FullCollectibleData, error) { - missingIDs, err := o.collectiblesDataDB.GetIDsNotInDB(uniqueIDs) + err := o.FetchMissingAssetsByCollectibleUniqueID(ctx, uniqueIDs, asyncFetch) if err != nil { return nil, err } - missingIDsPerChainID := thirdparty.GroupCollectibleUIDsByChainID(missingIDs) + return o.getCacheFullCollectibleData(uniqueIDs) +} - group := async.NewGroup(ctx) - group.Add(func(ctx context.Context) error { - for chainID, idsToFetch := range missingIDsPerChainID { - defer o.checkConnectionStatus(chainID) - - for _, provider := range o.collectibleDataProviders { - if !provider.IsChainSupported(chainID) { - continue - } - - fetchedAssets, err := provider.FetchAssetsByCollectibleUniqueID(ctx, idsToFetch) - if err != nil { - log.Error("FetchAssetsByCollectibleUniqueID failed for", "provider", provider.ID(), "chainID", chainID, "err", err) - continue - } - - updatedCollectibles, err := o.processFullCollectibleData(ctx, fetchedAssets, asyncFetch) - if err != nil { - log.Error("processFullCollectibleData failed for", "provider", provider.ID(), "chainID", chainID, "len(fetchedAssets)", len(fetchedAssets), "err", err) - return err - } - - if asyncFetch { - o.signalUpdatedCollectiblesData(updatedCollectibles) - } - break - } - } - - return nil - }) - - if !asyncFetch { - group.Wait() +func (o *Manager) FetchMissingAssetsByCollectibleUniqueID(ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID, asyncFetch bool) error { + missingIDs, err := o.collectiblesDataDB.GetIDsNotInDB(uniqueIDs) + if err != nil { + return err } - return o.getCacheFullCollectibleData(uniqueIDs) + missingIDsPerChainID := thirdparty.GroupCollectibleUIDsByChainID(missingIDs) + + // Atomic group stores the error from the first failed command and stops other commands on error + group := async.NewAtomicGroup(ctx) + for chainID, idsToFetch := range missingIDsPerChainID { + group.Add(func(ctx context.Context) error { + defer o.checkConnectionStatus(chainID) + + fetchedAssets, err := o.fetchMissingAssetsForChainByCollectibleUniqueID(ctx, chainID, idsToFetch) + if err != nil { + log.Error("FetchMissingAssetsByCollectibleUniqueID failed for", "chainID", chainID, "ids", idsToFetch, "err", err) + return err + } + + updatedCollectibles, err := o.processFullCollectibleData(ctx, fetchedAssets, asyncFetch) + if err != nil { + log.Error("processFullCollectibleData failed for", "chainID", chainID, "len(fetchedAssets)", len(fetchedAssets), "err", err) + return err + } + + o.signalUpdatedCollectiblesData(updatedCollectibles) + return nil + }) + } + + if asyncFetch { + group.Wait() + return group.Error() + } + + return nil +} + +func (o *Manager) fetchMissingAssetsForChainByCollectibleUniqueID(ctx context.Context, chainID walletCommon.ChainID, idsToFetch []thirdparty.CollectibleUniqueID) ([]thirdparty.FullCollectibleData, error) { + cmd := circuitbreaker.Command{} + for _, provider := range o.collectibleDataProviders { + if !provider.IsChainSupported(chainID) { + continue + } + + provider := provider + cmd.Add(circuitbreaker.NewFunctor(func() ([]any, error) { + fetchedAssets, err := provider.FetchAssetsByCollectibleUniqueID(ctx, idsToFetch) + if err != nil { + log.Error("fetchMissingAssetsForChainByCollectibleUniqueID failed for", "provider", provider.ID(), "chainID", chainID, "err", err) + } + + return []any{fetchedAssets}, err + })) + } + + if cmd.IsEmpty() { + return nil, ErrNoProvidersAvailableForChainID // lets not stop the group if no providers are available for the chain + } + + cmdRes := o.getCircuitBreaker(chainID).Execute(cmd) + if cmdRes.Error() != nil { + log.Error("fetchMissingAssetsForChainByCollectibleUniqueID failed for", "chainID", chainID, "err", cmdRes.Error()) + return nil, cmdRes.Error() + } + return cmdRes.Result()[0].([]thirdparty.FullCollectibleData), cmdRes.Error() } func (o *Manager) FetchCollectionsDataByContractID(ctx context.Context, ids []thirdparty.ContractID) ([]thirdparty.CollectionData, error) { @@ -474,27 +492,49 @@ func (o *Manager) FetchCollectionsDataByContractID(ctx context.Context, ids []th missingIDsPerChainID := thirdparty.GroupContractIDsByChainID(missingIDs) + // Atomic group stores the error from the first failed command and stops other commands on error + group := async.NewAtomicGroup(ctx) for chainID, idsToFetch := range missingIDsPerChainID { - defer o.checkConnectionStatus(chainID) + group.Add(func(ctx context.Context) error { + defer o.checkConnectionStatus(chainID) - for _, provider := range o.collectionDataProviders { - if !provider.IsChainSupported(chainID) { - continue + cmd := circuitbreaker.Command{} + for _, provider := range o.collectionDataProviders { + if !provider.IsChainSupported(chainID) { + continue + } + + provider := provider + cmd.Add(circuitbreaker.NewFunctor(func() ([]any, error) { + fetchedCollections, err := provider.FetchCollectionsDataByContractID(ctx, idsToFetch) + return []any{fetchedCollections}, err + })) } - fetchedCollections, err := provider.FetchCollectionsDataByContractID(ctx, idsToFetch) - if err != nil { - log.Error("FetchCollectionsDataByContractID failed for", "provider", provider.ID(), "chainID", chainID, "err", err) - continue + if cmd.IsEmpty() { + return nil } + cmdRes := o.getCircuitBreaker(chainID).Execute(cmd) + if cmdRes.Error() != nil { + log.Error("FetchCollectionsDataByContractID failed for", "chainID", chainID, "err", cmdRes.Error()) + return cmdRes.Error() + } + + fetchedCollections := cmdRes.Result()[0].([]thirdparty.CollectionData) err = o.processCollectionData(ctx, fetchedCollections) if err != nil { - return nil, err + return err } - break - } + return err + }) + } + + group.Wait() + + if group.Error() != nil { + return nil, group.Error() } data, err := o.collectionsDataDB.GetData(ids) @@ -509,55 +549,35 @@ func (o *Manager) GetCollectibleOwnership(id thirdparty.CollectibleUniqueID) ([] return o.ownershipDB.GetOwnership(id) } -func (o *Manager) getContractOwnershipProviders(chainID walletCommon.ChainID) (mainProvider thirdparty.CollectibleContractOwnershipProvider, fallbackProvider thirdparty.CollectibleContractOwnershipProvider) { - mainProvider = nil - fallbackProvider = nil - - for _, provider := range o.contractOwnershipProviders { - if provider.IsChainSupported(chainID) { - if mainProvider == nil { - // First provider found - mainProvider = provider - continue - } - // Second provider found - fallbackProvider = provider - break - } - } - return -} - -func getCollectibleOwnersByContractAddressFunc(ctx context.Context, chainID walletCommon.ChainID, contractAddress common.Address, provider thirdparty.CollectibleContractOwnershipProvider) func() (any, error) { - if provider == nil { - return nil - } - return func() (any, error) { - res, err := provider.FetchCollectibleOwnersByContractAddress(ctx, chainID, contractAddress) - if err != nil { - log.Error("FetchCollectibleOwnersByContractAddress failed for", "provider", provider.ID(), "chainID", chainID, "err", err) - } - return res, err - } -} - func (o *Manager) FetchCollectibleOwnersByContractAddress(ctx context.Context, chainID walletCommon.ChainID, contractAddress common.Address) (*thirdparty.CollectibleContractOwnership, error) { defer o.checkConnectionStatus(chainID) - mainProvider, fallbackProvider := o.getContractOwnershipProviders(chainID) - if mainProvider == nil { + cmd := circuitbreaker.Command{} + for _, provider := range o.contractOwnershipProviders { + if !provider.IsChainSupported(chainID) { + continue + } + + provider := provider + cmd.Add(circuitbreaker.NewFunctor(func() ([]any, error) { + res, err := provider.FetchCollectibleOwnersByContractAddress(ctx, chainID, contractAddress) + if err != nil { + log.Error("FetchCollectibleOwnersByContractAddress failed for", "provider", provider.ID(), "chainID", chainID, "err", err) + } + return []any{res}, err + })) + } + + if cmd.IsEmpty() { return nil, ErrNoProvidersAvailableForChainID } - mainFn := getCollectibleOwnersByContractAddressFunc(ctx, chainID, contractAddress, mainProvider) - fallbackFn := getCollectibleOwnersByContractAddressFunc(ctx, chainID, contractAddress, fallbackProvider) - - owners, err := makeContractOwnershipCall(mainFn, fallbackFn) - if err != nil { - return nil, err + cmdRes := o.getCircuitBreaker(chainID).Execute(cmd) + if cmdRes.Error() != nil { + log.Error("FetchCollectibleOwnersByContractAddress failed for", "chainID", chainID, "err", cmdRes.Error()) + return nil, cmdRes.Error() } - - return owners.(*thirdparty.CollectibleContractOwnership), nil + return cmdRes.Result()[0].(*thirdparty.CollectibleContractOwnership), cmdRes.Error() } func (o *Manager) fetchTokenURI(ctx context.Context, id thirdparty.CollectibleUniqueID) (string, error) { @@ -960,3 +980,19 @@ func (o *Manager) signalUpdatedCollectiblesData(ids []thirdparty.CollectibleUniq o.feed.Send(event) } } + +func (o *Manager) getCircuitBreaker(chainID walletCommon.ChainID) *circuitbreaker.CircuitBreaker { + cb, ok := o.circuitBreakers.Load(chainID.String()) + if !ok { + cb = circuitbreaker.NewCircuitBreaker(circuitbreaker.Config{ + CommandName: chainID.String(), + Timeout: 10000, + MaxConcurrentRequests: 100, + RequestVolumeThreshold: 25, + SleepWindow: 300000, + ErrorPercentThreshold: 25, + }) + o.circuitBreakers.Store(chainID.String(), cb) + } + return cb.(*circuitbreaker.CircuitBreaker) +} diff --git a/services/wallet/thirdparty/alchemy/client.go b/services/wallet/thirdparty/alchemy/client.go index d52161889..871f57a59 100644 --- a/services/wallet/thirdparty/alchemy/client.go +++ b/services/wallet/thirdparty/alchemy/client.go @@ -33,8 +33,6 @@ func getBaseURL(chainID walletCommon.ChainID) (string, error) { return "https://eth-sepolia.g.alchemy.com", nil case walletCommon.OptimismMainnet: return "https://opt-mainnet.g.alchemy.com", nil - case walletCommon.OptimismGoerli: - return "https://opt-goerli.g.alchemy.com", nil case walletCommon.OptimismSepolia: return "https://opt-sepolia.g.alchemy.com", nil case walletCommon.ArbitrumMainnet: diff --git a/services/wallet/thirdparty/rarible/client.go b/services/wallet/thirdparty/rarible/client.go index c0d4b8073..d063aaceb 100644 --- a/services/wallet/thirdparty/rarible/client.go +++ b/services/wallet/thirdparty/rarible/client.go @@ -42,7 +42,7 @@ func getBaseURL(chainID walletCommon.ChainID) (string, error) { switch uint64(chainID) { case walletCommon.EthereumMainnet, walletCommon.ArbitrumMainnet: return "https://api.rarible.org", nil - case walletCommon.EthereumGoerli, walletCommon.ArbitrumSepolia: + case walletCommon.ArbitrumSepolia: return "https://testnet-api.rarible.org", nil }