feat: implement collectibles service

This commit is contained in:
Dario Gabriel Lipicar 2023-07-18 12:02:56 -03:00 committed by dlipicar
parent b1cf54974e
commit 5ba5611a8d
5 changed files with 394 additions and 0 deletions

View File

@ -305,9 +305,24 @@ func (api *API) GetCryptoOnRamps(ctx context.Context) ([]CryptoOnRamp, error) {
func (api *API) FetchBalancesByOwnerAndContractAddress(chainID wcommon.ChainID, ownerAddress common.Address, contractAddresses []common.Address) (thirdparty.TokenBalancesPerContractAddress, error) {
log.Debug("call to FetchBalancesByOwnerAndContractAddress")
return api.s.collectiblesManager.FetchBalancesByOwnerAndContractAddress(chainID, ownerAddress, contractAddresses)
}
func (api *API) FilterOwnedCollectiblesAsync(ctx context.Context, chainIDs []wcommon.ChainID, addresses []common.Address, offset int, limit int) error {
log.Debug("wallet.api.FilterOwnedCollectiblesAsync", "chainIDs.count", len(chainIDs), "addr.count", len(addresses), "offset", offset, "limit", limit)
api.s.collectibles.FilterOwnedCollectiblesAsync(ctx, chainIDs, addresses, offset, limit)
return nil
}
func (api *API) GetCollectiblesDataAsync(ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID) error {
log.Debug("wallet.api.GetCollectiblesDetailsAsync")
api.s.collectibles.GetCollectiblesDataAsync(ctx, uniqueIDs)
return nil
}
// Old Collectibles API - To be deprecated
func (api *API) GetOpenseaCollectionsByOwner(ctx context.Context, chainID wcommon.ChainID, owner common.Address) ([]opensea.OwnedCollection, error) {
log.Debug("call to GetOpenseaCollectionsByOwner")

View File

@ -39,6 +39,8 @@ type Manager struct {
opensea *opensea.Client
nftCache map[walletCommon.ChainID]map[string]thirdparty.CollectibleData
nftCacheLock sync.RWMutex
ownershipCache map[walletCommon.ChainID]map[common.Address][]thirdparty.CollectibleUniqueID
ownershipCacheLock sync.RWMutex
}
func NewManager(rpcClient *rpc.Client, mainContractOwnershipProvider thirdparty.CollectibleContractOwnershipProvider, fallbackContractOwnershipProvider thirdparty.CollectibleContractOwnershipProvider, opensea *opensea.Client) *Manager {
@ -54,6 +56,8 @@ func NewManager(rpcClient *rpc.Client, mainContractOwnershipProvider thirdparty.
mainContractOwnershipProvider: mainContractOwnershipProvider,
fallbackContractOwnershipProvider: fallbackContractOwnershipProvider,
opensea: opensea,
nftCache: make(map[walletCommon.ChainID]map[string]thirdparty.CollectibleData),
ownershipCache: make(map[walletCommon.ChainID]map[common.Address][]thirdparty.CollectibleUniqueID),
}
}
@ -183,6 +187,59 @@ func (o *Manager) FetchAllAssetsByOwner(chainID walletCommon.ChainID, owner comm
return assetContainer, nil
}
func (o *Manager) UpdateOwnedCollectibles(chainID walletCommon.ChainID, owner common.Address) error {
assetContainer, err := o.FetchAllAssetsByOwner(chainID, owner, "", 0)
if err != nil {
return err
}
err = o.processOwnedAssets(chainID, owner, assetContainer.Collectibles)
if err != nil {
return err
}
err = o.processAssets(assetContainer.Collectibles)
if err != nil {
return err
}
return nil
}
func (o *Manager) GetOwnedCollectibles(chainIDs []walletCommon.ChainID, owners []common.Address, offset int, limit int) ([]thirdparty.CollectibleUniqueID, bool, error) {
o.ownershipCacheLock.RLock()
defer o.ownershipCacheLock.RUnlock()
ids := make([]thirdparty.CollectibleUniqueID, 0)
for _, chainID := range chainIDs {
if _, ok := o.ownershipCache[chainID]; !ok {
continue
}
for _, owner := range owners {
ids = append(ids, o.ownershipCache[chainID][owner]...)
}
}
// For compatibility with SQL OFFSET, skip first 'offset' elems
lowIdx := offset + 1
if len(ids) <= lowIdx {
return nil, false, nil
}
highIdx := offset + limit
if len(ids) < highIdx {
highIdx = len(ids)
}
hasMore := len(ids) > highIdx
ret := ids[lowIdx:highIdx]
return ret, hasMore, nil
}
func (o *Manager) FetchAssetsByCollectibleUniqueID(uniqueIDs []thirdparty.CollectibleUniqueID) ([]thirdparty.CollectibleData, error) {
idsToFetch := o.getIDsNotInCollectiblesDataCache(uniqueIDs)
if len(idsToFetch) > 0 {
@ -257,6 +314,29 @@ func (o *Manager) fetchTokenURI(id thirdparty.CollectibleUniqueID) (string, erro
return tokenURI, err
}
func (o *Manager) processOwnedAssets(chainID walletCommon.ChainID, address common.Address, assets []thirdparty.CollectibleData) error {
ownership := make([]thirdparty.CollectibleUniqueID, 0, len(assets))
for _, asset := range assets {
ownership = append(ownership, asset.ID)
}
o.setCacheOwnedCollectibles(chainID, address, ownership)
return nil
}
func (o *Manager) setCacheOwnedCollectibles(chainID walletCommon.ChainID, address common.Address, ownership []thirdparty.CollectibleUniqueID) {
o.ownershipCacheLock.Lock()
defer o.ownershipCacheLock.Unlock()
if _, ok := o.ownershipCache[chainID]; !ok {
o.ownershipCache[chainID] = make(map[common.Address][]thirdparty.CollectibleUniqueID)
}
// Ownership data should be fully replaced with newest list
o.ownershipCache[chainID][address] = ownership
}
func (o *Manager) processAssets(assets []thirdparty.CollectibleData) error {
for idx, asset := range assets {
id := asset.ID

View File

@ -0,0 +1,101 @@
package collectibles
import (
"context"
"database/sql"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
statustypes "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/services/wallet/async"
walletCommon "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/walletevent"
)
const (
accountOwnershipUpdateInterval = 30 * time.Minute
)
type refreshOwnedCollectiblesCommand struct {
manager *Manager
db *sql.DB
eventFeed *event.Feed
networkManager *network.Manager
}
func newRefreshOwnedCollectiblesCommand(manager *Manager, db *sql.DB, eventFeed *event.Feed, networkManager *network.Manager) *refreshOwnedCollectiblesCommand {
return &refreshOwnedCollectiblesCommand{
manager: manager,
db: db,
eventFeed: eventFeed,
networkManager: networkManager,
}
}
func (c *refreshOwnedCollectiblesCommand) Command() async.Command {
return async.InfiniteCommand{
Interval: accountOwnershipUpdateInterval,
Runable: c.Run,
}.Run
}
func (c *refreshOwnedCollectiblesCommand) Run(ctx context.Context) (err error) {
err = c.updateOwnershipForAllAccounts(ctx)
if ctx.Err() != nil {
c.triggerEvent(EventCollectiblesOwnershipUpdateFinished, statustypes.Address{}, "Service cancelled")
return ctx.Err()
}
if err != nil {
c.triggerEvent(EventCollectiblesOwnershipUpdateFinishedWithError, statustypes.Address{}, err.Error())
}
return err
}
func (c *refreshOwnedCollectiblesCommand) triggerEvent(eventType walletevent.EventType, account statustypes.Address, message string) {
c.eventFeed.Send(walletevent.Event{
Type: eventType,
Accounts: []common.Address{
common.Address(account),
},
Message: message,
})
}
func (c *refreshOwnedCollectiblesCommand) updateOwnershipForAllAccounts(ctx context.Context) error {
accountsDB, err := accounts.NewDB(c.db)
if err != nil {
return err
}
addresses, err := accountsDB.GetWalletAddresses()
if err != nil {
return err
}
for _, address := range addresses {
_ = c.updateOwnershipForAccount(ctx, address)
}
return nil
}
func (c *refreshOwnedCollectiblesCommand) updateOwnershipForAccount(ctx context.Context, address statustypes.Address) error {
networks, err := c.networkManager.Get(false)
if err != nil {
return err
}
c.triggerEvent(EventCollectiblesOwnershipUpdateStarted, address, "")
for _, network := range networks {
err := c.manager.UpdateOwnedCollectibles(walletCommon.ChainID(network.ChainID), common.Address(address))
if err != nil {
log.Warn("Error updating collectibles ownership", "chainID", network.ChainID, "address", address.String(), "err", err)
}
}
c.triggerEvent(EventCollectiblesOwnershipUpdateFinished, address, "")
return nil
}

View File

@ -0,0 +1,193 @@
package collectibles
import (
"context"
"database/sql"
"encoding/json"
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/services/wallet/async"
walletCommon "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/thirdparty"
"github.com/status-im/status-go/services/wallet/walletevent"
)
// These events are used to notify the UI of state changes
const (
EventCollectiblesOwnershipUpdateStarted walletevent.EventType = "wallet-collectibles-ownership-update-started"
EventCollectiblesOwnershipUpdateFinished walletevent.EventType = "wallet-collectibles-ownership-update-finished"
EventCollectiblesOwnershipUpdateFinishedWithError walletevent.EventType = "wallet-collectibles-ownership-update-finished-with-error"
EventOwnedCollectiblesFilteringDone walletevent.EventType = "wallet-owned-collectibles-filtering-done"
EventGetCollectiblesDataDone walletevent.EventType = "wallet-get-collectibles-data-done"
)
var (
filterOwnedCollectiblesTask = async.TaskType{
ID: 1,
Policy: async.ReplacementPolicyCancelOld,
}
getCollectiblesDataTask = async.TaskType{
ID: 2,
Policy: async.ReplacementPolicyCancelOld,
}
)
type Service struct {
manager *Manager
db *sql.DB
eventFeed *event.Feed
networkManager *network.Manager
cancelFn context.CancelFunc
group *async.Group
scheduler *async.Scheduler
}
func NewService(db *sql.DB, eventFeed *event.Feed, networkManager *network.Manager, manager *Manager) *Service {
return &Service{
manager: manager,
db: db,
eventFeed: eventFeed,
networkManager: networkManager,
scheduler: async.NewScheduler(),
}
}
type ErrorCode = int
const (
ErrorCodeSuccess ErrorCode = iota + 1
ErrorCodeTaskCanceled
ErrorCodeFailed
)
type FilterOwnedCollectiblesResponse struct {
Collectibles []thirdparty.CollectibleHeader `json:"collectibles"`
Offset int `json:"offset"`
// Used to indicate that there might be more collectibles that were not returned
// based on a simple heuristic
HasMore bool `json:"hasMore"`
ErrorCode ErrorCode `json:"errorCode"`
}
type GetCollectiblesDataResponse struct {
Collectibles []thirdparty.CollectibleData `json:"collectibles"`
ErrorCode ErrorCode `json:"errorCode"`
}
type filterOwnedCollectiblesTaskReturnType struct {
collectibles []thirdparty.CollectibleHeader
hasMore bool
}
// FilterOwnedCollectiblesResponse allows only one filter task to run at a time
// and it cancels the current one if a new one is started
// All calls will trigger an EventOwnedCollectiblesFilteringDone event with the result of the filtering
func (s *Service) FilterOwnedCollectiblesAsync(ctx context.Context, chainIDs []walletCommon.ChainID, addresses []common.Address, offset int, limit int) {
s.scheduler.Enqueue(filterOwnedCollectiblesTask, func(ctx context.Context) (interface{}, error) {
collectibles, hasMore, err := s.manager.GetOwnedCollectibles(chainIDs, addresses, offset, limit)
if err != nil {
return nil, err
}
data, err := s.manager.FetchAssetsByCollectibleUniqueID(collectibles)
if err != nil {
return nil, err
}
return filterOwnedCollectiblesTaskReturnType{
collectibles: thirdparty.CollectiblesToHeaders(data),
hasMore: hasMore,
}, err
}, func(result interface{}, taskType async.TaskType, err error) {
res := FilterOwnedCollectiblesResponse{
ErrorCode: ErrorCodeFailed,
}
if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) {
res.ErrorCode = ErrorCodeTaskCanceled
} else if err == nil {
fnRet := result.(filterOwnedCollectiblesTaskReturnType)
res.Collectibles = fnRet.collectibles
res.Offset = offset
res.HasMore = fnRet.hasMore
res.ErrorCode = ErrorCodeSuccess
}
s.sendResponseEvent(EventOwnedCollectiblesFilteringDone, res, err)
})
}
func (s *Service) GetCollectiblesDataAsync(ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID) {
s.scheduler.Enqueue(getCollectiblesDataTask, func(ctx context.Context) (interface{}, error) {
collectibles, err := s.manager.FetchAssetsByCollectibleUniqueID(uniqueIDs)
return collectibles, err
}, func(result interface{}, taskType async.TaskType, err error) {
res := GetCollectiblesDataResponse{
ErrorCode: ErrorCodeFailed,
}
if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) {
res.ErrorCode = ErrorCodeTaskCanceled
} else if err == nil {
collectibles := result.([]thirdparty.CollectibleData)
res.Collectibles = collectibles
res.ErrorCode = ErrorCodeSuccess
}
s.sendResponseEvent(EventGetCollectiblesDataDone, res, err)
})
}
func (s *Service) Start() {
if s.group != nil {
return
}
ctx, cancel := context.WithCancel(context.Background())
s.cancelFn = cancel
s.group = async.NewGroup(ctx)
command := newRefreshOwnedCollectiblesCommand(
s.manager,
s.db,
s.eventFeed,
s.networkManager,
)
s.group.Add(command.Command())
}
func (s *Service) Stop() {
if s.cancelFn != nil {
s.cancelFn()
s.cancelFn = nil
}
if s.group != nil {
s.group.Stop()
s.group.Wait()
s.group = nil
}
s.scheduler.Stop()
}
func (s *Service) sendResponseEvent(eventType walletevent.EventType, payloadObj interface{}, resErr error) {
payload, err := json.Marshal(payloadObj)
if err != nil {
log.Error("Error marshaling response: %v; result error: %w", err, resErr)
} else {
err = resErr
}
log.Debug("wallet.api.collectibles.Service RESPONSE", "eventType", eventType, "error", err, "payload.len", len(payload))
s.eventFeed.Send(walletevent.Event{
Type: eventType,
Message: string(payload),
})
}

View File

@ -109,6 +109,7 @@ func NewService(
infuraClient := infura.NewClient(config.WalletConfig.InfuraAPIKey, config.WalletConfig.InfuraAPIKeySecret)
openseaClient := opensea.NewClient(config.WalletConfig.OpenseaAPIKey, walletFeed)
collectiblesManager := collectibles.NewManager(rpcClient, alchemyClient, infuraClient, openseaClient)
collectibles := collectibles.NewService(db, walletFeed, rpcClient.NetworkManager, collectiblesManager)
return &Service{
db: db,
accountsDB: accountsDB,
@ -120,6 +121,7 @@ func NewService(
transferController: transferController,
cryptoOnRampManager: cryptoOnRampManager,
collectiblesManager: collectiblesManager,
collectibles: collectibles,
feesManager: &FeeManager{rpcClient},
gethManager: gethManager,
marketManager: marketManager,
@ -152,6 +154,7 @@ type Service struct {
marketManager *market.Manager
started bool
collectiblesManager *collectibles.Manager
collectibles *collectibles.Service
gethManager *account.GethManager
transactor *transactions.Transactor
ens *ens.Service
@ -173,6 +176,7 @@ func (s *Service) Start() error {
err := s.signals.Start()
s.history.Start()
_ = s.pendingTxManager.Start()
s.collectibles.Start()
s.started = true
return err
}
@ -197,6 +201,7 @@ func (s *Service) Stop() error {
s.history.Stop()
s.activity.Stop()
s.pendingTxManager.Stop()
s.collectibles.Stop()
s.started = false
log.Info("wallet stopped")
return nil