From e502ba82ceda0819a5482771e30a02a34824d23d Mon Sep 17 00:00:00 2001 From: Dario Gabriel Lipicar Date: Thu, 14 Sep 2023 11:44:35 -0300 Subject: [PATCH] feat: implement single account fetch on add and partial progress report --- services/wallet/collectibles/commands.go | 153 +++++++++++------------ services/wallet/collectibles/service.go | 115 ++++++++++++++--- 2 files changed, 176 insertions(+), 92 deletions(-) diff --git a/services/wallet/collectibles/commands.go b/services/wallet/collectibles/commands.go index c7c1fd39e..243927f01 100644 --- a/services/wallet/collectibles/commands.go +++ b/services/wallet/collectibles/commands.go @@ -8,8 +8,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" - "github.com/status-im/status-go/multiaccounts/accounts" - "github.com/status-im/status-go/rpc/network" "github.com/status-im/status-go/services/wallet/async" walletCommon "github.com/status-im/status-go/services/wallet/common" "github.com/status-im/status-go/services/wallet/thirdparty" @@ -21,75 +19,57 @@ const ( accountOwnershipUpdateInterval = 30 * time.Minute ) -// Fetches owned collectibles for all chainIDs and wallet addresses -type refreshOwnedCollectiblesCommand struct { - manager *Manager - ownershipDB *OwnershipDB - accountsDB *accounts.Database - walletFeed *event.Feed - networkManager *network.Manager +type periodicRefreshOwnedCollectiblesCommand struct { + chainID walletCommon.ChainID + account common.Address + manager *Manager + ownershipDB *OwnershipDB + walletFeed *event.Feed + + group *async.Group } -func newRefreshOwnedCollectiblesCommand(manager *Manager, ownershipDB *OwnershipDB, accountsDB *accounts.Database, walletFeed *event.Feed, networkManager *network.Manager) *refreshOwnedCollectiblesCommand { - return &refreshOwnedCollectiblesCommand{ - manager: manager, - ownershipDB: ownershipDB, - accountsDB: accountsDB, - walletFeed: walletFeed, - networkManager: networkManager, +func newPeriodicRefreshOwnedCollectiblesCommand(manager *Manager, ownershipDB *OwnershipDB, walletFeed *event.Feed, chainID walletCommon.ChainID, account common.Address) *periodicRefreshOwnedCollectiblesCommand { + return &periodicRefreshOwnedCollectiblesCommand{ + manager: manager, + ownershipDB: ownershipDB, + walletFeed: walletFeed, + chainID: chainID, + account: account, } } -func (c *refreshOwnedCollectiblesCommand) Command() async.Command { +func (c *periodicRefreshOwnedCollectiblesCommand) Command() async.Command { return async.InfiniteCommand{ Interval: accountOwnershipUpdateInterval, Runable: c.Run, }.Run } -func (c *refreshOwnedCollectiblesCommand) Run(ctx context.Context) (err error) { - return c.updateOwnershipForAllAccounts(ctx) +func (c *periodicRefreshOwnedCollectiblesCommand) Run(ctx context.Context) (err error) { + return c.loadOwnedCollectibles(ctx) } -func (c *refreshOwnedCollectiblesCommand) updateOwnershipForAllAccounts(ctx context.Context) error { - networks, err := c.networkManager.Get(false) - if err != nil { - return err +func (c *periodicRefreshOwnedCollectiblesCommand) Stop() { + if c.group != nil { + c.group.Stop() + c.group.Wait() + c.group = nil } +} - addresses, err := c.accountsDB.GetWalletAddresses() - if err != nil { - return err - } +func (c *periodicRefreshOwnedCollectiblesCommand) loadOwnedCollectibles(ctx context.Context) error { + c.group = async.NewGroup(ctx) - areTestNetworksEnabled, err := c.accountsDB.GetTestNetworksEnabled() - if err != nil { - return err - } - - start := time.Now() - group := async.NewGroup(ctx) - - log.Debug("refreshOwnedCollectiblesCommand started") - - for _, network := range networks { - if network.IsTest != areTestNetworksEnabled { - continue - } - for _, address := range addresses { - command := newLoadOwnedCollectiblesCommand(c.manager, c.ownershipDB, c.walletFeed, walletCommon.ChainID(network.ChainID), common.Address(address)) - group.Add(command.Command()) - } - } + command := newLoadOwnedCollectiblesCommand(c.manager, c.ownershipDB, c.walletFeed, c.chainID, c.account) + c.group.Add(command.Command()) select { case <-ctx.Done(): return ctx.Err() - case <-group.WaitAsync(): + case <-c.group.WaitAsync(): } - log.Debug("refreshOwnedCollectiblesCommand finished", "in", time.Since(start)) - return nil } @@ -140,35 +120,54 @@ func (c *loadOwnedCollectiblesCommand) Run(parent context.Context) (err error) { start := time.Now() c.triggerEvent(EventCollectiblesOwnershipUpdateStarted, c.chainID, c.account, "") - // Fetch collectibles in chunks - for { - if shouldCancel(parent) { - c.err = errors.New("context cancelled") - break - } - partialOwnership, err := c.manager.FetchCollectibleOwnershipByOwner(c.chainID, c.account, cursor, fetchLimit) - - if err != nil { - log.Error("failed loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "page", pageNr, "error", err) - c.err = err - break - } - - log.Debug("partial loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "page", pageNr, "found", len(partialOwnership.Items), "collectibles") - - c.partialOwnership = append(c.partialOwnership, partialOwnership.Items...) - - pageNr++ - cursor = partialOwnership.NextCursor - - if cursor == thirdparty.FetchFromStartCursor { - err = c.ownershipDB.Update(c.chainID, c.account, c.partialOwnership, start.Unix()) - if err != nil { - log.Error("failed updating ownershipDB in loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "error", err) - c.err = err + lastFetchTimestamp, err := c.ownershipDB.GetOwnershipUpdateTimestamp(c.account, c.chainID) + if err != nil { + c.err = err + } else { + initialFetch := lastFetchTimestamp == InvalidTimestamp + // Fetch collectibles in chunks + for { + if shouldCancel(parent) { + c.err = errors.New("context cancelled") + break + } + + pageStart := time.Now() + log.Debug("start loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "page", pageNr) + + partialOwnership, err := c.manager.FetchCollectibleOwnershipByOwner(c.chainID, c.account, cursor, fetchLimit) + + if err != nil { + log.Error("failed loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "page", pageNr, "error", err) + c.err = err + break + } + + log.Debug("partial loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "page", pageNr, "in", time.Since(pageStart), "found", len(partialOwnership.Items), "collectibles") + + c.partialOwnership = append(c.partialOwnership, partialOwnership.Items...) + + pageNr++ + cursor = partialOwnership.NextCursor + + finished := cursor == thirdparty.FetchFromStartCursor + + // Normally, update the DB once we've finished fetching + // If this is the first fetch, make partial updates to the client to get a better UX + if initialFetch || finished { + err = c.ownershipDB.Update(c.chainID, c.account, c.partialOwnership, start.Unix()) + if err != nil { + log.Error("failed updating ownershipDB in loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "error", err) + c.err = err + } + } + + if finished || c.err != nil { + break + } else if initialFetch { + c.triggerEvent(EventCollectiblesOwnershipUpdatePartial, c.chainID, c.account, "") } - break } } @@ -178,7 +177,7 @@ func (c *loadOwnedCollectiblesCommand) Run(parent context.Context) (err error) { c.triggerEvent(EventCollectiblesOwnershipUpdateFinished, c.chainID, c.account, "") } - log.Debug("end loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account) + log.Debug("end loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "in", time.Since(start)) return nil } diff --git a/services/wallet/collectibles/service.go b/services/wallet/collectibles/service.go index 51cd8e1df..bca1b61f3 100644 --- a/services/wallet/collectibles/service.go +++ b/services/wallet/collectibles/service.go @@ -25,6 +25,7 @@ import ( // These events are used to notify the UI of state changes const ( EventCollectiblesOwnershipUpdateStarted walletevent.EventType = "wallet-collectibles-ownership-update-started" + EventCollectiblesOwnershipUpdatePartial walletevent.EventType = "wallet-collectibles-ownership-update-partial" EventCollectiblesOwnershipUpdateFinished walletevent.EventType = "wallet-collectibles-ownership-update-finished" EventCollectiblesOwnershipUpdateFinishedWithError walletevent.EventType = "wallet-collectibles-ownership-update-finished-with-error" @@ -43,6 +44,9 @@ var ( } ) +type commandPerChainID = map[walletCommon.ChainID]*periodicRefreshOwnedCollectiblesCommand +type commandPerAddressAndChainID = map[common.Address]commandPerChainID + type Service struct { manager *Manager ownershipDB *OwnershipDB @@ -53,6 +57,7 @@ type Service struct { networkManager *network.Manager cancelFn context.CancelFunc + commands commandPerAddressAndChainID group *async.Group scheduler *async.MultiClientScheduler accountsWatcher *walletaccounts.Watcher @@ -66,6 +71,7 @@ func NewService(db *sql.DB, walletFeed *event.Feed, accountsDB *accounts.Databas accountsDB: accountsDB, accountsFeed: accountsFeed, networkManager: networkManager, + commands: make(commandPerAddressAndChainID), scheduler: async.NewMultiClientScheduler(), } } @@ -155,24 +161,30 @@ func (s *Service) GetCollectiblesDetailsAsync(requestID int32, uniqueIDs []third }) } -func (s *Service) startPeriodicalOwnershipFetch() { +// Starts periodical fetching for the all wallet addresses and all chains +func (s *Service) startPeriodicalOwnershipFetch() error { if s.group != nil { - return + return nil } ctx, cancel := context.WithCancel(context.Background()) s.cancelFn = cancel s.group = async.NewGroup(ctx) - command := newRefreshOwnedCollectiblesCommand( - s.manager, - s.ownershipDB, - s.accountsDB, - s.walletFeed, - s.networkManager, - ) + addresses, err := s.accountsDB.GetWalletAddresses() + if err != nil { + return err + } - s.group.Add(command.Command()) + for _, addr := range addresses { + err := s.startPeriodicalOwnershipFetchForAccount(common.Address(addr)) + if err != nil { + log.Error("Error starting periodical collectibles fetch for accpunt", "address", addr, "error", err) + return err + } + } + + return nil } func (s *Service) stopPeriodicalOwnershipFetch() { @@ -184,20 +196,93 @@ func (s *Service) stopPeriodicalOwnershipFetch() { s.group.Stop() s.group.Wait() s.group = nil + s.commands = make(commandPerAddressAndChainID) } } +// Starts (or restarts) periodical fetching for the given account address for all chains +func (s *Service) startPeriodicalOwnershipFetchForAccount(address common.Address) error { + if s.group == nil { + return errors.New("periodical fetch group not initialized") + } + + networks, err := s.networkManager.Get(false) + if err != nil { + return err + } + + areTestNetworksEnabled, err := s.accountsDB.GetTestNetworksEnabled() + if err != nil { + return err + } + + if _, ok := s.commands[address]; ok { + for chainID, command := range s.commands[address] { + command.Stop() + delete(s.commands[address], chainID) + } + } + + s.commands[address] = make(commandPerChainID) + + for _, network := range networks { + if network.IsTest != areTestNetworksEnabled { + continue + } + chainID := walletCommon.ChainID(network.ChainID) + + command := newPeriodicRefreshOwnedCollectiblesCommand( + s.manager, + s.ownershipDB, + s.walletFeed, + chainID, + address, + ) + + s.commands[address][chainID] = command + s.group.Add(command.Command()) + } + + return nil +} + +// Stop periodical fetching for the given account address for all chains +func (s *Service) stopPeriodicalOwnershipFetchForAccount(address common.Address) error { + if s.group == nil { + return errors.New("periodical fetch group not initialized") + } + + if _, ok := s.commands[address]; ok { + for _, command := range s.commands[address] { + command.Stop() + } + delete(s.commands, address) + } + + return nil +} + func (s *Service) startAccountsWatcher() { if s.accountsWatcher != nil { return } accountChangeCb := func(changedAddresses []common.Address, eventType accountsevent.EventType, currentAddresses []common.Address) { - // Whenever an account gets added, restart fetch - // TODO: Fetch only added accounts + // Whenever an account gets added, start fetching if eventType == accountsevent.EventTypeAdded { - s.stopPeriodicalOwnershipFetch() - s.startPeriodicalOwnershipFetch() + for _, address := range changedAddresses { + err := s.startPeriodicalOwnershipFetchForAccount(address) + if err != nil { + log.Error("Error starting periodical collectibles fetch", "address", address, "error", err) + } + } + } else if eventType == accountsevent.EventTypeRemoved { + for _, address := range changedAddresses { + err := s.stopPeriodicalOwnershipFetchForAccount(address) + if err != nil { + log.Error("Error starting periodical collectibles fetch", "address", address, "error", err) + } + } } } @@ -215,7 +300,7 @@ func (s *Service) stopAccountsWatcher() { func (s *Service) Start() { // Setup periodical collectibles refresh - s.startPeriodicalOwnershipFetch() + _ = s.startPeriodicalOwnershipFetch() // Setup collectibles fetch when a new account gets added s.startAccountsWatcher()