feat: implement single account fetch on add and partial progress report

This commit is contained in:
Dario Gabriel Lipicar 2023-09-14 11:44:35 -03:00 committed by dlipicar
parent b4d5c22050
commit e502ba82ce
2 changed files with 176 additions and 92 deletions

View File

@ -8,8 +8,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/async"
walletCommon "github.com/status-im/status-go/services/wallet/common" walletCommon "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/thirdparty" "github.com/status-im/status-go/services/wallet/thirdparty"
@ -21,75 +19,57 @@ const (
accountOwnershipUpdateInterval = 30 * time.Minute accountOwnershipUpdateInterval = 30 * time.Minute
) )
// Fetches owned collectibles for all chainIDs and wallet addresses type periodicRefreshOwnedCollectiblesCommand struct {
type refreshOwnedCollectiblesCommand struct { chainID walletCommon.ChainID
manager *Manager account common.Address
ownershipDB *OwnershipDB manager *Manager
accountsDB *accounts.Database ownershipDB *OwnershipDB
walletFeed *event.Feed walletFeed *event.Feed
networkManager *network.Manager
group *async.Group
} }
func newRefreshOwnedCollectiblesCommand(manager *Manager, ownershipDB *OwnershipDB, accountsDB *accounts.Database, walletFeed *event.Feed, networkManager *network.Manager) *refreshOwnedCollectiblesCommand { func newPeriodicRefreshOwnedCollectiblesCommand(manager *Manager, ownershipDB *OwnershipDB, walletFeed *event.Feed, chainID walletCommon.ChainID, account common.Address) *periodicRefreshOwnedCollectiblesCommand {
return &refreshOwnedCollectiblesCommand{ return &periodicRefreshOwnedCollectiblesCommand{
manager: manager, manager: manager,
ownershipDB: ownershipDB, ownershipDB: ownershipDB,
accountsDB: accountsDB, walletFeed: walletFeed,
walletFeed: walletFeed, chainID: chainID,
networkManager: networkManager, account: account,
} }
} }
func (c *refreshOwnedCollectiblesCommand) Command() async.Command { func (c *periodicRefreshOwnedCollectiblesCommand) Command() async.Command {
return async.InfiniteCommand{ return async.InfiniteCommand{
Interval: accountOwnershipUpdateInterval, Interval: accountOwnershipUpdateInterval,
Runable: c.Run, Runable: c.Run,
}.Run }.Run
} }
func (c *refreshOwnedCollectiblesCommand) Run(ctx context.Context) (err error) { func (c *periodicRefreshOwnedCollectiblesCommand) Run(ctx context.Context) (err error) {
return c.updateOwnershipForAllAccounts(ctx) return c.loadOwnedCollectibles(ctx)
} }
func (c *refreshOwnedCollectiblesCommand) updateOwnershipForAllAccounts(ctx context.Context) error { func (c *periodicRefreshOwnedCollectiblesCommand) Stop() {
networks, err := c.networkManager.Get(false) if c.group != nil {
if err != nil { c.group.Stop()
return err c.group.Wait()
c.group = nil
} }
}
addresses, err := c.accountsDB.GetWalletAddresses() func (c *periodicRefreshOwnedCollectiblesCommand) loadOwnedCollectibles(ctx context.Context) error {
if err != nil { c.group = async.NewGroup(ctx)
return err
}
areTestNetworksEnabled, err := c.accountsDB.GetTestNetworksEnabled() command := newLoadOwnedCollectiblesCommand(c.manager, c.ownershipDB, c.walletFeed, c.chainID, c.account)
if err != nil { c.group.Add(command.Command())
return err
}
start := time.Now()
group := async.NewGroup(ctx)
log.Debug("refreshOwnedCollectiblesCommand started")
for _, network := range networks {
if network.IsTest != areTestNetworksEnabled {
continue
}
for _, address := range addresses {
command := newLoadOwnedCollectiblesCommand(c.manager, c.ownershipDB, c.walletFeed, walletCommon.ChainID(network.ChainID), common.Address(address))
group.Add(command.Command())
}
}
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-group.WaitAsync(): case <-c.group.WaitAsync():
} }
log.Debug("refreshOwnedCollectiblesCommand finished", "in", time.Since(start))
return nil return nil
} }
@ -140,35 +120,54 @@ func (c *loadOwnedCollectiblesCommand) Run(parent context.Context) (err error) {
start := time.Now() start := time.Now()
c.triggerEvent(EventCollectiblesOwnershipUpdateStarted, c.chainID, c.account, "") c.triggerEvent(EventCollectiblesOwnershipUpdateStarted, c.chainID, c.account, "")
// Fetch collectibles in chunks
for {
if shouldCancel(parent) {
c.err = errors.New("context cancelled")
break
}
partialOwnership, err := c.manager.FetchCollectibleOwnershipByOwner(c.chainID, c.account, cursor, fetchLimit) lastFetchTimestamp, err := c.ownershipDB.GetOwnershipUpdateTimestamp(c.account, c.chainID)
if err != nil {
if err != nil { c.err = err
log.Error("failed loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "page", pageNr, "error", err) } else {
c.err = err initialFetch := lastFetchTimestamp == InvalidTimestamp
break // Fetch collectibles in chunks
} for {
if shouldCancel(parent) {
log.Debug("partial loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "page", pageNr, "found", len(partialOwnership.Items), "collectibles") c.err = errors.New("context cancelled")
break
c.partialOwnership = append(c.partialOwnership, partialOwnership.Items...) }
pageNr++ pageStart := time.Now()
cursor = partialOwnership.NextCursor log.Debug("start loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "page", pageNr)
if cursor == thirdparty.FetchFromStartCursor { partialOwnership, err := c.manager.FetchCollectibleOwnershipByOwner(c.chainID, c.account, cursor, fetchLimit)
err = c.ownershipDB.Update(c.chainID, c.account, c.partialOwnership, start.Unix())
if err != nil { if err != nil {
log.Error("failed updating ownershipDB in loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "error", err) log.Error("failed loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "page", pageNr, "error", err)
c.err = err c.err = err
break
}
log.Debug("partial loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "page", pageNr, "in", time.Since(pageStart), "found", len(partialOwnership.Items), "collectibles")
c.partialOwnership = append(c.partialOwnership, partialOwnership.Items...)
pageNr++
cursor = partialOwnership.NextCursor
finished := cursor == thirdparty.FetchFromStartCursor
// Normally, update the DB once we've finished fetching
// If this is the first fetch, make partial updates to the client to get a better UX
if initialFetch || finished {
err = c.ownershipDB.Update(c.chainID, c.account, c.partialOwnership, start.Unix())
if err != nil {
log.Error("failed updating ownershipDB in loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "error", err)
c.err = err
}
}
if finished || c.err != nil {
break
} else if initialFetch {
c.triggerEvent(EventCollectiblesOwnershipUpdatePartial, c.chainID, c.account, "")
} }
break
} }
} }
@ -178,7 +177,7 @@ func (c *loadOwnedCollectiblesCommand) Run(parent context.Context) (err error) {
c.triggerEvent(EventCollectiblesOwnershipUpdateFinished, c.chainID, c.account, "") c.triggerEvent(EventCollectiblesOwnershipUpdateFinished, c.chainID, c.account, "")
} }
log.Debug("end loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account) log.Debug("end loadOwnedCollectiblesCommand", "chain", c.chainID, "account", c.account, "in", time.Since(start))
return nil return nil
} }

View File

@ -25,6 +25,7 @@ import (
// These events are used to notify the UI of state changes // These events are used to notify the UI of state changes
const ( const (
EventCollectiblesOwnershipUpdateStarted walletevent.EventType = "wallet-collectibles-ownership-update-started" EventCollectiblesOwnershipUpdateStarted walletevent.EventType = "wallet-collectibles-ownership-update-started"
EventCollectiblesOwnershipUpdatePartial walletevent.EventType = "wallet-collectibles-ownership-update-partial"
EventCollectiblesOwnershipUpdateFinished walletevent.EventType = "wallet-collectibles-ownership-update-finished" EventCollectiblesOwnershipUpdateFinished walletevent.EventType = "wallet-collectibles-ownership-update-finished"
EventCollectiblesOwnershipUpdateFinishedWithError walletevent.EventType = "wallet-collectibles-ownership-update-finished-with-error" EventCollectiblesOwnershipUpdateFinishedWithError walletevent.EventType = "wallet-collectibles-ownership-update-finished-with-error"
@ -43,6 +44,9 @@ var (
} }
) )
type commandPerChainID = map[walletCommon.ChainID]*periodicRefreshOwnedCollectiblesCommand
type commandPerAddressAndChainID = map[common.Address]commandPerChainID
type Service struct { type Service struct {
manager *Manager manager *Manager
ownershipDB *OwnershipDB ownershipDB *OwnershipDB
@ -53,6 +57,7 @@ type Service struct {
networkManager *network.Manager networkManager *network.Manager
cancelFn context.CancelFunc cancelFn context.CancelFunc
commands commandPerAddressAndChainID
group *async.Group group *async.Group
scheduler *async.MultiClientScheduler scheduler *async.MultiClientScheduler
accountsWatcher *walletaccounts.Watcher accountsWatcher *walletaccounts.Watcher
@ -66,6 +71,7 @@ func NewService(db *sql.DB, walletFeed *event.Feed, accountsDB *accounts.Databas
accountsDB: accountsDB, accountsDB: accountsDB,
accountsFeed: accountsFeed, accountsFeed: accountsFeed,
networkManager: networkManager, networkManager: networkManager,
commands: make(commandPerAddressAndChainID),
scheduler: async.NewMultiClientScheduler(), scheduler: async.NewMultiClientScheduler(),
} }
} }
@ -155,24 +161,30 @@ func (s *Service) GetCollectiblesDetailsAsync(requestID int32, uniqueIDs []third
}) })
} }
func (s *Service) startPeriodicalOwnershipFetch() { // Starts periodical fetching for the all wallet addresses and all chains
func (s *Service) startPeriodicalOwnershipFetch() error {
if s.group != nil { if s.group != nil {
return return nil
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
s.cancelFn = cancel s.cancelFn = cancel
s.group = async.NewGroup(ctx) s.group = async.NewGroup(ctx)
command := newRefreshOwnedCollectiblesCommand( addresses, err := s.accountsDB.GetWalletAddresses()
s.manager, if err != nil {
s.ownershipDB, return err
s.accountsDB, }
s.walletFeed,
s.networkManager,
)
s.group.Add(command.Command()) for _, addr := range addresses {
err := s.startPeriodicalOwnershipFetchForAccount(common.Address(addr))
if err != nil {
log.Error("Error starting periodical collectibles fetch for accpunt", "address", addr, "error", err)
return err
}
}
return nil
} }
func (s *Service) stopPeriodicalOwnershipFetch() { func (s *Service) stopPeriodicalOwnershipFetch() {
@ -184,20 +196,93 @@ func (s *Service) stopPeriodicalOwnershipFetch() {
s.group.Stop() s.group.Stop()
s.group.Wait() s.group.Wait()
s.group = nil s.group = nil
s.commands = make(commandPerAddressAndChainID)
} }
} }
// Starts (or restarts) periodical fetching for the given account address for all chains
func (s *Service) startPeriodicalOwnershipFetchForAccount(address common.Address) error {
if s.group == nil {
return errors.New("periodical fetch group not initialized")
}
networks, err := s.networkManager.Get(false)
if err != nil {
return err
}
areTestNetworksEnabled, err := s.accountsDB.GetTestNetworksEnabled()
if err != nil {
return err
}
if _, ok := s.commands[address]; ok {
for chainID, command := range s.commands[address] {
command.Stop()
delete(s.commands[address], chainID)
}
}
s.commands[address] = make(commandPerChainID)
for _, network := range networks {
if network.IsTest != areTestNetworksEnabled {
continue
}
chainID := walletCommon.ChainID(network.ChainID)
command := newPeriodicRefreshOwnedCollectiblesCommand(
s.manager,
s.ownershipDB,
s.walletFeed,
chainID,
address,
)
s.commands[address][chainID] = command
s.group.Add(command.Command())
}
return nil
}
// Stop periodical fetching for the given account address for all chains
func (s *Service) stopPeriodicalOwnershipFetchForAccount(address common.Address) error {
if s.group == nil {
return errors.New("periodical fetch group not initialized")
}
if _, ok := s.commands[address]; ok {
for _, command := range s.commands[address] {
command.Stop()
}
delete(s.commands, address)
}
return nil
}
func (s *Service) startAccountsWatcher() { func (s *Service) startAccountsWatcher() {
if s.accountsWatcher != nil { if s.accountsWatcher != nil {
return return
} }
accountChangeCb := func(changedAddresses []common.Address, eventType accountsevent.EventType, currentAddresses []common.Address) { accountChangeCb := func(changedAddresses []common.Address, eventType accountsevent.EventType, currentAddresses []common.Address) {
// Whenever an account gets added, restart fetch // Whenever an account gets added, start fetching
// TODO: Fetch only added accounts
if eventType == accountsevent.EventTypeAdded { if eventType == accountsevent.EventTypeAdded {
s.stopPeriodicalOwnershipFetch() for _, address := range changedAddresses {
s.startPeriodicalOwnershipFetch() err := s.startPeriodicalOwnershipFetchForAccount(address)
if err != nil {
log.Error("Error starting periodical collectibles fetch", "address", address, "error", err)
}
}
} else if eventType == accountsevent.EventTypeRemoved {
for _, address := range changedAddresses {
err := s.stopPeriodicalOwnershipFetchForAccount(address)
if err != nil {
log.Error("Error starting periodical collectibles fetch", "address", address, "error", err)
}
}
} }
} }
@ -215,7 +300,7 @@ func (s *Service) stopAccountsWatcher() {
func (s *Service) Start() { func (s *Service) Start() {
// Setup periodical collectibles refresh // Setup periodical collectibles refresh
s.startPeriodicalOwnershipFetch() _ = s.startPeriodicalOwnershipFetch()
// Setup collectibles fetch when a new account gets added // Setup collectibles fetch when a new account gets added
s.startAccountsWatcher() s.startAccountsWatcher()