feat: trigger collectibles fetch on new account
This commit is contained in:
parent
5ba5611a8d
commit
85d8e83394
|
@ -0,0 +1,86 @@
|
|||
package walletaccounts
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/status-im/status-go/multiaccounts/accounts"
|
||||
"github.com/status-im/status-go/services/accounts/accountsevent"
|
||||
"github.com/status-im/status-go/services/wallet/async"
|
||||
)
|
||||
|
||||
type AccountsChangeCb func(changedAddresses []common.Address, eventType accountsevent.EventType, currentAddresses []common.Address)
|
||||
|
||||
// Watcher executes a given callback whenever an account gets added/removed
|
||||
type Watcher struct {
|
||||
accountsDB *accounts.Database
|
||||
accountFeed *event.Feed
|
||||
group *async.Group
|
||||
callback AccountsChangeCb
|
||||
}
|
||||
|
||||
func NewWatcher(accountsDB *accounts.Database, accountFeed *event.Feed, callback AccountsChangeCb) *Watcher {
|
||||
return &Watcher{
|
||||
accountsDB: accountsDB,
|
||||
accountFeed: accountFeed,
|
||||
callback: callback,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Watcher) Start() {
|
||||
if w.group != nil {
|
||||
return
|
||||
}
|
||||
|
||||
w.group = async.NewGroup(context.Background())
|
||||
w.group.Add(func(ctx context.Context) error {
|
||||
return watch(ctx, w.accountsDB, w.accountFeed, w.callback)
|
||||
})
|
||||
}
|
||||
|
||||
func (w *Watcher) Stop() {
|
||||
if w.group != nil {
|
||||
w.group.Stop()
|
||||
w.group.Wait()
|
||||
w.group = nil
|
||||
}
|
||||
}
|
||||
|
||||
func onAccountsChange(accountsDB *accounts.Database, callback AccountsChangeCb, changedAddresses []common.Address, eventType accountsevent.EventType) {
|
||||
currentEthAddresses, err := accountsDB.GetWalletAddresses()
|
||||
|
||||
if err != nil {
|
||||
log.Error("failed getting wallet addresses", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
currentAddresses := make([]common.Address, 0, len(currentEthAddresses))
|
||||
for _, ethAddress := range currentEthAddresses {
|
||||
currentAddresses = append(currentAddresses, common.Address(ethAddress))
|
||||
}
|
||||
|
||||
if callback != nil {
|
||||
callback(changedAddresses, eventType, currentAddresses)
|
||||
}
|
||||
}
|
||||
|
||||
func watch(ctx context.Context, accountsDB *accounts.Database, accountFeed *event.Feed, callback AccountsChangeCb) error {
|
||||
ch := make(chan accountsevent.Event, 1)
|
||||
sub := accountFeed.Subscribe(ch)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case err := <-sub.Err():
|
||||
if err != nil {
|
||||
log.Error("accounts watcher subscription failed", "error", err)
|
||||
}
|
||||
case ev := <-ch:
|
||||
onAccountsChange(accountsDB, callback, ev.Accounts, ev.Type)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -10,8 +10,11 @@ import (
|
|||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
|
||||
"github.com/status-im/status-go/multiaccounts/accounts"
|
||||
"github.com/status-im/status-go/rpc/network"
|
||||
|
||||
"github.com/status-im/status-go/services/accounts/accountsevent"
|
||||
walletaccounts "github.com/status-im/status-go/services/wallet/accounts"
|
||||
"github.com/status-im/status-go/services/wallet/async"
|
||||
walletCommon "github.com/status-im/status-go/services/wallet/common"
|
||||
"github.com/status-im/status-go/services/wallet/thirdparty"
|
||||
|
@ -42,19 +45,25 @@ var (
|
|||
type Service struct {
|
||||
manager *Manager
|
||||
db *sql.DB
|
||||
eventFeed *event.Feed
|
||||
walletFeed *event.Feed
|
||||
accountsDB *accounts.Database
|
||||
accountsFeed *event.Feed
|
||||
|
||||
networkManager *network.Manager
|
||||
cancelFn context.CancelFunc
|
||||
|
||||
group *async.Group
|
||||
scheduler *async.Scheduler
|
||||
accountsWatcher *walletaccounts.Watcher
|
||||
}
|
||||
|
||||
func NewService(db *sql.DB, eventFeed *event.Feed, networkManager *network.Manager, manager *Manager) *Service {
|
||||
func NewService(db *sql.DB, walletFeed *event.Feed, accountsDB *accounts.Database, accountsFeed *event.Feed, networkManager *network.Manager, manager *Manager) *Service {
|
||||
return &Service{
|
||||
manager: manager,
|
||||
db: db,
|
||||
eventFeed: eventFeed,
|
||||
walletFeed: walletFeed,
|
||||
accountsDB: accountsDB,
|
||||
accountsFeed: accountsFeed,
|
||||
networkManager: networkManager,
|
||||
scheduler: async.NewScheduler(),
|
||||
}
|
||||
|
@ -144,7 +153,8 @@ func (s *Service) GetCollectiblesDataAsync(ctx context.Context, uniqueIDs []thir
|
|||
s.sendResponseEvent(EventGetCollectiblesDataDone, res, err)
|
||||
})
|
||||
}
|
||||
func (s *Service) Start() {
|
||||
|
||||
func (s *Service) startPeriodicalOwnershipFetch() {
|
||||
if s.group != nil {
|
||||
return
|
||||
}
|
||||
|
@ -156,14 +166,14 @@ func (s *Service) Start() {
|
|||
command := newRefreshOwnedCollectiblesCommand(
|
||||
s.manager,
|
||||
s.db,
|
||||
s.eventFeed,
|
||||
s.walletFeed,
|
||||
s.networkManager,
|
||||
)
|
||||
|
||||
s.group.Add(command.Command())
|
||||
}
|
||||
|
||||
func (s *Service) Stop() {
|
||||
func (s *Service) stopPeriodicalOwnershipFetch() {
|
||||
if s.cancelFn != nil {
|
||||
s.cancelFn()
|
||||
s.cancelFn = nil
|
||||
|
@ -173,6 +183,47 @@ func (s *Service) Stop() {
|
|||
s.group.Wait()
|
||||
s.group = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) startAccountsWatcher() {
|
||||
if s.accountsWatcher != nil {
|
||||
return
|
||||
}
|
||||
|
||||
accountChangeCb := func(changedAddresses []common.Address, eventType accountsevent.EventType, currentAddresses []common.Address) {
|
||||
// Whenever an account gets added, restart fetch
|
||||
// TODO: Fetch only added accounts
|
||||
if eventType == accountsevent.EventTypeAdded {
|
||||
s.stopPeriodicalOwnershipFetch()
|
||||
s.startPeriodicalOwnershipFetch()
|
||||
}
|
||||
}
|
||||
|
||||
s.accountsWatcher = walletaccounts.NewWatcher(s.accountsDB, s.accountsFeed, accountChangeCb)
|
||||
|
||||
s.accountsWatcher.Start()
|
||||
}
|
||||
|
||||
func (s *Service) stopAccountsWatcher() {
|
||||
if s.accountsWatcher != nil {
|
||||
s.accountsWatcher.Stop()
|
||||
s.accountsWatcher = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) Start() {
|
||||
// Setup periodical collectibles refresh
|
||||
s.startPeriodicalOwnershipFetch()
|
||||
|
||||
// Setup collectibles fetch when a new account gets added
|
||||
s.startAccountsWatcher()
|
||||
}
|
||||
|
||||
func (s *Service) Stop() {
|
||||
s.stopAccountsWatcher()
|
||||
|
||||
s.stopPeriodicalOwnershipFetch()
|
||||
|
||||
s.scheduler.Stop()
|
||||
}
|
||||
|
||||
|
@ -186,7 +237,7 @@ func (s *Service) sendResponseEvent(eventType walletevent.EventType, payloadObj
|
|||
|
||||
log.Debug("wallet.api.collectibles.Service RESPONSE", "eventType", eventType, "error", err, "payload.len", len(payload))
|
||||
|
||||
s.eventFeed.Send(walletevent.Event{
|
||||
s.walletFeed.Send(walletevent.Event{
|
||||
Type: eventType,
|
||||
Message: string(payload),
|
||||
})
|
||||
|
|
|
@ -109,7 +109,7 @@ func NewService(
|
|||
infuraClient := infura.NewClient(config.WalletConfig.InfuraAPIKey, config.WalletConfig.InfuraAPIKeySecret)
|
||||
openseaClient := opensea.NewClient(config.WalletConfig.OpenseaAPIKey, walletFeed)
|
||||
collectiblesManager := collectibles.NewManager(rpcClient, alchemyClient, infuraClient, openseaClient)
|
||||
collectibles := collectibles.NewService(db, walletFeed, rpcClient.NetworkManager, collectiblesManager)
|
||||
collectibles := collectibles.NewService(db, walletFeed, accountsDB, accountFeed, rpcClient.NetworkManager, collectiblesManager)
|
||||
return &Service{
|
||||
db: db,
|
||||
accountsDB: accountsDB,
|
||||
|
|
Loading…
Reference in New Issue