From 5ba5611a8d206cda841e76a4852d1840e56696a4 Mon Sep 17 00:00:00 2001 From: Dario Gabriel Lipicar Date: Tue, 18 Jul 2023 12:02:56 -0300 Subject: [PATCH] feat: implement collectibles service --- services/wallet/api.go | 15 ++ services/wallet/collectibles/collectibles.go | 80 ++++++++ services/wallet/collectibles/commands.go | 101 ++++++++++ services/wallet/collectibles/service.go | 193 +++++++++++++++++++ services/wallet/service.go | 5 + 5 files changed, 394 insertions(+) create mode 100644 services/wallet/collectibles/commands.go create mode 100644 services/wallet/collectibles/service.go diff --git a/services/wallet/api.go b/services/wallet/api.go index c9485fe0a..ac13a5038 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -305,9 +305,24 @@ func (api *API) GetCryptoOnRamps(ctx context.Context) ([]CryptoOnRamp, error) { func (api *API) FetchBalancesByOwnerAndContractAddress(chainID wcommon.ChainID, ownerAddress common.Address, contractAddresses []common.Address) (thirdparty.TokenBalancesPerContractAddress, error) { log.Debug("call to FetchBalancesByOwnerAndContractAddress") + return api.s.collectiblesManager.FetchBalancesByOwnerAndContractAddress(chainID, ownerAddress, contractAddresses) } +func (api *API) FilterOwnedCollectiblesAsync(ctx context.Context, chainIDs []wcommon.ChainID, addresses []common.Address, offset int, limit int) error { + log.Debug("wallet.api.FilterOwnedCollectiblesAsync", "chainIDs.count", len(chainIDs), "addr.count", len(addresses), "offset", offset, "limit", limit) + + api.s.collectibles.FilterOwnedCollectiblesAsync(ctx, chainIDs, addresses, offset, limit) + return nil +} + +func (api *API) GetCollectiblesDataAsync(ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID) error { + log.Debug("wallet.api.GetCollectiblesDetailsAsync") + + api.s.collectibles.GetCollectiblesDataAsync(ctx, uniqueIDs) + return nil +} + // Old Collectibles API - To be deprecated func (api *API) GetOpenseaCollectionsByOwner(ctx context.Context, chainID wcommon.ChainID, owner common.Address) ([]opensea.OwnedCollection, error) { log.Debug("call to GetOpenseaCollectionsByOwner") diff --git a/services/wallet/collectibles/collectibles.go b/services/wallet/collectibles/collectibles.go index 98655787a..b50fe47a4 100644 --- a/services/wallet/collectibles/collectibles.go +++ b/services/wallet/collectibles/collectibles.go @@ -39,6 +39,8 @@ type Manager struct { opensea *opensea.Client nftCache map[walletCommon.ChainID]map[string]thirdparty.CollectibleData nftCacheLock sync.RWMutex + ownershipCache map[walletCommon.ChainID]map[common.Address][]thirdparty.CollectibleUniqueID + ownershipCacheLock sync.RWMutex } func NewManager(rpcClient *rpc.Client, mainContractOwnershipProvider thirdparty.CollectibleContractOwnershipProvider, fallbackContractOwnershipProvider thirdparty.CollectibleContractOwnershipProvider, opensea *opensea.Client) *Manager { @@ -54,6 +56,8 @@ func NewManager(rpcClient *rpc.Client, mainContractOwnershipProvider thirdparty. mainContractOwnershipProvider: mainContractOwnershipProvider, fallbackContractOwnershipProvider: fallbackContractOwnershipProvider, opensea: opensea, + nftCache: make(map[walletCommon.ChainID]map[string]thirdparty.CollectibleData), + ownershipCache: make(map[walletCommon.ChainID]map[common.Address][]thirdparty.CollectibleUniqueID), } } @@ -183,6 +187,59 @@ func (o *Manager) FetchAllAssetsByOwner(chainID walletCommon.ChainID, owner comm return assetContainer, nil } +func (o *Manager) UpdateOwnedCollectibles(chainID walletCommon.ChainID, owner common.Address) error { + assetContainer, err := o.FetchAllAssetsByOwner(chainID, owner, "", 0) + if err != nil { + return err + } + + err = o.processOwnedAssets(chainID, owner, assetContainer.Collectibles) + if err != nil { + return err + } + + err = o.processAssets(assetContainer.Collectibles) + if err != nil { + return err + } + + return nil +} + +func (o *Manager) GetOwnedCollectibles(chainIDs []walletCommon.ChainID, owners []common.Address, offset int, limit int) ([]thirdparty.CollectibleUniqueID, bool, error) { + o.ownershipCacheLock.RLock() + defer o.ownershipCacheLock.RUnlock() + + ids := make([]thirdparty.CollectibleUniqueID, 0) + + for _, chainID := range chainIDs { + if _, ok := o.ownershipCache[chainID]; !ok { + continue + } + for _, owner := range owners { + ids = append(ids, o.ownershipCache[chainID][owner]...) + } + } + + // For compatibility with SQL OFFSET, skip first 'offset' elems + lowIdx := offset + 1 + + if len(ids) <= lowIdx { + return nil, false, nil + } + + highIdx := offset + limit + if len(ids) < highIdx { + highIdx = len(ids) + } + + hasMore := len(ids) > highIdx + + ret := ids[lowIdx:highIdx] + + return ret, hasMore, nil +} + func (o *Manager) FetchAssetsByCollectibleUniqueID(uniqueIDs []thirdparty.CollectibleUniqueID) ([]thirdparty.CollectibleData, error) { idsToFetch := o.getIDsNotInCollectiblesDataCache(uniqueIDs) if len(idsToFetch) > 0 { @@ -257,6 +314,29 @@ func (o *Manager) fetchTokenURI(id thirdparty.CollectibleUniqueID) (string, erro return tokenURI, err } +func (o *Manager) processOwnedAssets(chainID walletCommon.ChainID, address common.Address, assets []thirdparty.CollectibleData) error { + ownership := make([]thirdparty.CollectibleUniqueID, 0, len(assets)) + for _, asset := range assets { + ownership = append(ownership, asset.ID) + } + + o.setCacheOwnedCollectibles(chainID, address, ownership) + + return nil +} + +func (o *Manager) setCacheOwnedCollectibles(chainID walletCommon.ChainID, address common.Address, ownership []thirdparty.CollectibleUniqueID) { + o.ownershipCacheLock.Lock() + defer o.ownershipCacheLock.Unlock() + + if _, ok := o.ownershipCache[chainID]; !ok { + o.ownershipCache[chainID] = make(map[common.Address][]thirdparty.CollectibleUniqueID) + } + + // Ownership data should be fully replaced with newest list + o.ownershipCache[chainID][address] = ownership +} + func (o *Manager) processAssets(assets []thirdparty.CollectibleData) error { for idx, asset := range assets { id := asset.ID diff --git a/services/wallet/collectibles/commands.go b/services/wallet/collectibles/commands.go new file mode 100644 index 000000000..bce9180ff --- /dev/null +++ b/services/wallet/collectibles/commands.go @@ -0,0 +1,101 @@ +package collectibles + +import ( + "context" + "database/sql" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + statustypes "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/multiaccounts/accounts" + "github.com/status-im/status-go/rpc/network" + "github.com/status-im/status-go/services/wallet/async" + walletCommon "github.com/status-im/status-go/services/wallet/common" + "github.com/status-im/status-go/services/wallet/walletevent" +) + +const ( + accountOwnershipUpdateInterval = 30 * time.Minute +) + +type refreshOwnedCollectiblesCommand struct { + manager *Manager + db *sql.DB + eventFeed *event.Feed + networkManager *network.Manager +} + +func newRefreshOwnedCollectiblesCommand(manager *Manager, db *sql.DB, eventFeed *event.Feed, networkManager *network.Manager) *refreshOwnedCollectiblesCommand { + return &refreshOwnedCollectiblesCommand{ + manager: manager, + db: db, + eventFeed: eventFeed, + networkManager: networkManager, + } +} + +func (c *refreshOwnedCollectiblesCommand) Command() async.Command { + return async.InfiniteCommand{ + Interval: accountOwnershipUpdateInterval, + Runable: c.Run, + }.Run +} + +func (c *refreshOwnedCollectiblesCommand) Run(ctx context.Context) (err error) { + err = c.updateOwnershipForAllAccounts(ctx) + if ctx.Err() != nil { + c.triggerEvent(EventCollectiblesOwnershipUpdateFinished, statustypes.Address{}, "Service cancelled") + return ctx.Err() + } + if err != nil { + c.triggerEvent(EventCollectiblesOwnershipUpdateFinishedWithError, statustypes.Address{}, err.Error()) + } + return err +} + +func (c *refreshOwnedCollectiblesCommand) triggerEvent(eventType walletevent.EventType, account statustypes.Address, message string) { + c.eventFeed.Send(walletevent.Event{ + Type: eventType, + Accounts: []common.Address{ + common.Address(account), + }, + Message: message, + }) +} + +func (c *refreshOwnedCollectiblesCommand) updateOwnershipForAllAccounts(ctx context.Context) error { + accountsDB, err := accounts.NewDB(c.db) + if err != nil { + return err + } + + addresses, err := accountsDB.GetWalletAddresses() + if err != nil { + return err + } + + for _, address := range addresses { + _ = c.updateOwnershipForAccount(ctx, address) + } + return nil +} + +func (c *refreshOwnedCollectiblesCommand) updateOwnershipForAccount(ctx context.Context, address statustypes.Address) error { + networks, err := c.networkManager.Get(false) + if err != nil { + return err + } + + c.triggerEvent(EventCollectiblesOwnershipUpdateStarted, address, "") + for _, network := range networks { + err := c.manager.UpdateOwnedCollectibles(walletCommon.ChainID(network.ChainID), common.Address(address)) + if err != nil { + log.Warn("Error updating collectibles ownership", "chainID", network.ChainID, "address", address.String(), "err", err) + } + } + c.triggerEvent(EventCollectiblesOwnershipUpdateFinished, address, "") + + return nil +} diff --git a/services/wallet/collectibles/service.go b/services/wallet/collectibles/service.go new file mode 100644 index 000000000..cf27823e9 --- /dev/null +++ b/services/wallet/collectibles/service.go @@ -0,0 +1,193 @@ +package collectibles + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + + "github.com/status-im/status-go/rpc/network" + + "github.com/status-im/status-go/services/wallet/async" + walletCommon "github.com/status-im/status-go/services/wallet/common" + "github.com/status-im/status-go/services/wallet/thirdparty" + "github.com/status-im/status-go/services/wallet/walletevent" +) + +// These events are used to notify the UI of state changes +const ( + EventCollectiblesOwnershipUpdateStarted walletevent.EventType = "wallet-collectibles-ownership-update-started" + EventCollectiblesOwnershipUpdateFinished walletevent.EventType = "wallet-collectibles-ownership-update-finished" + EventCollectiblesOwnershipUpdateFinishedWithError walletevent.EventType = "wallet-collectibles-ownership-update-finished-with-error" + + EventOwnedCollectiblesFilteringDone walletevent.EventType = "wallet-owned-collectibles-filtering-done" + EventGetCollectiblesDataDone walletevent.EventType = "wallet-get-collectibles-data-done" +) + +var ( + filterOwnedCollectiblesTask = async.TaskType{ + ID: 1, + Policy: async.ReplacementPolicyCancelOld, + } + getCollectiblesDataTask = async.TaskType{ + ID: 2, + Policy: async.ReplacementPolicyCancelOld, + } +) + +type Service struct { + manager *Manager + db *sql.DB + eventFeed *event.Feed + networkManager *network.Manager + cancelFn context.CancelFunc + + group *async.Group + scheduler *async.Scheduler +} + +func NewService(db *sql.DB, eventFeed *event.Feed, networkManager *network.Manager, manager *Manager) *Service { + return &Service{ + manager: manager, + db: db, + eventFeed: eventFeed, + networkManager: networkManager, + scheduler: async.NewScheduler(), + } +} + +type ErrorCode = int + +const ( + ErrorCodeSuccess ErrorCode = iota + 1 + ErrorCodeTaskCanceled + ErrorCodeFailed +) + +type FilterOwnedCollectiblesResponse struct { + Collectibles []thirdparty.CollectibleHeader `json:"collectibles"` + Offset int `json:"offset"` + // Used to indicate that there might be more collectibles that were not returned + // based on a simple heuristic + HasMore bool `json:"hasMore"` + ErrorCode ErrorCode `json:"errorCode"` +} + +type GetCollectiblesDataResponse struct { + Collectibles []thirdparty.CollectibleData `json:"collectibles"` + ErrorCode ErrorCode `json:"errorCode"` +} + +type filterOwnedCollectiblesTaskReturnType struct { + collectibles []thirdparty.CollectibleHeader + hasMore bool +} + +// FilterOwnedCollectiblesResponse allows only one filter task to run at a time +// and it cancels the current one if a new one is started +// All calls will trigger an EventOwnedCollectiblesFilteringDone event with the result of the filtering +func (s *Service) FilterOwnedCollectiblesAsync(ctx context.Context, chainIDs []walletCommon.ChainID, addresses []common.Address, offset int, limit int) { + s.scheduler.Enqueue(filterOwnedCollectiblesTask, func(ctx context.Context) (interface{}, error) { + collectibles, hasMore, err := s.manager.GetOwnedCollectibles(chainIDs, addresses, offset, limit) + if err != nil { + return nil, err + } + data, err := s.manager.FetchAssetsByCollectibleUniqueID(collectibles) + if err != nil { + return nil, err + } + + return filterOwnedCollectiblesTaskReturnType{ + collectibles: thirdparty.CollectiblesToHeaders(data), + hasMore: hasMore, + }, err + }, func(result interface{}, taskType async.TaskType, err error) { + res := FilterOwnedCollectiblesResponse{ + ErrorCode: ErrorCodeFailed, + } + + if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) { + res.ErrorCode = ErrorCodeTaskCanceled + } else if err == nil { + fnRet := result.(filterOwnedCollectiblesTaskReturnType) + res.Collectibles = fnRet.collectibles + res.Offset = offset + res.HasMore = fnRet.hasMore + res.ErrorCode = ErrorCodeSuccess + } + + s.sendResponseEvent(EventOwnedCollectiblesFilteringDone, res, err) + }) +} + +func (s *Service) GetCollectiblesDataAsync(ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID) { + s.scheduler.Enqueue(getCollectiblesDataTask, func(ctx context.Context) (interface{}, error) { + collectibles, err := s.manager.FetchAssetsByCollectibleUniqueID(uniqueIDs) + return collectibles, err + }, func(result interface{}, taskType async.TaskType, err error) { + res := GetCollectiblesDataResponse{ + ErrorCode: ErrorCodeFailed, + } + + if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) { + res.ErrorCode = ErrorCodeTaskCanceled + } else if err == nil { + collectibles := result.([]thirdparty.CollectibleData) + res.Collectibles = collectibles + res.ErrorCode = ErrorCodeSuccess + } + + s.sendResponseEvent(EventGetCollectiblesDataDone, res, err) + }) +} +func (s *Service) Start() { + if s.group != nil { + return + } + ctx, cancel := context.WithCancel(context.Background()) + s.cancelFn = cancel + + s.group = async.NewGroup(ctx) + + command := newRefreshOwnedCollectiblesCommand( + s.manager, + s.db, + s.eventFeed, + s.networkManager, + ) + + s.group.Add(command.Command()) +} + +func (s *Service) Stop() { + if s.cancelFn != nil { + s.cancelFn() + s.cancelFn = nil + } + if s.group != nil { + s.group.Stop() + s.group.Wait() + s.group = nil + } + s.scheduler.Stop() +} + +func (s *Service) sendResponseEvent(eventType walletevent.EventType, payloadObj interface{}, resErr error) { + payload, err := json.Marshal(payloadObj) + if err != nil { + log.Error("Error marshaling response: %v; result error: %w", err, resErr) + } else { + err = resErr + } + + log.Debug("wallet.api.collectibles.Service RESPONSE", "eventType", eventType, "error", err, "payload.len", len(payload)) + + s.eventFeed.Send(walletevent.Event{ + Type: eventType, + Message: string(payload), + }) +} diff --git a/services/wallet/service.go b/services/wallet/service.go index e2484a817..cad3c11f1 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -109,6 +109,7 @@ func NewService( infuraClient := infura.NewClient(config.WalletConfig.InfuraAPIKey, config.WalletConfig.InfuraAPIKeySecret) openseaClient := opensea.NewClient(config.WalletConfig.OpenseaAPIKey, walletFeed) collectiblesManager := collectibles.NewManager(rpcClient, alchemyClient, infuraClient, openseaClient) + collectibles := collectibles.NewService(db, walletFeed, rpcClient.NetworkManager, collectiblesManager) return &Service{ db: db, accountsDB: accountsDB, @@ -120,6 +121,7 @@ func NewService( transferController: transferController, cryptoOnRampManager: cryptoOnRampManager, collectiblesManager: collectiblesManager, + collectibles: collectibles, feesManager: &FeeManager{rpcClient}, gethManager: gethManager, marketManager: marketManager, @@ -152,6 +154,7 @@ type Service struct { marketManager *market.Manager started bool collectiblesManager *collectibles.Manager + collectibles *collectibles.Service gethManager *account.GethManager transactor *transactions.Transactor ens *ens.Service @@ -173,6 +176,7 @@ func (s *Service) Start() error { err := s.signals.Start() s.history.Start() _ = s.pendingTxManager.Start() + s.collectibles.Start() s.started = true return err } @@ -197,6 +201,7 @@ func (s *Service) Stop() error { s.history.Stop() s.activity.Stop() s.pendingTxManager.Stop() + s.collectibles.Stop() s.started = false log.Info("wallet stopped") return nil