feat: parallel collectibles fetching per account and chain

This commit is contained in:
Dario Gabriel Lipicar 2023-07-26 14:48:14 -03:00 committed by dlipicar
parent 57424e076c
commit 10a42e639d
5 changed files with 181 additions and 87 deletions

View File

@ -2,7 +2,6 @@ package collectibles
import (
"context"
"database/sql"
"fmt"
"math/big"
"strings"
@ -43,10 +42,9 @@ type Manager struct {
opensea *opensea.Client
nftCache map[walletCommon.ChainID]map[string]thirdparty.CollectibleData
nftCacheLock sync.RWMutex
ownershipDB *OwnershipDB
}
func NewManager(rpcClient *rpc.Client, db *sql.DB, mainContractOwnershipProvider thirdparty.CollectibleContractOwnershipProvider, fallbackContractOwnershipProvider thirdparty.CollectibleContractOwnershipProvider, opensea *opensea.Client) *Manager {
func NewManager(rpcClient *rpc.Client, mainContractOwnershipProvider thirdparty.CollectibleContractOwnershipProvider, fallbackContractOwnershipProvider thirdparty.CollectibleContractOwnershipProvider, opensea *opensea.Client) *Manager {
hystrix.ConfigureCommand(hystrixContractOwnershipClientName, hystrix.CommandConfig{
Timeout: 10000,
MaxConcurrentRequests: 100,
@ -60,7 +58,6 @@ func NewManager(rpcClient *rpc.Client, db *sql.DB, mainContractOwnershipProvider
fallbackContractOwnershipProvider: fallbackContractOwnershipProvider,
opensea: opensea,
nftCache: make(map[walletCommon.ChainID]map[string]thirdparty.CollectibleData),
ownershipDB: NewOwnershipDB(db),
}
}
@ -190,38 +187,15 @@ func (o *Manager) FetchAllAssetsByOwner(chainID walletCommon.ChainID, owner comm
return assetContainer, nil
}
func (o *Manager) UpdateOwnedCollectibles(chainID walletCommon.ChainID, owner common.Address) error {
assetContainer, err := o.FetchAllAssetsByOwner(chainID, owner, FetchFromStartCursor, FetchNoLimit)
func (o *Manager) FetchCollectibleOwnershipByOwner(chainID walletCommon.ChainID, owner common.Address, cursor string, limit int) (*thirdparty.CollectibleOwnershipContainer, error) {
assetContainer, err := o.FetchAllAssetsByOwner(chainID, owner, cursor, limit)
if err != nil {
return err
return nil, err
}
err = o.processOwnedAssets(chainID, owner, assetContainer.Collectibles)
if err != nil {
return err
}
ret := assetContainer.ToOwnershipContainer()
err = o.processAssets(assetContainer.Collectibles)
if err != nil {
return err
}
return nil
}
func (o *Manager) GetOwnedCollectibles(chainIDs []walletCommon.ChainID, owners []common.Address, offset int, limit int) ([]thirdparty.CollectibleUniqueID, bool, error) {
// Request one more than limit, to check if DB has more available
ids, err := o.ownershipDB.GetOwnedCollectibles(chainIDs, owners, offset, limit+1)
if err != nil {
return nil, false, err
}
hasMore := len(ids) > limit
if hasMore {
ids = ids[:limit]
}
return ids, hasMore, nil
return &ret, nil
}
func (o *Manager) FetchAssetsByCollectibleUniqueID(uniqueIDs []thirdparty.CollectibleUniqueID) ([]thirdparty.CollectibleData, error) {
@ -298,19 +272,6 @@ func (o *Manager) fetchTokenURI(id thirdparty.CollectibleUniqueID) (string, erro
return tokenURI, err
}
func (o *Manager) processOwnedAssets(chainID walletCommon.ChainID, address common.Address, assets []thirdparty.CollectibleData) error {
ownership := make([]thirdparty.CollectibleUniqueID, 0, len(assets))
for _, asset := range assets {
ownership = append(ownership, asset.ID)
}
return o.setCacheOwnedCollectibles(chainID, address, ownership)
}
func (o *Manager) setCacheOwnedCollectibles(chainID walletCommon.ChainID, address common.Address, ownership []thirdparty.CollectibleUniqueID) error {
return o.ownershipDB.Update(chainID, address, ownership)
}
func (o *Manager) processAssets(assets []thirdparty.CollectibleData) error {
for idx, asset := range assets {
id := asset.ID

View File

@ -2,33 +2,38 @@ package collectibles
import (
"context"
"errors"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
statustypes "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/services/wallet/async"
walletCommon "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/thirdparty"
"github.com/status-im/status-go/services/wallet/walletevent"
)
const (
fetchLimit = 50 // Limit number of collectibles we fetch per provider call
accountOwnershipUpdateInterval = 30 * time.Minute
)
// Fetches owned collectibles for all chainIDs and wallet addresses
type refreshOwnedCollectiblesCommand struct {
manager *Manager
ownershipDB *OwnershipDB
accountsDB *accounts.Database
walletFeed *event.Feed
networkManager *network.Manager
}
func newRefreshOwnedCollectiblesCommand(manager *Manager, accountsDB *accounts.Database, walletFeed *event.Feed, networkManager *network.Manager) *refreshOwnedCollectiblesCommand {
func newRefreshOwnedCollectiblesCommand(manager *Manager, ownershipDB *OwnershipDB, accountsDB *accounts.Database, walletFeed *event.Feed, networkManager *network.Manager) *refreshOwnedCollectiblesCommand {
return &refreshOwnedCollectiblesCommand{
manager: manager,
ownershipDB: ownershipDB,
accountsDB: accountsDB,
walletFeed: walletFeed,
networkManager: networkManager,
@ -43,53 +48,145 @@ func (c *refreshOwnedCollectiblesCommand) Command() async.Command {
}
func (c *refreshOwnedCollectiblesCommand) Run(ctx context.Context) (err error) {
err = c.updateOwnershipForAllAccounts(ctx)
if ctx.Err() != nil {
c.triggerEvent(EventCollectiblesOwnershipUpdateFinished, statustypes.Address{}, "Service cancelled")
return ctx.Err()
}
if err != nil {
c.triggerEvent(EventCollectiblesOwnershipUpdateFinishedWithError, statustypes.Address{}, err.Error())
}
return err
}
func (c *refreshOwnedCollectiblesCommand) triggerEvent(eventType walletevent.EventType, account statustypes.Address, message string) {
c.walletFeed.Send(walletevent.Event{
Type: eventType,
Accounts: []common.Address{
common.Address(account),
},
Message: message,
})
return c.updateOwnershipForAllAccounts(ctx)
}
func (c *refreshOwnedCollectiblesCommand) updateOwnershipForAllAccounts(ctx context.Context) error {
addresses, err := c.accountsDB.GetWalletAddresses()
if err != nil {
return err
}
for _, address := range addresses {
_ = c.updateOwnershipForAccount(ctx, address)
}
return nil
}
func (c *refreshOwnedCollectiblesCommand) updateOwnershipForAccount(ctx context.Context, address statustypes.Address) error {
networks, err := c.networkManager.Get(false)
if err != nil {
return err
}
c.triggerEvent(EventCollectiblesOwnershipUpdateStarted, address, "")
addresses, err := c.accountsDB.GetWalletAddresses()
if err != nil {
return err
}
areTestNetworksEnabled, err := c.accountsDB.GetTestNetworksEnabled()
if err != nil {
return err
}
start := time.Now()
group := async.NewGroup(ctx)
log.Debug("refreshOwnedCollectiblesCommand started")
for _, network := range networks {
err := c.manager.UpdateOwnedCollectibles(walletCommon.ChainID(network.ChainID), common.Address(address))
if err != nil {
log.Warn("Error updating collectibles ownership", "chainID", network.ChainID, "address", address.String(), "err", err)
if network.IsTest != areTestNetworksEnabled {
continue
}
for _, address := range addresses {
command := newLoadOwnedCollectiblesCommand(c.manager, c.ownershipDB, c.walletFeed, walletCommon.ChainID(network.ChainID), common.Address(address))
group.Add(command.Command())
}
}
c.triggerEvent(EventCollectiblesOwnershipUpdateFinished, address, "")
select {
case <-ctx.Done():
return ctx.Err()
case <-group.WaitAsync():
}
log.Debug("refreshOwnedCollectiblesCommand finished", "in", time.Since(start))
return nil
}
// Fetches owned collectibles for a ChainID+OwnerAddress combination in chunks
// and updates the ownershipDB when all chunks are loaded
type loadOwnedCollectiblesCommand struct {
chainID walletCommon.ChainID
account common.Address
manager *Manager
ownershipDB *OwnershipDB
walletFeed *event.Feed
// Not to be set by the caller
partialOwnership []thirdparty.CollectibleUniqueID
err error
}
func newLoadOwnedCollectiblesCommand(manager *Manager, ownershipDB *OwnershipDB, walletFeed *event.Feed, chainID walletCommon.ChainID, account common.Address) *loadOwnedCollectiblesCommand {
return &loadOwnedCollectiblesCommand{
manager: manager,
ownershipDB: ownershipDB,
walletFeed: walletFeed,
chainID: chainID,
account: account,
}
}
func (c *loadOwnedCollectiblesCommand) Command() async.Command {
return c.Run
}
func (c *loadOwnedCollectiblesCommand) triggerEvent(eventType walletevent.EventType, chainID walletCommon.ChainID, account common.Address, message string) {
c.walletFeed.Send(walletevent.Event{
Type: eventType,
ChainID: uint64(chainID),
Accounts: []common.Address{
account,
},
Message: message,
})
}
func (c *loadOwnedCollectiblesCommand) Run(parent context.Context) (err error) {
log.Debug("start loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account)
pageNr := 0
cursor := FetchFromStartCursor
c.triggerEvent(EventCollectiblesOwnershipUpdateStarted, c.chainID, c.account, "")
// Fetch collectibles in chunks
for {
if shouldCancel(parent) {
c.err = errors.New("context cancelled")
break
}
partialOwnership, err := c.manager.FetchCollectibleOwnershipByOwner(c.chainID, c.account, cursor, fetchLimit)
if err != nil {
log.Error("failed loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "page", pageNr, "error", err)
c.err = err
break
}
log.Debug("partial loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "page", pageNr, "found", len(partialOwnership.Collectibles), "collectibles")
c.partialOwnership = append(c.partialOwnership, partialOwnership.Collectibles...)
pageNr++
cursor = partialOwnership.NextCursor
if cursor == FetchFromStartCursor {
err = c.ownershipDB.Update(c.chainID, c.account, c.partialOwnership)
if err != nil {
log.Error("failed updating ownershipDB in loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "error", err)
c.err = err
}
break
}
}
if c.err != nil {
c.triggerEvent(EventCollectiblesOwnershipUpdateFinishedWithError, c.chainID, c.account, c.err.Error())
} else {
c.triggerEvent(EventCollectiblesOwnershipUpdateFinished, c.chainID, c.account, "")
}
log.Debug("end loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account)
return nil
}
// shouldCancel returns true if the context has been cancelled and task should be aborted
func shouldCancel(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
}
return false
}

View File

@ -44,7 +44,7 @@ var (
type Service struct {
manager *Manager
db *sql.DB
ownershipDB *OwnershipDB
walletFeed *event.Feed
accountsDB *accounts.Database
accountsFeed *event.Feed
@ -60,7 +60,7 @@ type Service struct {
func NewService(db *sql.DB, walletFeed *event.Feed, accountsDB *accounts.Database, accountsFeed *event.Feed, networkManager *network.Manager, manager *Manager) *Service {
return &Service{
manager: manager,
db: db,
ownershipDB: NewOwnershipDB(db),
walletFeed: walletFeed,
accountsDB: accountsDB,
accountsFeed: accountsFeed,
@ -101,7 +101,7 @@ type filterOwnedCollectiblesTaskReturnType struct {
// All calls will trigger an EventOwnedCollectiblesFilteringDone event with the result of the filtering
func (s *Service) FilterOwnedCollectiblesAsync(ctx context.Context, chainIDs []walletCommon.ChainID, addresses []common.Address, offset int, limit int) {
s.scheduler.Enqueue(filterOwnedCollectiblesTask, func(ctx context.Context) (interface{}, error) {
collectibles, hasMore, err := s.manager.GetOwnedCollectibles(chainIDs, addresses, offset, limit)
collectibles, hasMore, err := s.GetOwnedCollectibles(chainIDs, addresses, offset, limit)
if err != nil {
return nil, err
}
@ -165,6 +165,7 @@ func (s *Service) startPeriodicalOwnershipFetch() {
command := newRefreshOwnedCollectiblesCommand(
s.manager,
s.ownershipDB,
s.accountsDB,
s.walletFeed,
s.networkManager,
@ -242,3 +243,18 @@ func (s *Service) sendResponseEvent(eventType walletevent.EventType, payloadObj
Message: string(payload),
})
}
func (s *Service) GetOwnedCollectibles(chainIDs []walletCommon.ChainID, owners []common.Address, offset int, limit int) ([]thirdparty.CollectibleUniqueID, bool, error) {
// Request one more than limit, to check if DB has more available
ids, err := s.ownershipDB.GetOwnedCollectibles(chainIDs, owners, offset, limit+1)
if err != nil {
return nil, false, err
}
hasMore := len(ids) > limit
if hasMore {
ids = ids[:limit]
}
return ids, hasMore, nil
}

View File

@ -108,7 +108,7 @@ func NewService(
alchemyClient := alchemy.NewClient(config.WalletConfig.AlchemyAPIKeys)
infuraClient := infura.NewClient(config.WalletConfig.InfuraAPIKey, config.WalletConfig.InfuraAPIKeySecret)
openseaClient := opensea.NewClient(config.WalletConfig.OpenseaAPIKey, walletFeed)
collectiblesManager := collectibles.NewManager(rpcClient, db, alchemyClient, infuraClient, openseaClient)
collectiblesManager := collectibles.NewManager(rpcClient, alchemyClient, infuraClient, openseaClient)
collectibles := collectibles.NewService(db, walletFeed, accountsDB, accountFeed, rpcClient.NetworkManager, collectiblesManager)
return &Service{
db: db,

View File

@ -74,12 +74,32 @@ type CollectibleHeader struct {
CollectionName string `json:"collection_name"`
}
type CollectibleOwnershipContainer struct {
Collectibles []CollectibleUniqueID
NextCursor string
PreviousCursor string
}
type CollectibleDataContainer struct {
Collectibles []CollectibleData
NextCursor string
PreviousCursor string
}
func (c *CollectibleDataContainer) ToOwnershipContainer() CollectibleOwnershipContainer {
ret := CollectibleOwnershipContainer{
Collectibles: make([]CollectibleUniqueID, 0, len(c.Collectibles)),
NextCursor: c.NextCursor,
PreviousCursor: c.PreviousCursor,
}
for _, collectible := range c.Collectibles {
ret.Collectibles = append(ret.Collectibles, collectible.ID)
}
return ret
}
func (c *CollectibleData) toHeader() CollectibleHeader {
return CollectibleHeader{
ID: c.ID,