From 25ff1dd75899e697aa80ccf795c32fcb6ed60c33 Mon Sep 17 00:00:00 2001 From: Dario Gabriel Lipicar Date: Thu, 10 Aug 2023 16:39:43 -0300 Subject: [PATCH] feat: make collectibles api support multiple clients --- services/wallet/api.go | 8 +++---- services/wallet/collectibles/service.go | 31 +++++++++++++++---------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/services/wallet/api.go b/services/wallet/api.go index fb4b10a2d..7181f7f15 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -309,17 +309,17 @@ func (api *API) FetchBalancesByOwnerAndContractAddress(chainID wcommon.ChainID, return api.s.collectiblesManager.FetchBalancesByOwnerAndContractAddress(chainID, ownerAddress, contractAddresses) } -func (api *API) FilterOwnedCollectiblesAsync(ctx context.Context, chainIDs []wcommon.ChainID, addresses []common.Address, offset int, limit int) error { +func (api *API) FilterOwnedCollectiblesAsync(requestID int32, chainIDs []wcommon.ChainID, addresses []common.Address, offset int, limit int) error { log.Debug("wallet.api.FilterOwnedCollectiblesAsync", "chainIDs.count", len(chainIDs), "addr.count", len(addresses), "offset", offset, "limit", limit) - api.s.collectibles.FilterOwnedCollectiblesAsync(ctx, chainIDs, addresses, offset, limit) + api.s.collectibles.FilterOwnedCollectiblesAsync(requestID, chainIDs, addresses, offset, limit) return nil } -func (api *API) GetCollectiblesDetailsAsync(ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID) error { +func (api *API) GetCollectiblesDetailsAsync(requestID int32, uniqueIDs []thirdparty.CollectibleUniqueID) error { log.Debug("wallet.api.GetCollectiblesDetailsAsync") - api.s.collectibles.GetCollectiblesDetailsAsync(ctx, uniqueIDs) + api.s.collectibles.GetCollectiblesDetailsAsync(requestID, uniqueIDs) return nil } diff --git a/services/wallet/collectibles/service.go b/services/wallet/collectibles/service.go index 21eb8d381..843146901 100644 --- a/services/wallet/collectibles/service.go +++ b/services/wallet/collectibles/service.go @@ -53,7 +53,7 @@ type Service struct { cancelFn context.CancelFunc group *async.Group - scheduler *async.Scheduler + scheduler *async.MultiClientScheduler accountsWatcher *walletaccounts.Watcher } @@ -65,7 +65,7 @@ func NewService(db *sql.DB, walletFeed *event.Feed, accountsDB *accounts.Databas accountsDB: accountsDB, accountsFeed: accountsFeed, networkManager: networkManager, - scheduler: async.NewScheduler(), + scheduler: async.NewMultiClientScheduler(), } } @@ -99,8 +99,8 @@ type filterOwnedCollectiblesTaskReturnType struct { // FilterOwnedCollectiblesResponse allows only one filter task to run at a time // and it cancels the current one if a new one is started // All calls will trigger an EventOwnedCollectiblesFilteringDone event with the result of the filtering -func (s *Service) FilterOwnedCollectiblesAsync(ctx context.Context, chainIDs []walletCommon.ChainID, addresses []common.Address, offset int, limit int) { - s.scheduler.Enqueue(filterOwnedCollectiblesTask, func(ctx context.Context) (interface{}, error) { +func (s *Service) FilterOwnedCollectiblesAsync(requestID int32, chainIDs []walletCommon.ChainID, addresses []common.Address, offset int, limit int) { + s.scheduler.Enqueue(requestID, filterOwnedCollectiblesTask, func(ctx context.Context) (interface{}, error) { collectibles, hasMore, err := s.GetOwnedCollectibles(chainIDs, addresses, offset, limit) if err != nil { return nil, err @@ -129,12 +129,12 @@ func (s *Service) FilterOwnedCollectiblesAsync(ctx context.Context, chainIDs []w res.ErrorCode = ErrorCodeSuccess } - s.sendResponseEvent(EventOwnedCollectiblesFilteringDone, res, err) + s.sendResponseEvent(&requestID, EventOwnedCollectiblesFilteringDone, res, err) }) } -func (s *Service) GetCollectiblesDetailsAsync(ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID) { - s.scheduler.Enqueue(getCollectiblesDataTask, func(ctx context.Context) (interface{}, error) { +func (s *Service) GetCollectiblesDetailsAsync(requestID int32, uniqueIDs []thirdparty.CollectibleUniqueID) { + s.scheduler.Enqueue(requestID, getCollectiblesDataTask, func(ctx context.Context) (interface{}, error) { collectibles, err := s.manager.FetchAssetsByCollectibleUniqueID(uniqueIDs) return collectibles, err }, func(result interface{}, taskType async.TaskType, err error) { @@ -150,7 +150,7 @@ func (s *Service) GetCollectiblesDetailsAsync(ctx context.Context, uniqueIDs []t res.ErrorCode = ErrorCodeSuccess } - s.sendResponseEvent(EventGetCollectiblesDetailsDone, res, err) + s.sendResponseEvent(&requestID, EventGetCollectiblesDetailsDone, res, err) }) } @@ -228,7 +228,7 @@ func (s *Service) Stop() { s.scheduler.Stop() } -func (s *Service) sendResponseEvent(eventType walletevent.EventType, payloadObj interface{}, resErr error) { +func (s *Service) sendResponseEvent(requestID *int32, eventType walletevent.EventType, payloadObj interface{}, resErr error) { payload, err := json.Marshal(payloadObj) if err != nil { log.Error("Error marshaling response: %v; result error: %w", err, resErr) @@ -236,12 +236,19 @@ func (s *Service) sendResponseEvent(eventType walletevent.EventType, payloadObj err = resErr } - log.Debug("wallet.api.collectibles.Service RESPONSE", "eventType", eventType, "error", err, "payload.len", len(payload)) + log.Debug("wallet.api.collectibles.Service RESPONSE", "requestID", requestID, "eventType", eventType, "error", err, "payload.len", len(payload)) - s.walletFeed.Send(walletevent.Event{ + event := walletevent.Event{ Type: eventType, Message: string(payload), - }) + } + + if requestID != nil { + event.RequestID = new(int) + *event.RequestID = int(*requestID) + } + + s.walletFeed.Send(event) } func (s *Service) GetOwnedCollectibles(chainIDs []walletCommon.ChainID, owners []common.Address, offset int, limit int) ([]thirdparty.CollectibleUniqueID, bool, error) {