diff --git a/services/wallet/activity/service.go b/services/wallet/activity/service.go index 6c0c110a6..a1a8aea03 100644 --- a/services/wallet/activity/service.go +++ b/services/wallet/activity/service.go @@ -23,21 +23,22 @@ const ( EventActivityGetOldestTimestampDone walletevent.EventType = "wallet-activity-get-oldest-timestamp-result" ) -var ( - filterTask = async.TaskType{ - ID: 1, - Policy: async.ReplacementPolicyCancelOld, - } - getRecipientsTask = async.TaskType{ - ID: 2, - Policy: async.ReplacementPolicyIgnoreNew, - } - getOldestTimestampTask = async.TaskType{ - ID: 3, - Policy: async.ReplacementPolicyCancelOld, - } +const ( + filterTaskID = int32(1) + filterTaskPolicy = async.ReplacementPolicyCancelOld + getRecipientsTaskID = int32(2) + getRecipientsTaskPolicy = async.ReplacementPolicyIgnoreNew + getOldestTimestampTaskID = int32(3) + getOldestTimestampTaskPolicy = async.ReplacementPolicyCancelOld ) +func makeTaskType(requestID int32, originalID int32, policy async.ReplacementPolicy) async.TaskType { + return async.TaskType{ + ID: int64(requestID)<<32 | int64(originalID), + Policy: policy, + } +} + type Service struct { db *sql.DB tokenManager *token.Manager @@ -75,8 +76,8 @@ type FilterResponse struct { // FilterActivityAsync allows only one filter task to run at a time // and it cancels the current one if a new one is started // All calls will trigger an EventActivityFilteringDone event with the result of the filtering -func (s *Service) FilterActivityAsync(ctx context.Context, addresses []common.Address, chainIDs []w_common.ChainID, filter Filter, offset int, limit int) { - s.scheduler.Enqueue(filterTask, func(ctx context.Context) (interface{}, error) { +func (s *Service) FilterActivityAsync(requestID int32, addresses []common.Address, chainIDs []w_common.ChainID, filter Filter, offset int, limit int) { + s.scheduler.Enqueue(makeTaskType(requestID, filterTaskID, filterTaskPolicy), func(ctx context.Context) (interface{}, error) { activities, err := getActivityEntries(ctx, s.getDeps(), addresses, chainIDs, filter, offset, limit) return activities, err }, func(result interface{}, taskType async.TaskType, err error) { @@ -94,7 +95,7 @@ func (s *Service) FilterActivityAsync(ctx context.Context, addresses []common.Ad res.ErrorCode = ErrorCodeSuccess } - s.sendResponseEvent(EventActivityFilteringDone, res, err) + s.sendResponseEvent(&requestID, EventActivityFilteringDone, res, err) }) } @@ -110,8 +111,8 @@ type GetRecipientsResponse struct { // GetRecipientsAsync returns true if a task is already running or scheduled due to a previous call; meaning that // this call won't receive an answer but client should rely on the answer from the previous call. // If no task is already scheduled false will be returned -func (s *Service) GetRecipientsAsync(ctx context.Context, offset int, limit int) bool { - return s.scheduler.Enqueue(getRecipientsTask, func(ctx context.Context) (interface{}, error) { +func (s *Service) GetRecipientsAsync(requestID int32, offset int, limit int) bool { + return s.scheduler.Enqueue(makeTaskType(requestID, getRecipientsTaskID, getRecipientsTaskPolicy), func(ctx context.Context) (interface{}, error) { var err error result := &GetRecipientsResponse{ Offset: offset, @@ -127,7 +128,7 @@ func (s *Service) GetRecipientsAsync(ctx context.Context, offset int, limit int) res.ErrorCode = ErrorCodeFailed } - s.sendResponseEvent(EventActivityGetRecipientsDone, result, err) + s.sendResponseEvent(&requestID, EventActivityGetRecipientsDone, result, err) }) } @@ -136,8 +137,8 @@ type GetOldestTimestampResponse struct { ErrorCode ErrorCode `json:"errorCode"` } -func (s *Service) GetOldestTimestampAsync(ctx context.Context, addresses []common.Address) { - s.scheduler.Enqueue(getOldestTimestampTask, func(ctx context.Context) (interface{}, error) { +func (s *Service) GetOldestTimestampAsync(requestID int32, addresses []common.Address) { + s.scheduler.Enqueue(makeTaskType(requestID, getOldestTimestampTaskID, getOldestTimestampTaskPolicy), func(ctx context.Context) (interface{}, error) { timestamp, err := GetOldestTimestamp(ctx, s.db, addresses) return timestamp, err }, func(result interface{}, taskType async.TaskType, err error) { @@ -152,7 +153,7 @@ func (s *Service) GetOldestTimestampAsync(ctx context.Context, addresses []commo res.ErrorCode = ErrorCodeSuccess } - s.sendResponseEvent(EventActivityGetOldestTimestampDone, res, err) + s.sendResponseEvent(&requestID, EventActivityGetOldestTimestampDone, res, err) }) } @@ -193,7 +194,7 @@ func (s *Service) getDeps() FilterDependencies { } } -func (s *Service) sendResponseEvent(eventType walletevent.EventType, payloadObj interface{}, resErr error) { +func (s *Service) sendResponseEvent(requestID *int32, eventType walletevent.EventType, payloadObj interface{}, resErr error) { payload, err := json.Marshal(payloadObj) if err != nil { log.Error("Error marshaling response: %v; result error: %w", err, resErr) @@ -201,10 +202,17 @@ func (s *Service) sendResponseEvent(eventType walletevent.EventType, payloadObj err = resErr } - log.Debug("wallet.api.activity.Service RESPONSE", "eventType", eventType, "error", err, "payload.len", len(payload)) + log.Debug("wallet.api.activity.Service RESPONSE", "requestID", requestID, "eventType", eventType, "error", err, "payload.len", len(payload)) - s.eventFeed.Send(walletevent.Event{ + event := walletevent.Event{ Type: eventType, Message: string(payload), - }) + } + + if requestID != nil { + event.RequestID = new(int) + *event.RequestID = int(*requestID) + } + + s.eventFeed.Send(event) } diff --git a/services/wallet/activity/service_test.go b/services/wallet/activity/service_test.go new file mode 100644 index 000000000..3195df414 --- /dev/null +++ b/services/wallet/activity/service_test.go @@ -0,0 +1,70 @@ +package activity + +import ( + "testing" + + "github.com/status-im/status-go/services/wallet/async" + + "github.com/stretchr/testify/require" +) + +func Test_makeTaskType(t *testing.T) { + type args struct { + firstRequestID int32 + secondRequestID int32 + firstOriginalID int32 + secondOriginalID int32 + policy async.ReplacementPolicy + } + tests := []struct { + name string + args args + wantDifferentIDs bool + }{ + { + name: "Different requestID", + args: args{ + firstRequestID: 1, + secondRequestID: 2, + firstOriginalID: 1, + secondOriginalID: 1, + policy: async.ReplacementPolicyCancelOld, + }, + wantDifferentIDs: true, + }, + { + name: "Different originalID", + args: args{ + firstRequestID: 1, + secondRequestID: 1, + firstOriginalID: 2, + secondOriginalID: 3, + policy: async.ReplacementPolicyCancelOld, + }, + wantDifferentIDs: true, + }, + { + name: "Same requestID and originalID", + args: args{ + firstRequestID: 1, + secondRequestID: 1, + firstOriginalID: 1, + secondOriginalID: 1, + policy: async.ReplacementPolicyCancelOld, + }, + wantDifferentIDs: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + firstTT := makeTaskType(tt.args.firstRequestID, tt.args.firstOriginalID, tt.args.policy) + secondTT := makeTaskType(tt.args.secondRequestID, tt.args.secondOriginalID, tt.args.policy) + if tt.wantDifferentIDs { + require.NotEqual(t, firstTT.ID, secondTT.ID) + } else { + require.Equal(t, firstTT.ID, secondTT.ID) + } + require.Equal(t, firstTT.Policy, secondTT.Policy) + }) + } +} diff --git a/services/wallet/api.go b/services/wallet/api.go index ac13a5038..1cec80a6f 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -543,24 +543,24 @@ func (api *API) FetchAllCurrencyFormats() (currency.FormatPerSymbol, error) { return api.s.currency.FetchAllCurrencyFormats() } -func (api *API) FilterActivityAsync(ctx context.Context, addresses []common.Address, chainIDs []wcommon.ChainID, filter activity.Filter, offset int, limit int) error { - log.Debug("wallet.api.FilterActivityAsync", "addr.count", len(addresses), "chainIDs.count", len(chainIDs), "offset", offset, "limit", limit) +func (api *API) FilterActivityAsync(requestID int32, addresses []common.Address, chainIDs []wcommon.ChainID, filter activity.Filter, offset int, limit int) error { + log.Debug("wallet.api.FilterActivityAsync", "requestID", requestID, "addr.count", len(addresses), "chainIDs.count", len(chainIDs), "offset", offset, "limit", limit) - api.s.activity.FilterActivityAsync(ctx, addresses, chainIDs, filter, offset, limit) + api.s.activity.FilterActivityAsync(requestID, addresses, chainIDs, filter, offset, limit) return nil } -func (api *API) GetRecipientsAsync(ctx context.Context, offset int, limit int) (ignored bool, err error) { +func (api *API) GetRecipientsAsync(requestID int32, offset int, limit int) (ignored bool, err error) { log.Debug("wallet.api.GetRecipientsAsync", "offset", offset, "limit", limit) - ignored = api.s.activity.GetRecipientsAsync(ctx, offset, limit) + ignored = api.s.activity.GetRecipientsAsync(requestID, offset, limit) return ignored, err } -func (api *API) GetOldestActivityTimestampAsync(ctx context.Context, addresses []common.Address) error { +func (api *API) GetOldestActivityTimestampAsync(requestID int32, addresses []common.Address) error { log.Debug("wallet.api.GetOldestActivityTimestamp", "addresses.len", len(addresses)) - api.s.activity.GetOldestTimestampAsync(ctx, addresses) + api.s.activity.GetOldestTimestampAsync(requestID, addresses) return nil } diff --git a/services/wallet/async/scheduler.go b/services/wallet/async/scheduler.go index c08e89df6..811e1047f 100644 --- a/services/wallet/async/scheduler.go +++ b/services/wallet/async/scheduler.go @@ -28,7 +28,7 @@ const ( ) type TaskType struct { - ID int + ID int64 Policy ReplacementPolicy } diff --git a/services/wallet/walletevent/events.go b/services/wallet/walletevent/events.go index 72cb5b0c5..682cbbfae 100644 --- a/services/wallet/walletevent/events.go +++ b/services/wallet/walletevent/events.go @@ -17,4 +17,5 @@ type Event struct { Message string `json:"message"` At int64 `json:"at"` ChainID uint64 `json:"chainId"` + RequestID *int `json:"requestId,omitempty"` }