From 094228871ea2998992894467bb21dd560645d6b3 Mon Sep 17 00:00:00 2001 From: Dario Gabriel Lipicar Date: Mon, 9 Oct 2023 09:43:53 -0300 Subject: [PATCH] feat: trigger collectibles refresh on transfer --- services/wallet/async/async.go | 25 ++ services/wallet/collectibles/commands.go | 25 +- services/wallet/collectibles/controller.go | 352 +++++++++++++++++++++ services/wallet/collectibles/service.go | 201 +----------- services/wallet/transfer/commands.go | 26 ++ services/wallet/walletevent/events.go | 10 + services/wallet/walletevent/transmitter.go | 4 +- services/wallet/walletevent/watcher.go | 65 ++++ 8 files changed, 518 insertions(+), 190 deletions(-) create mode 100644 services/wallet/collectibles/controller.go create mode 100644 services/wallet/walletevent/watcher.go diff --git a/services/wallet/async/async.go b/services/wallet/async/async.go index 23d02367e..8aaf5de5d 100644 --- a/services/wallet/async/async.go +++ b/services/wallet/async/async.go @@ -12,6 +12,31 @@ type Commander interface { Command() Command } +// SingleShotCommand runs once. +type SingleShotCommand struct { + Interval time.Duration + Init func(context.Context) error + Runable func(context.Context) error +} + +func (c SingleShotCommand) Run(ctx context.Context) error { + timer := time.NewTimer(c.Interval) + if c.Init != nil { + err := c.Init(ctx) + if err != nil { + return err + } + } + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + _ = c.Runable(ctx) + } + } +} + // FiniteCommand terminates when error is nil. type FiniteCommand struct { Interval time.Duration diff --git a/services/wallet/collectibles/commands.go b/services/wallet/collectibles/commands.go index ed1ca41e5..2ab11231d 100644 --- a/services/wallet/collectibles/commands.go +++ b/services/wallet/collectibles/commands.go @@ -16,8 +16,18 @@ import ( ) const ( - fetchLimit = 50 // Limit number of collectibles we fetch per provider call - accountOwnershipUpdateInterval = 30 * time.Minute + fetchLimit = 50 // Limit number of collectibles we fetch per provider call + accountOwnershipUpdateInterval = 30 * time.Minute + accountOwnershipUpdateDelayInterval = 30 * time.Second +) + +type OwnershipState = int + +const ( + OwnershipStateIdle OwnershipState = iota + 1 + OwnershipStateDelayed + OwnershipStateUpdating + OwnershipStateError ) type periodicRefreshOwnedCollectiblesCommand struct { @@ -43,6 +53,17 @@ func newPeriodicRefreshOwnedCollectiblesCommand(manager *Manager, ownershipDB *O return ret } +func (c *periodicRefreshOwnedCollectiblesCommand) DelayedCommand() async.Command { + return async.SingleShotCommand{ + Interval: accountOwnershipUpdateDelayInterval, + Init: func(ctx context.Context) (err error) { + c.state.Store(OwnershipStateDelayed) + return nil + }, + Runable: c.Command(), + }.Run +} + func (c *periodicRefreshOwnedCollectiblesCommand) Command() async.Command { return async.InfiniteCommand{ Interval: accountOwnershipUpdateInterval, diff --git a/services/wallet/collectibles/controller.go b/services/wallet/collectibles/controller.go new file mode 100644 index 000000000..d293ff55a --- /dev/null +++ b/services/wallet/collectibles/controller.go @@ -0,0 +1,352 @@ +package collectibles + +import ( + "context" + "database/sql" + "errors" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/status-im/status-go/multiaccounts/accounts" + "github.com/status-im/status-go/rpc/network" + "github.com/status-im/status-go/services/accounts/accountsevent" + walletAccounts "github.com/status-im/status-go/services/wallet/accounts" + "github.com/status-im/status-go/services/wallet/async" + walletCommon "github.com/status-im/status-go/services/wallet/common" + "github.com/status-im/status-go/services/wallet/transfer" + "github.com/status-im/status-go/services/wallet/walletevent" +) + +const ( + activityRefetchMarginSeconds = 30 * 60 // Trigger a fetch if activity is detected this many seconds before the last fetch +) + +type commandPerChainID = map[walletCommon.ChainID]*periodicRefreshOwnedCollectiblesCommand +type commandPerAddressAndChainID = map[common.Address]commandPerChainID + +type timerPerChainID = map[walletCommon.ChainID]*time.Timer +type timerPerAddressAndChainID = map[common.Address]timerPerChainID + +type Controller struct { + manager *Manager + ownershipDB *OwnershipDB + walletFeed *event.Feed + accountsDB *accounts.Database + accountsFeed *event.Feed + + networkManager *network.Manager + cancelFn context.CancelFunc + + commands commandPerAddressAndChainID + timers timerPerAddressAndChainID + group *async.Group + accountsWatcher *walletAccounts.Watcher + walletEventsWatcher *walletevent.Watcher + + commandsLock sync.RWMutex +} + +func NewController(db *sql.DB, walletFeed *event.Feed, accountsDB *accounts.Database, accountsFeed *event.Feed, networkManager *network.Manager, manager *Manager) *Controller { + return &Controller{ + manager: manager, + ownershipDB: NewOwnershipDB(db), + walletFeed: walletFeed, + accountsDB: accountsDB, + accountsFeed: accountsFeed, + networkManager: networkManager, + commands: make(commandPerAddressAndChainID), + timers: make(timerPerAddressAndChainID), + } +} + +func (c *Controller) Start() { + // Setup periodical collectibles refresh + _ = c.startPeriodicalOwnershipFetch() + + // Setup collectibles fetch when a new account gets added + c.startAccountsWatcher() + + // Setup collectibles fetch when relevant activity is detected + c.startWalletEventsWatcher() +} + +func (c *Controller) Stop() { + c.stopWalletEventsWatcher() + + c.stopAccountsWatcher() + + c.stopPeriodicalOwnershipFetch() +} + +func (c *Controller) RefetchOwnedCollectibles() { + c.stopPeriodicalOwnershipFetch() + c.manager.ResetConnectionStatus() + _ = c.startPeriodicalOwnershipFetch() +} + +func (c *Controller) GetCommandState(chainID walletCommon.ChainID, address common.Address) OwnershipState { + c.commandsLock.RLock() + defer c.commandsLock.RUnlock() + + state := OwnershipStateIdle + if c.commands[address] != nil && c.commands[address][chainID] != nil { + state = c.commands[address][chainID].GetState() + } + + return state +} + +func (c *Controller) isPeriodicalOwnershipFetchRunning() bool { + return c.group != nil +} + +// Starts periodical fetching for the all wallet addresses and all chains +func (c *Controller) startPeriodicalOwnershipFetch() error { + c.commandsLock.Lock() + defer c.commandsLock.Unlock() + + if c.isPeriodicalOwnershipFetchRunning() { + return nil + } + + ctx, cancel := context.WithCancel(context.Background()) + c.cancelFn = cancel + + c.group = async.NewGroup(ctx) + + addresses, err := c.accountsDB.GetWalletAddresses() + if err != nil { + return err + } + + for _, addr := range addresses { + err := c.startPeriodicalOwnershipFetchForAccount(common.Address(addr)) + if err != nil { + log.Error("Error starting periodical collectibles fetch for accpunt", "address", addr, "error", err) + return err + } + } + + return nil +} + +func (c *Controller) stopPeriodicalOwnershipFetch() { + c.commandsLock.Lock() + defer c.commandsLock.Unlock() + + if !c.isPeriodicalOwnershipFetchRunning() { + return + } + + if c.cancelFn != nil { + c.cancelFn() + c.cancelFn = nil + } + if c.group != nil { + c.group.Stop() + c.group.Wait() + c.group = nil + c.commands = make(commandPerAddressAndChainID) + } +} + +// Starts (or restarts) periodical fetching for the given account address for all chains +func (c *Controller) startPeriodicalOwnershipFetchForAccount(address common.Address) error { + log.Debug("wallet.api.collectibles.Controller", "Start periodical fetching", "address", address) + + networks, err := c.networkManager.Get(false) + if err != nil { + return err + } + + areTestNetworksEnabled, err := c.accountsDB.GetTestNetworksEnabled() + if err != nil { + return err + } + + for _, network := range networks { + if network.IsTest != areTestNetworksEnabled { + continue + } + chainID := walletCommon.ChainID(network.ChainID) + + err := c.startPeriodicalOwnershipFetchForAccountAndChainID(address, chainID, false) + if err != nil { + return err + } + } + + return nil +} + +// Starts (or restarts) periodical fetching for the given account address for all chains +func (c *Controller) startPeriodicalOwnershipFetchForAccountAndChainID(address common.Address, chainID walletCommon.ChainID, delayed bool) error { + log.Debug("wallet.api.collectibles.Controller", "Start periodical fetching", "address", address, "chainID", chainID, "delayed", delayed) + + if !c.isPeriodicalOwnershipFetchRunning() { + return errors.New("periodical fetch not initialized") + } + + err := c.stopPeriodicalOwnershipFetchForAccountAndChainID(address, chainID) + if err != nil { + return err + } + + if _, ok := c.commands[address]; !ok { + c.commands[address] = make(commandPerChainID) + } + + command := newPeriodicRefreshOwnedCollectiblesCommand( + c.manager, + c.ownershipDB, + c.walletFeed, + chainID, + address, + ) + + c.commands[address][chainID] = command + if delayed { + c.group.Add(command.DelayedCommand()) + } else { + c.group.Add(command.Command()) + } + + return nil +} + +// Stop periodical fetching for the given account address for all chains +func (c *Controller) stopPeriodicalOwnershipFetchForAccount(address common.Address) error { + log.Debug("wallet.api.collectibles.Controller", "Stop periodical fetching", "address", address) + + if !c.isPeriodicalOwnershipFetchRunning() { + return errors.New("periodical fetch not initialized") + } + + if _, ok := c.commands[address]; ok { + for chainID := range c.commands[address] { + err := c.stopPeriodicalOwnershipFetchForAccountAndChainID(address, chainID) + if err != nil { + return err + } + } + + } + + return nil +} + +func (c *Controller) stopPeriodicalOwnershipFetchForAccountAndChainID(address common.Address, chainID walletCommon.ChainID) error { + log.Debug("wallet.api.collectibles.Controller", "Stop periodical fetching", "address", address, "chainID", chainID) + + if !c.isPeriodicalOwnershipFetchRunning() { + return errors.New("periodical fetch not initialized") + } + + if _, ok := c.commands[address]; ok { + if _, ok := c.commands[address][chainID]; ok { + c.commands[address][chainID].Stop() + delete(c.commands[address], chainID) + } + // If it was the last chain, delete the address as well + if len(c.commands[address]) == 0 { + delete(c.commands, address) + } + } + + return nil +} + +func (c *Controller) startAccountsWatcher() { + if c.accountsWatcher != nil { + return + } + + accountChangeCb := func(changedAddresses []common.Address, eventType accountsevent.EventType, currentAddresses []common.Address) { + c.commandsLock.Lock() + defer c.commandsLock.Unlock() + // Whenever an account gets added, start fetching + if eventType == accountsevent.EventTypeAdded { + for _, address := range changedAddresses { + err := c.startPeriodicalOwnershipFetchForAccount(address) + if err != nil { + log.Error("Error starting periodical collectibles fetch", "address", address, "error", err) + } + } + } else if eventType == accountsevent.EventTypeRemoved { + for _, address := range changedAddresses { + err := c.stopPeriodicalOwnershipFetchForAccount(address) + if err != nil { + log.Error("Error starting periodical collectibles fetch", "address", address, "error", err) + } + } + } + } + + c.accountsWatcher = walletAccounts.NewWatcher(c.accountsDB, c.accountsFeed, accountChangeCb) + + c.accountsWatcher.Start() +} + +func (c *Controller) stopAccountsWatcher() { + if c.accountsWatcher != nil { + c.accountsWatcher.Stop() + c.accountsWatcher = nil + } +} + +func (c *Controller) startWalletEventsWatcher() { + if c.walletEventsWatcher != nil { + return + } + + walletEventCb := func(event walletevent.Event) { + // EventRecentHistoryReady ? + if event.Type != transfer.EventInternalERC721TransferDetected { + return + } + + chainID := walletCommon.ChainID(event.ChainID) + for _, account := range event.Accounts { + // Check last ownership update timestamp + timestamp, err := c.ownershipDB.GetOwnershipUpdateTimestamp(account, chainID) + + if err != nil { + log.Error("Error getting ownership update timestamp", "error", err) + continue + } + if timestamp == InvalidTimestamp { + // Ownership was never fetched for this account + continue + } + + timeCheck := timestamp - activityRefetchMarginSeconds + if timeCheck < 0 { + timeCheck = 0 + } + + if event.At > timeCheck { + // Restart fetching for account + chainID + c.commandsLock.Lock() + err := c.startPeriodicalOwnershipFetchForAccountAndChainID(account, chainID, true) + c.commandsLock.Unlock() + if err != nil { + log.Error("Error starting periodical collectibles fetch", "address", account, "error", err) + } + } + } + } + + c.walletEventsWatcher = walletevent.NewWatcher(c.walletFeed, walletEventCb) + + c.walletEventsWatcher.Start() +} + +func (c *Controller) stopWalletEventsWatcher() { + if c.walletEventsWatcher != nil { + c.walletEventsWatcher.Stop() + c.walletEventsWatcher = nil + } +} diff --git a/services/wallet/collectibles/service.go b/services/wallet/collectibles/service.go index c961a8039..7581f41ed 100644 --- a/services/wallet/collectibles/service.go +++ b/services/wallet/collectibles/service.go @@ -14,8 +14,6 @@ import ( "github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/rpc/network" - "github.com/status-im/status-go/services/accounts/accountsevent" - walletaccounts "github.com/status-im/status-go/services/wallet/accounts" "github.com/status-im/status-go/services/wallet/async" walletCommon "github.com/status-im/status-go/services/wallet/common" "github.com/status-im/status-go/services/wallet/thirdparty" @@ -44,35 +42,21 @@ var ( } ) -type commandPerChainID = map[walletCommon.ChainID]*periodicRefreshOwnedCollectiblesCommand -type commandPerAddressAndChainID = map[common.Address]commandPerChainID - type Service struct { - manager *Manager - ownershipDB *OwnershipDB - walletFeed *event.Feed - accountsDB *accounts.Database - accountsFeed *event.Feed - - networkManager *network.Manager - cancelFn context.CancelFunc - - commands commandPerAddressAndChainID - group *async.Group - scheduler *async.MultiClientScheduler - accountsWatcher *walletaccounts.Watcher + manager *Manager + controller *Controller + ownershipDB *OwnershipDB + walletFeed *event.Feed + scheduler *async.MultiClientScheduler } func NewService(db *sql.DB, walletFeed *event.Feed, accountsDB *accounts.Database, accountsFeed *event.Feed, networkManager *network.Manager, manager *Manager) *Service { return &Service{ - manager: manager, - ownershipDB: NewOwnershipDB(db), - walletFeed: walletFeed, - accountsDB: accountsDB, - accountsFeed: accountsFeed, - networkManager: networkManager, - commands: make(commandPerAddressAndChainID), - scheduler: async.NewMultiClientScheduler(), + manager: manager, + controller: NewController(db, walletFeed, accountsDB, accountsFeed, networkManager, manager), + ownershipDB: NewOwnershipDB(db), + walletFeed: walletFeed, + scheduler: async.NewMultiClientScheduler(), } } @@ -84,14 +68,6 @@ const ( ErrorCodeFailed ) -type OwnershipState = int - -const ( - OwnershipStateIdle OwnershipState = iota + 1 - OwnershipStateUpdating - OwnershipStateError -) - type OwnershipStatus struct { State OwnershipState `json:"state"` Timestamp int64 `json:"timestamp"` @@ -189,160 +165,15 @@ func (s *Service) GetCollectiblesDetailsAsync(requestID int32, uniqueIDs []third } func (s *Service) RefetchOwnedCollectibles() { - s.stopPeriodicalOwnershipFetch() - s.manager.ResetConnectionStatus() - _ = s.startPeriodicalOwnershipFetch() -} - -// Starts periodical fetching for the all wallet addresses and all chains -func (s *Service) startPeriodicalOwnershipFetch() error { - if s.group != nil { - return nil - } - ctx, cancel := context.WithCancel(context.Background()) - s.cancelFn = cancel - - s.group = async.NewGroup(ctx) - - addresses, err := s.accountsDB.GetWalletAddresses() - if err != nil { - return err - } - - for _, addr := range addresses { - err := s.startPeriodicalOwnershipFetchForAccount(common.Address(addr)) - if err != nil { - log.Error("Error starting periodical collectibles fetch for accpunt", "address", addr, "error", err) - return err - } - } - - return nil -} - -func (s *Service) stopPeriodicalOwnershipFetch() { - if s.cancelFn != nil { - s.cancelFn() - s.cancelFn = nil - } - if s.group != nil { - s.group.Stop() - s.group.Wait() - s.group = nil - s.commands = make(commandPerAddressAndChainID) - } -} - -// Starts (or restarts) periodical fetching for the given account address for all chains -func (s *Service) startPeriodicalOwnershipFetchForAccount(address common.Address) error { - if s.group == nil { - return errors.New("periodical fetch group not initialized") - } - - networks, err := s.networkManager.Get(false) - if err != nil { - return err - } - - areTestNetworksEnabled, err := s.accountsDB.GetTestNetworksEnabled() - if err != nil { - return err - } - - if _, ok := s.commands[address]; ok { - for chainID, command := range s.commands[address] { - command.Stop() - delete(s.commands[address], chainID) - } - } - - s.commands[address] = make(commandPerChainID) - - for _, network := range networks { - if network.IsTest != areTestNetworksEnabled { - continue - } - chainID := walletCommon.ChainID(network.ChainID) - - command := newPeriodicRefreshOwnedCollectiblesCommand( - s.manager, - s.ownershipDB, - s.walletFeed, - chainID, - address, - ) - - s.commands[address][chainID] = command - s.group.Add(command.Command()) - } - - return nil -} - -// Stop periodical fetching for the given account address for all chains -func (s *Service) stopPeriodicalOwnershipFetchForAccount(address common.Address) error { - if s.group == nil { - return errors.New("periodical fetch group not initialized") - } - - if _, ok := s.commands[address]; ok { - for _, command := range s.commands[address] { - command.Stop() - } - delete(s.commands, address) - } - - return nil -} - -func (s *Service) startAccountsWatcher() { - if s.accountsWatcher != nil { - return - } - - accountChangeCb := func(changedAddresses []common.Address, eventType accountsevent.EventType, currentAddresses []common.Address) { - // Whenever an account gets added, start fetching - if eventType == accountsevent.EventTypeAdded { - for _, address := range changedAddresses { - err := s.startPeriodicalOwnershipFetchForAccount(address) - if err != nil { - log.Error("Error starting periodical collectibles fetch", "address", address, "error", err) - } - } - } else if eventType == accountsevent.EventTypeRemoved { - for _, address := range changedAddresses { - err := s.stopPeriodicalOwnershipFetchForAccount(address) - if err != nil { - log.Error("Error starting periodical collectibles fetch", "address", address, "error", err) - } - } - } - } - - s.accountsWatcher = walletaccounts.NewWatcher(s.accountsDB, s.accountsFeed, accountChangeCb) - - s.accountsWatcher.Start() -} - -func (s *Service) stopAccountsWatcher() { - if s.accountsWatcher != nil { - s.accountsWatcher.Stop() - s.accountsWatcher = nil - } + s.controller.RefetchOwnedCollectibles() } func (s *Service) Start() { - // Setup periodical collectibles refresh - _ = s.startPeriodicalOwnershipFetch() - - // Setup collectibles fetch when a new account gets added - s.startAccountsWatcher() + s.controller.Start() } func (s *Service) Stop() { - s.stopAccountsWatcher() - - s.stopPeriodicalOwnershipFetch() + s.controller.Stop() s.scheduler.Stop() } @@ -398,12 +229,8 @@ func (s *Service) GetOwnershipStatus(chainIDs []walletCommon.ChainID, owners []c if err != nil { return nil, err } - state := OwnershipStateIdle - if s.commands[address] != nil && s.commands[address][chainID] != nil { - state = s.commands[address][chainID].GetState() - } ret[address][chainID] = OwnershipStatus{ - State: state, + State: s.controller.GetCommandState(chainID, address), Timestamp: timestamp, } } diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index 8e3d4bb70..ff72fb002 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -33,6 +33,9 @@ const ( // EventNonArchivalNodeDetected emitted when a connection to a non archival node is detected EventNonArchivalNodeDetected walletevent.EventType = "non-archival-node-detected" + // EventInternalERC721TransferDetected emitted when ERC721 transfer is detected + EventInternalERC721TransferDetected walletevent.EventType = walletevent.InternalEventTypePrefix + "erc721-transfer-detected" + numberOfBlocksCheckedPerIteration = 40 noBlockLimit = 0 ) @@ -432,6 +435,7 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) { c.fetchedTransfers = append(c.fetchedTransfers, allTransfers...) c.notifyOfNewTransfers(blockNum, allTransfers) + c.notifyOfNewERC721Transfers(allTransfers) log.Debug("transfersCommand block end", "chain", c.chainClient.NetworkID(), "address", c.address, "block", blockNum, "tranfers.len", len(allTransfers), "fetchedTransfers.len", len(c.fetchedTransfers)) @@ -621,6 +625,28 @@ func (c *transfersCommand) notifyOfNewTransfers(blockNum *big.Int, transfers []T } } +func (c *transfersCommand) notifyOfNewERC721Transfers(transfers []Transfer) { + if c.feed != nil { + // Internal event for ERC721 transfers + latestERC721TransferTimestamp := uint64(0) + for _, transfer := range transfers { + if transfer.Type == w_common.Erc721Transfer { + if transfer.Timestamp > latestERC721TransferTimestamp { + latestERC721TransferTimestamp = transfer.Timestamp + } + } + } + if latestERC721TransferTimestamp > 0 { + c.feed.Send(walletevent.Event{ + Type: EventInternalERC721TransferDetected, + Accounts: []common.Address{c.address}, + ChainID: c.chainClient.NetworkID(), + At: int64(latestERC721TransferTimestamp), + }) + } + } +} + type loadTransfersCommand struct { accounts []common.Address db *Database diff --git a/services/wallet/walletevent/events.go b/services/wallet/walletevent/events.go index 682cbbfae..fd1d6a043 100644 --- a/services/wallet/walletevent/events.go +++ b/services/wallet/walletevent/events.go @@ -2,6 +2,7 @@ package walletevent import ( "math/big" + "strings" "github.com/ethereum/go-ethereum/common" ) @@ -9,6 +10,15 @@ import ( // EventType type for event types. type EventType string +// EventType prefix to be used for internal events. +// These events are not forwarded to the client, they are only used +// within status-go. +const InternalEventTypePrefix = "INT-" + +func (t EventType) IsInternal() bool { + return strings.HasPrefix(string(t), InternalEventTypePrefix) +} + // Event is a type for transfer events. type Event struct { Type EventType `json:"type"` diff --git a/services/wallet/walletevent/transmitter.go b/services/wallet/walletevent/transmitter.go index cf81e2875..fd848eee4 100644 --- a/services/wallet/walletevent/transmitter.go +++ b/services/wallet/walletevent/transmitter.go @@ -47,7 +47,9 @@ func (tmr *SignalsTransmitter) Start() error { } return case event := <-events: - signal.SendWalletEvent(event) + if !event.Type.IsInternal() { + signal.SendWalletEvent(event) + } } } }() diff --git a/services/wallet/walletevent/watcher.go b/services/wallet/walletevent/watcher.go new file mode 100644 index 000000000..9750c2ff7 --- /dev/null +++ b/services/wallet/walletevent/watcher.go @@ -0,0 +1,65 @@ +package walletevent + +import ( + "context" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/status-im/status-go/services/wallet/async" +) + +type EventCb func(event Event) + +// Watcher executes a given callback whenever a wallet event gets sent +type Watcher struct { + feed *event.Feed + group *async.Group + callback EventCb +} + +func NewWatcher(feed *event.Feed, callback EventCb) *Watcher { + return &Watcher{ + feed: feed, + callback: callback, + } +} + +func (w *Watcher) Start() { + if w.group != nil { + return + } + + w.group = async.NewGroup(context.Background()) + w.group.Add(func(ctx context.Context) error { + return watch(ctx, w.feed, w.callback) + }) +} + +func (w *Watcher) Stop() { + if w.group != nil { + w.group.Stop() + w.group.Wait() + w.group = nil + } +} + +func watch(ctx context.Context, feed *event.Feed, callback EventCb) error { + ch := make(chan Event, 10) + sub := feed.Subscribe(ch) + defer sub.Unsubscribe() + + for { + select { + case <-ctx.Done(): + return nil + case err := <-sub.Err(): + if err != nil { + log.Error("wallet event watcher subscription failed", "error", err) + } + case ev := <-ch: + if callback != nil { + callback(ev) + } + } + } +}