feat(wallet): refactored collectibles manager using circuit breaker

Removed goerli from rarible and opt-goerli from alchemy clients as
not supported any more
This commit is contained in:
Ivan Belyakov 2024-03-11 12:09:50 +01:00 committed by IvanBelyakoff
parent 7d7fa07754
commit b0a0f078c4
3 changed files with 203 additions and 169 deletions

View File

@ -8,14 +8,14 @@ import (
"math/big"
"net/http"
"strings"
"sync"
"time"
"github.com/afex/hystrix-go/hystrix"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/circuitbreaker"
"github.com/status-im/status-go/contracts/community-tokens/collectibles"
"github.com/status-im/status-go/contracts/ierc1155"
"github.com/status-im/status-go/rpc"
@ -32,8 +32,6 @@ import (
const requestTimeout = 5 * time.Second
const signalUpdatedCollectiblesDataPageSize = 10
const hystrixContractOwnershipClientName = "contractOwnershipClient"
const EventCollectiblesConnectionStatusChanged walletevent.EventType = "wallet-collectible-status-changed"
// ERC721 does not support function "TokenURI" if call
@ -72,6 +70,7 @@ type Manager struct {
statuses map[string]*connection.Status
statusNotifier *connection.StatusNotifier
feed *event.Feed
circuitBreakers sync.Map
}
func NewManager(
@ -84,12 +83,6 @@ func NewManager(
collectionDataProviders []thirdparty.CollectionDataProvider,
mediaServer *server.MediaServer,
feed *event.Feed) *Manager {
hystrix.ConfigureCommand(hystrixContractOwnershipClientName, hystrix.CommandConfig{
Timeout: 10000,
MaxConcurrentRequests: 100,
SleepWindow: 300000,
ErrorPercentThreshold: 25,
})
ownershipDB := NewOwnershipDB(db)
@ -161,35 +154,6 @@ func mapToList[K comparable, T any](m map[K]T) []T {
return list
}
func makeContractOwnershipCall(main func() (any, error), fallback func() (any, error)) (any, error) {
resultChan := make(chan any, 1)
errChan := hystrix.Go(hystrixContractOwnershipClientName, func() error {
res, err := main()
if err != nil {
return err
}
resultChan <- res
return nil
}, func(err error) error {
if fallback == nil {
return err
}
res, err := fallback()
if err != nil {
return err
}
resultChan <- res
return nil
})
select {
case result := <-resultChan:
return result, nil
case err := <-errChan:
return nil, err
}
}
func (o *Manager) doContentTypeRequest(ctx context.Context, url string) (string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
if err != nil {
@ -254,68 +218,91 @@ func (o *Manager) FetchBalancesByOwnerAndContractAddress(ctx context.Context, ch
func (o *Manager) FetchAllAssetsByOwnerAndContractAddress(ctx context.Context, chainID walletCommon.ChainID, owner common.Address, contractAddresses []common.Address, cursor string, limit int, providerID string) (*thirdparty.FullCollectibleDataContainer, error) {
defer o.checkConnectionStatus(chainID)
anyProviderAvailable := false
cmd := circuitbreaker.Command{}
for _, provider := range o.accountOwnershipProviders {
if !provider.IsChainSupported(chainID) {
continue
}
anyProviderAvailable = true
if providerID != thirdparty.FetchFromAnyProvider && providerID != provider.ID() {
continue
}
provider := provider
f := circuitbreaker.NewFunctor(
func() ([]interface{}, error) {
assetContainer, err := provider.FetchAllAssetsByOwnerAndContractAddress(ctx, chainID, owner, contractAddresses, cursor, limit)
if err != nil {
log.Error("FetchAllAssetsByOwnerAndContractAddress failed for", "provider", provider.ID(), "chainID", chainID, "err", err)
continue
}
return []interface{}{assetContainer}, err
},
)
cmd.Add(f)
}
_, err = o.processFullCollectibleData(ctx, assetContainer.Items, true)
if cmd.IsEmpty() {
return nil, ErrNoProvidersAvailableForChainID
}
cmdRes := o.getCircuitBreaker(chainID).Execute(cmd)
if cmdRes.Error() != nil {
log.Error("FetchAllAssetsByOwnerAndContractAddress failed for", "chainID", chainID, "err", cmdRes.Error())
return nil, cmdRes.Error()
}
assetContainer := cmdRes.Result()[0].(*thirdparty.FullCollectibleDataContainer)
_, err := o.processFullCollectibleData(ctx, assetContainer.Items, true)
if err != nil {
return nil, err
}
return assetContainer, nil
}
if anyProviderAvailable {
return nil, ErrAllProvidersFailedForChainID
}
return nil, ErrNoProvidersAvailableForChainID
}
func (o *Manager) FetchAllAssetsByOwner(ctx context.Context, chainID walletCommon.ChainID, owner common.Address, cursor string, limit int, providerID string) (*thirdparty.FullCollectibleDataContainer, error) {
defer o.checkConnectionStatus(chainID)
anyProviderAvailable := false
cmd := circuitbreaker.Command{}
for _, provider := range o.accountOwnershipProviders {
if !provider.IsChainSupported(chainID) {
continue
}
anyProviderAvailable = true
if providerID != thirdparty.FetchFromAnyProvider && providerID != provider.ID() {
continue
}
provider := provider
f := circuitbreaker.NewFunctor(
func() ([]interface{}, error) {
assetContainer, err := provider.FetchAllAssetsByOwner(ctx, chainID, owner, cursor, limit)
if err != nil {
log.Error("FetchAllAssetsByOwner failed for", "provider", provider.ID(), "chainID", chainID, "err", err)
continue
}
return []interface{}{assetContainer}, err
},
)
cmd.Add(f)
}
_, err = o.processFullCollectibleData(ctx, assetContainer.Items, true)
if cmd.IsEmpty() {
return nil, ErrNoProvidersAvailableForChainID
}
cmdRes := o.getCircuitBreaker(chainID).Execute(cmd)
if cmdRes.Error() != nil {
log.Error("FetchAllAssetsByOwner failed for", "chainID", chainID, "err", cmdRes.Error())
return nil, cmdRes.Error()
}
assetContainer := cmdRes.Result()[0].(*thirdparty.FullCollectibleDataContainer)
_, err := o.processFullCollectibleData(ctx, assetContainer.Items, true)
if err != nil {
return nil, err
}
return assetContainer, nil
}
if anyProviderAvailable {
return nil, ErrAllProvidersFailedForChainID
}
return nil, ErrNoProvidersAvailableForChainID
}
func (o *Manager) FetchERC1155Balances(ctx context.Context, owner common.Address, chainID walletCommon.ChainID, contractAddress common.Address, tokenIDs []*bigint.BigInt) ([]*bigint.BigInt, error) {
if len(tokenIDs) == 0 {
return nil, nil
@ -420,50 +407,81 @@ func (o *Manager) FetchCollectibleOwnershipByOwner(ctx context.Context, chainID
// If asyncFetch is true, empty metadata will be returned for any missing collectibles and an EventCollectiblesDataUpdated will be sent when the data is ready.
// If asyncFetch is false, it will wait for all collectibles' metadata to be retrieved before returning.
func (o *Manager) FetchAssetsByCollectibleUniqueID(ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID, asyncFetch bool) ([]thirdparty.FullCollectibleData, error) {
missingIDs, err := o.collectiblesDataDB.GetIDsNotInDB(uniqueIDs)
err := o.FetchMissingAssetsByCollectibleUniqueID(ctx, uniqueIDs, asyncFetch)
if err != nil {
return nil, err
}
return o.getCacheFullCollectibleData(uniqueIDs)
}
func (o *Manager) FetchMissingAssetsByCollectibleUniqueID(ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID, asyncFetch bool) error {
missingIDs, err := o.collectiblesDataDB.GetIDsNotInDB(uniqueIDs)
if err != nil {
return err
}
missingIDsPerChainID := thirdparty.GroupCollectibleUIDsByChainID(missingIDs)
group := async.NewGroup(ctx)
group.Add(func(ctx context.Context) error {
// Atomic group stores the error from the first failed command and stops other commands on error
group := async.NewAtomicGroup(ctx)
for chainID, idsToFetch := range missingIDsPerChainID {
group.Add(func(ctx context.Context) error {
defer o.checkConnectionStatus(chainID)
fetchedAssets, err := o.fetchMissingAssetsForChainByCollectibleUniqueID(ctx, chainID, idsToFetch)
if err != nil {
log.Error("FetchMissingAssetsByCollectibleUniqueID failed for", "chainID", chainID, "ids", idsToFetch, "err", err)
return err
}
updatedCollectibles, err := o.processFullCollectibleData(ctx, fetchedAssets, asyncFetch)
if err != nil {
log.Error("processFullCollectibleData failed for", "chainID", chainID, "len(fetchedAssets)", len(fetchedAssets), "err", err)
return err
}
o.signalUpdatedCollectiblesData(updatedCollectibles)
return nil
})
}
if asyncFetch {
group.Wait()
return group.Error()
}
return nil
}
func (o *Manager) fetchMissingAssetsForChainByCollectibleUniqueID(ctx context.Context, chainID walletCommon.ChainID, idsToFetch []thirdparty.CollectibleUniqueID) ([]thirdparty.FullCollectibleData, error) {
cmd := circuitbreaker.Command{}
for _, provider := range o.collectibleDataProviders {
if !provider.IsChainSupported(chainID) {
continue
}
provider := provider
cmd.Add(circuitbreaker.NewFunctor(func() ([]any, error) {
fetchedAssets, err := provider.FetchAssetsByCollectibleUniqueID(ctx, idsToFetch)
if err != nil {
log.Error("FetchAssetsByCollectibleUniqueID failed for", "provider", provider.ID(), "chainID", chainID, "err", err)
continue
log.Error("fetchMissingAssetsForChainByCollectibleUniqueID failed for", "provider", provider.ID(), "chainID", chainID, "err", err)
}
updatedCollectibles, err := o.processFullCollectibleData(ctx, fetchedAssets, asyncFetch)
if err != nil {
log.Error("processFullCollectibleData failed for", "provider", provider.ID(), "chainID", chainID, "len(fetchedAssets)", len(fetchedAssets), "err", err)
return err
return []any{fetchedAssets}, err
}))
}
if asyncFetch {
o.signalUpdatedCollectiblesData(updatedCollectibles)
}
break
}
if cmd.IsEmpty() {
return nil, ErrNoProvidersAvailableForChainID // lets not stop the group if no providers are available for the chain
}
return nil
})
if !asyncFetch {
group.Wait()
cmdRes := o.getCircuitBreaker(chainID).Execute(cmd)
if cmdRes.Error() != nil {
log.Error("fetchMissingAssetsForChainByCollectibleUniqueID failed for", "chainID", chainID, "err", cmdRes.Error())
return nil, cmdRes.Error()
}
return o.getCacheFullCollectibleData(uniqueIDs)
return cmdRes.Result()[0].([]thirdparty.FullCollectibleData), cmdRes.Error()
}
func (o *Manager) FetchCollectionsDataByContractID(ctx context.Context, ids []thirdparty.ContractID) ([]thirdparty.CollectionData, error) {
@ -474,27 +492,49 @@ func (o *Manager) FetchCollectionsDataByContractID(ctx context.Context, ids []th
missingIDsPerChainID := thirdparty.GroupContractIDsByChainID(missingIDs)
// Atomic group stores the error from the first failed command and stops other commands on error
group := async.NewAtomicGroup(ctx)
for chainID, idsToFetch := range missingIDsPerChainID {
group.Add(func(ctx context.Context) error {
defer o.checkConnectionStatus(chainID)
cmd := circuitbreaker.Command{}
for _, provider := range o.collectionDataProviders {
if !provider.IsChainSupported(chainID) {
continue
}
provider := provider
cmd.Add(circuitbreaker.NewFunctor(func() ([]any, error) {
fetchedCollections, err := provider.FetchCollectionsDataByContractID(ctx, idsToFetch)
if err != nil {
log.Error("FetchCollectionsDataByContractID failed for", "provider", provider.ID(), "chainID", chainID, "err", err)
continue
return []any{fetchedCollections}, err
}))
}
if cmd.IsEmpty() {
return nil
}
cmdRes := o.getCircuitBreaker(chainID).Execute(cmd)
if cmdRes.Error() != nil {
log.Error("FetchCollectionsDataByContractID failed for", "chainID", chainID, "err", cmdRes.Error())
return cmdRes.Error()
}
fetchedCollections := cmdRes.Result()[0].([]thirdparty.CollectionData)
err = o.processCollectionData(ctx, fetchedCollections)
if err != nil {
return nil, err
return err
}
break
return err
})
}
group.Wait()
if group.Error() != nil {
return nil, group.Error()
}
data, err := o.collectionsDataDB.GetData(ids)
@ -509,55 +549,35 @@ func (o *Manager) GetCollectibleOwnership(id thirdparty.CollectibleUniqueID) ([]
return o.ownershipDB.GetOwnership(id)
}
func (o *Manager) getContractOwnershipProviders(chainID walletCommon.ChainID) (mainProvider thirdparty.CollectibleContractOwnershipProvider, fallbackProvider thirdparty.CollectibleContractOwnershipProvider) {
mainProvider = nil
fallbackProvider = nil
func (o *Manager) FetchCollectibleOwnersByContractAddress(ctx context.Context, chainID walletCommon.ChainID, contractAddress common.Address) (*thirdparty.CollectibleContractOwnership, error) {
defer o.checkConnectionStatus(chainID)
cmd := circuitbreaker.Command{}
for _, provider := range o.contractOwnershipProviders {
if provider.IsChainSupported(chainID) {
if mainProvider == nil {
// First provider found
mainProvider = provider
if !provider.IsChainSupported(chainID) {
continue
}
// Second provider found
fallbackProvider = provider
break
}
}
return
}
func getCollectibleOwnersByContractAddressFunc(ctx context.Context, chainID walletCommon.ChainID, contractAddress common.Address, provider thirdparty.CollectibleContractOwnershipProvider) func() (any, error) {
if provider == nil {
return nil
}
return func() (any, error) {
provider := provider
cmd.Add(circuitbreaker.NewFunctor(func() ([]any, error) {
res, err := provider.FetchCollectibleOwnersByContractAddress(ctx, chainID, contractAddress)
if err != nil {
log.Error("FetchCollectibleOwnersByContractAddress failed for", "provider", provider.ID(), "chainID", chainID, "err", err)
}
return res, err
return []any{res}, err
}))
}
}
func (o *Manager) FetchCollectibleOwnersByContractAddress(ctx context.Context, chainID walletCommon.ChainID, contractAddress common.Address) (*thirdparty.CollectibleContractOwnership, error) {
defer o.checkConnectionStatus(chainID)
mainProvider, fallbackProvider := o.getContractOwnershipProviders(chainID)
if mainProvider == nil {
if cmd.IsEmpty() {
return nil, ErrNoProvidersAvailableForChainID
}
mainFn := getCollectibleOwnersByContractAddressFunc(ctx, chainID, contractAddress, mainProvider)
fallbackFn := getCollectibleOwnersByContractAddressFunc(ctx, chainID, contractAddress, fallbackProvider)
owners, err := makeContractOwnershipCall(mainFn, fallbackFn)
if err != nil {
return nil, err
cmdRes := o.getCircuitBreaker(chainID).Execute(cmd)
if cmdRes.Error() != nil {
log.Error("FetchCollectibleOwnersByContractAddress failed for", "chainID", chainID, "err", cmdRes.Error())
return nil, cmdRes.Error()
}
return owners.(*thirdparty.CollectibleContractOwnership), nil
return cmdRes.Result()[0].(*thirdparty.CollectibleContractOwnership), cmdRes.Error()
}
func (o *Manager) fetchTokenURI(ctx context.Context, id thirdparty.CollectibleUniqueID) (string, error) {
@ -960,3 +980,19 @@ func (o *Manager) signalUpdatedCollectiblesData(ids []thirdparty.CollectibleUniq
o.feed.Send(event)
}
}
func (o *Manager) getCircuitBreaker(chainID walletCommon.ChainID) *circuitbreaker.CircuitBreaker {
cb, ok := o.circuitBreakers.Load(chainID.String())
if !ok {
cb = circuitbreaker.NewCircuitBreaker(circuitbreaker.Config{
CommandName: chainID.String(),
Timeout: 10000,
MaxConcurrentRequests: 100,
RequestVolumeThreshold: 25,
SleepWindow: 300000,
ErrorPercentThreshold: 25,
})
o.circuitBreakers.Store(chainID.String(), cb)
}
return cb.(*circuitbreaker.CircuitBreaker)
}

View File

@ -33,8 +33,6 @@ func getBaseURL(chainID walletCommon.ChainID) (string, error) {
return "https://eth-sepolia.g.alchemy.com", nil
case walletCommon.OptimismMainnet:
return "https://opt-mainnet.g.alchemy.com", nil
case walletCommon.OptimismGoerli:
return "https://opt-goerli.g.alchemy.com", nil
case walletCommon.OptimismSepolia:
return "https://opt-sepolia.g.alchemy.com", nil
case walletCommon.ArbitrumMainnet:

View File

@ -42,7 +42,7 @@ func getBaseURL(chainID walletCommon.ChainID) (string, error) {
switch uint64(chainID) {
case walletCommon.EthereumMainnet, walletCommon.ArbitrumMainnet:
return "https://api.rarible.org", nil
case walletCommon.EthereumGoerli, walletCommon.ArbitrumSepolia:
case walletCommon.ArbitrumSepolia:
return "https://testnet-api.rarible.org", nil
}