feat: make collectibles api support multiple clients

This commit is contained in:
Dario Gabriel Lipicar 2023-08-10 16:39:43 -03:00 committed by dlipicar
parent d6aae82566
commit 25ff1dd758
2 changed files with 23 additions and 16 deletions

View File

@ -309,17 +309,17 @@ func (api *API) FetchBalancesByOwnerAndContractAddress(chainID wcommon.ChainID,
return api.s.collectiblesManager.FetchBalancesByOwnerAndContractAddress(chainID, ownerAddress, contractAddresses)
}
func (api *API) FilterOwnedCollectiblesAsync(ctx context.Context, chainIDs []wcommon.ChainID, addresses []common.Address, offset int, limit int) error {
func (api *API) FilterOwnedCollectiblesAsync(requestID int32, chainIDs []wcommon.ChainID, addresses []common.Address, offset int, limit int) error {
log.Debug("wallet.api.FilterOwnedCollectiblesAsync", "chainIDs.count", len(chainIDs), "addr.count", len(addresses), "offset", offset, "limit", limit)
api.s.collectibles.FilterOwnedCollectiblesAsync(ctx, chainIDs, addresses, offset, limit)
api.s.collectibles.FilterOwnedCollectiblesAsync(requestID, chainIDs, addresses, offset, limit)
return nil
}
func (api *API) GetCollectiblesDetailsAsync(ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID) error {
func (api *API) GetCollectiblesDetailsAsync(requestID int32, uniqueIDs []thirdparty.CollectibleUniqueID) error {
log.Debug("wallet.api.GetCollectiblesDetailsAsync")
api.s.collectibles.GetCollectiblesDetailsAsync(ctx, uniqueIDs)
api.s.collectibles.GetCollectiblesDetailsAsync(requestID, uniqueIDs)
return nil
}

View File

@ -53,7 +53,7 @@ type Service struct {
cancelFn context.CancelFunc
group *async.Group
scheduler *async.Scheduler
scheduler *async.MultiClientScheduler
accountsWatcher *walletaccounts.Watcher
}
@ -65,7 +65,7 @@ func NewService(db *sql.DB, walletFeed *event.Feed, accountsDB *accounts.Databas
accountsDB: accountsDB,
accountsFeed: accountsFeed,
networkManager: networkManager,
scheduler: async.NewScheduler(),
scheduler: async.NewMultiClientScheduler(),
}
}
@ -99,8 +99,8 @@ type filterOwnedCollectiblesTaskReturnType struct {
// FilterOwnedCollectiblesResponse allows only one filter task to run at a time
// and it cancels the current one if a new one is started
// All calls will trigger an EventOwnedCollectiblesFilteringDone event with the result of the filtering
func (s *Service) FilterOwnedCollectiblesAsync(ctx context.Context, chainIDs []walletCommon.ChainID, addresses []common.Address, offset int, limit int) {
s.scheduler.Enqueue(filterOwnedCollectiblesTask, func(ctx context.Context) (interface{}, error) {
func (s *Service) FilterOwnedCollectiblesAsync(requestID int32, chainIDs []walletCommon.ChainID, addresses []common.Address, offset int, limit int) {
s.scheduler.Enqueue(requestID, filterOwnedCollectiblesTask, func(ctx context.Context) (interface{}, error) {
collectibles, hasMore, err := s.GetOwnedCollectibles(chainIDs, addresses, offset, limit)
if err != nil {
return nil, err
@ -129,12 +129,12 @@ func (s *Service) FilterOwnedCollectiblesAsync(ctx context.Context, chainIDs []w
res.ErrorCode = ErrorCodeSuccess
}
s.sendResponseEvent(EventOwnedCollectiblesFilteringDone, res, err)
s.sendResponseEvent(&requestID, EventOwnedCollectiblesFilteringDone, res, err)
})
}
func (s *Service) GetCollectiblesDetailsAsync(ctx context.Context, uniqueIDs []thirdparty.CollectibleUniqueID) {
s.scheduler.Enqueue(getCollectiblesDataTask, func(ctx context.Context) (interface{}, error) {
func (s *Service) GetCollectiblesDetailsAsync(requestID int32, uniqueIDs []thirdparty.CollectibleUniqueID) {
s.scheduler.Enqueue(requestID, getCollectiblesDataTask, func(ctx context.Context) (interface{}, error) {
collectibles, err := s.manager.FetchAssetsByCollectibleUniqueID(uniqueIDs)
return collectibles, err
}, func(result interface{}, taskType async.TaskType, err error) {
@ -150,7 +150,7 @@ func (s *Service) GetCollectiblesDetailsAsync(ctx context.Context, uniqueIDs []t
res.ErrorCode = ErrorCodeSuccess
}
s.sendResponseEvent(EventGetCollectiblesDetailsDone, res, err)
s.sendResponseEvent(&requestID, EventGetCollectiblesDetailsDone, res, err)
})
}
@ -228,7 +228,7 @@ func (s *Service) Stop() {
s.scheduler.Stop()
}
func (s *Service) sendResponseEvent(eventType walletevent.EventType, payloadObj interface{}, resErr error) {
func (s *Service) sendResponseEvent(requestID *int32, eventType walletevent.EventType, payloadObj interface{}, resErr error) {
payload, err := json.Marshal(payloadObj)
if err != nil {
log.Error("Error marshaling response: %v; result error: %w", err, resErr)
@ -236,12 +236,19 @@ func (s *Service) sendResponseEvent(eventType walletevent.EventType, payloadObj
err = resErr
}
log.Debug("wallet.api.collectibles.Service RESPONSE", "eventType", eventType, "error", err, "payload.len", len(payload))
log.Debug("wallet.api.collectibles.Service RESPONSE", "requestID", requestID, "eventType", eventType, "error", err, "payload.len", len(payload))
s.walletFeed.Send(walletevent.Event{
event := walletevent.Event{
Type: eventType,
Message: string(payload),
})
}
if requestID != nil {
event.RequestID = new(int)
*event.RequestID = int(*requestID)
}
s.walletFeed.Send(event)
}
func (s *Service) GetOwnedCollectibles(chainIDs []walletCommon.ChainID, owners []common.Address, offset int, limit int) ([]thirdparty.CollectibleUniqueID, bool, error) {