feat(wallet) add request id to activity events
Add and use the optional chainID in the wallet event structure. Updates status-desktop #11380
This commit is contained in:
parent
f2770b6e5e
commit
eb8d74e1ae
|
@ -23,21 +23,22 @@ const (
|
|||
EventActivityGetOldestTimestampDone walletevent.EventType = "wallet-activity-get-oldest-timestamp-result"
|
||||
)
|
||||
|
||||
var (
|
||||
filterTask = async.TaskType{
|
||||
ID: 1,
|
||||
Policy: async.ReplacementPolicyCancelOld,
|
||||
}
|
||||
getRecipientsTask = async.TaskType{
|
||||
ID: 2,
|
||||
Policy: async.ReplacementPolicyIgnoreNew,
|
||||
}
|
||||
getOldestTimestampTask = async.TaskType{
|
||||
ID: 3,
|
||||
Policy: async.ReplacementPolicyCancelOld,
|
||||
}
|
||||
const (
|
||||
filterTaskID = int32(1)
|
||||
filterTaskPolicy = async.ReplacementPolicyCancelOld
|
||||
getRecipientsTaskID = int32(2)
|
||||
getRecipientsTaskPolicy = async.ReplacementPolicyIgnoreNew
|
||||
getOldestTimestampTaskID = int32(3)
|
||||
getOldestTimestampTaskPolicy = async.ReplacementPolicyCancelOld
|
||||
)
|
||||
|
||||
func makeTaskType(requestID int32, originalID int32, policy async.ReplacementPolicy) async.TaskType {
|
||||
return async.TaskType{
|
||||
ID: int64(requestID)<<32 | int64(originalID),
|
||||
Policy: policy,
|
||||
}
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
db *sql.DB
|
||||
tokenManager *token.Manager
|
||||
|
@ -75,8 +76,8 @@ type FilterResponse struct {
|
|||
// FilterActivityAsync allows only one filter task to run at a time
|
||||
// and it cancels the current one if a new one is started
|
||||
// All calls will trigger an EventActivityFilteringDone event with the result of the filtering
|
||||
func (s *Service) FilterActivityAsync(ctx context.Context, addresses []common.Address, chainIDs []w_common.ChainID, filter Filter, offset int, limit int) {
|
||||
s.scheduler.Enqueue(filterTask, func(ctx context.Context) (interface{}, error) {
|
||||
func (s *Service) FilterActivityAsync(requestID int32, addresses []common.Address, chainIDs []w_common.ChainID, filter Filter, offset int, limit int) {
|
||||
s.scheduler.Enqueue(makeTaskType(requestID, filterTaskID, filterTaskPolicy), func(ctx context.Context) (interface{}, error) {
|
||||
activities, err := getActivityEntries(ctx, s.getDeps(), addresses, chainIDs, filter, offset, limit)
|
||||
return activities, err
|
||||
}, func(result interface{}, taskType async.TaskType, err error) {
|
||||
|
@ -94,7 +95,7 @@ func (s *Service) FilterActivityAsync(ctx context.Context, addresses []common.Ad
|
|||
res.ErrorCode = ErrorCodeSuccess
|
||||
}
|
||||
|
||||
s.sendResponseEvent(EventActivityFilteringDone, res, err)
|
||||
s.sendResponseEvent(&requestID, EventActivityFilteringDone, res, err)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -110,8 +111,8 @@ type GetRecipientsResponse struct {
|
|||
// GetRecipientsAsync returns true if a task is already running or scheduled due to a previous call; meaning that
|
||||
// this call won't receive an answer but client should rely on the answer from the previous call.
|
||||
// If no task is already scheduled false will be returned
|
||||
func (s *Service) GetRecipientsAsync(ctx context.Context, offset int, limit int) bool {
|
||||
return s.scheduler.Enqueue(getRecipientsTask, func(ctx context.Context) (interface{}, error) {
|
||||
func (s *Service) GetRecipientsAsync(requestID int32, offset int, limit int) bool {
|
||||
return s.scheduler.Enqueue(makeTaskType(requestID, getRecipientsTaskID, getRecipientsTaskPolicy), func(ctx context.Context) (interface{}, error) {
|
||||
var err error
|
||||
result := &GetRecipientsResponse{
|
||||
Offset: offset,
|
||||
|
@ -127,7 +128,7 @@ func (s *Service) GetRecipientsAsync(ctx context.Context, offset int, limit int)
|
|||
res.ErrorCode = ErrorCodeFailed
|
||||
}
|
||||
|
||||
s.sendResponseEvent(EventActivityGetRecipientsDone, result, err)
|
||||
s.sendResponseEvent(&requestID, EventActivityGetRecipientsDone, result, err)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -136,8 +137,8 @@ type GetOldestTimestampResponse struct {
|
|||
ErrorCode ErrorCode `json:"errorCode"`
|
||||
}
|
||||
|
||||
func (s *Service) GetOldestTimestampAsync(ctx context.Context, addresses []common.Address) {
|
||||
s.scheduler.Enqueue(getOldestTimestampTask, func(ctx context.Context) (interface{}, error) {
|
||||
func (s *Service) GetOldestTimestampAsync(requestID int32, addresses []common.Address) {
|
||||
s.scheduler.Enqueue(makeTaskType(requestID, getOldestTimestampTaskID, getOldestTimestampTaskPolicy), func(ctx context.Context) (interface{}, error) {
|
||||
timestamp, err := GetOldestTimestamp(ctx, s.db, addresses)
|
||||
return timestamp, err
|
||||
}, func(result interface{}, taskType async.TaskType, err error) {
|
||||
|
@ -152,7 +153,7 @@ func (s *Service) GetOldestTimestampAsync(ctx context.Context, addresses []commo
|
|||
res.ErrorCode = ErrorCodeSuccess
|
||||
}
|
||||
|
||||
s.sendResponseEvent(EventActivityGetOldestTimestampDone, res, err)
|
||||
s.sendResponseEvent(&requestID, EventActivityGetOldestTimestampDone, res, err)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -193,7 +194,7 @@ func (s *Service) getDeps() FilterDependencies {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Service) sendResponseEvent(eventType walletevent.EventType, payloadObj interface{}, resErr error) {
|
||||
func (s *Service) sendResponseEvent(requestID *int32, eventType walletevent.EventType, payloadObj interface{}, resErr error) {
|
||||
payload, err := json.Marshal(payloadObj)
|
||||
if err != nil {
|
||||
log.Error("Error marshaling response: %v; result error: %w", err, resErr)
|
||||
|
@ -201,10 +202,17 @@ func (s *Service) sendResponseEvent(eventType walletevent.EventType, payloadObj
|
|||
err = resErr
|
||||
}
|
||||
|
||||
log.Debug("wallet.api.activity.Service RESPONSE", "eventType", eventType, "error", err, "payload.len", len(payload))
|
||||
log.Debug("wallet.api.activity.Service RESPONSE", "requestID", requestID, "eventType", eventType, "error", err, "payload.len", len(payload))
|
||||
|
||||
s.eventFeed.Send(walletevent.Event{
|
||||
event := walletevent.Event{
|
||||
Type: eventType,
|
||||
Message: string(payload),
|
||||
})
|
||||
}
|
||||
|
||||
if requestID != nil {
|
||||
event.RequestID = new(int)
|
||||
*event.RequestID = int(*requestID)
|
||||
}
|
||||
|
||||
s.eventFeed.Send(event)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
package activity
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/status-im/status-go/services/wallet/async"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func Test_makeTaskType(t *testing.T) {
|
||||
type args struct {
|
||||
firstRequestID int32
|
||||
secondRequestID int32
|
||||
firstOriginalID int32
|
||||
secondOriginalID int32
|
||||
policy async.ReplacementPolicy
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
wantDifferentIDs bool
|
||||
}{
|
||||
{
|
||||
name: "Different requestID",
|
||||
args: args{
|
||||
firstRequestID: 1,
|
||||
secondRequestID: 2,
|
||||
firstOriginalID: 1,
|
||||
secondOriginalID: 1,
|
||||
policy: async.ReplacementPolicyCancelOld,
|
||||
},
|
||||
wantDifferentIDs: true,
|
||||
},
|
||||
{
|
||||
name: "Different originalID",
|
||||
args: args{
|
||||
firstRequestID: 1,
|
||||
secondRequestID: 1,
|
||||
firstOriginalID: 2,
|
||||
secondOriginalID: 3,
|
||||
policy: async.ReplacementPolicyCancelOld,
|
||||
},
|
||||
wantDifferentIDs: true,
|
||||
},
|
||||
{
|
||||
name: "Same requestID and originalID",
|
||||
args: args{
|
||||
firstRequestID: 1,
|
||||
secondRequestID: 1,
|
||||
firstOriginalID: 1,
|
||||
secondOriginalID: 1,
|
||||
policy: async.ReplacementPolicyCancelOld,
|
||||
},
|
||||
wantDifferentIDs: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
firstTT := makeTaskType(tt.args.firstRequestID, tt.args.firstOriginalID, tt.args.policy)
|
||||
secondTT := makeTaskType(tt.args.secondRequestID, tt.args.secondOriginalID, tt.args.policy)
|
||||
if tt.wantDifferentIDs {
|
||||
require.NotEqual(t, firstTT.ID, secondTT.ID)
|
||||
} else {
|
||||
require.Equal(t, firstTT.ID, secondTT.ID)
|
||||
}
|
||||
require.Equal(t, firstTT.Policy, secondTT.Policy)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -543,24 +543,24 @@ func (api *API) FetchAllCurrencyFormats() (currency.FormatPerSymbol, error) {
|
|||
return api.s.currency.FetchAllCurrencyFormats()
|
||||
}
|
||||
|
||||
func (api *API) FilterActivityAsync(ctx context.Context, addresses []common.Address, chainIDs []wcommon.ChainID, filter activity.Filter, offset int, limit int) error {
|
||||
log.Debug("wallet.api.FilterActivityAsync", "addr.count", len(addresses), "chainIDs.count", len(chainIDs), "offset", offset, "limit", limit)
|
||||
func (api *API) FilterActivityAsync(requestID int32, addresses []common.Address, chainIDs []wcommon.ChainID, filter activity.Filter, offset int, limit int) error {
|
||||
log.Debug("wallet.api.FilterActivityAsync", "requestID", requestID, "addr.count", len(addresses), "chainIDs.count", len(chainIDs), "offset", offset, "limit", limit)
|
||||
|
||||
api.s.activity.FilterActivityAsync(ctx, addresses, chainIDs, filter, offset, limit)
|
||||
api.s.activity.FilterActivityAsync(requestID, addresses, chainIDs, filter, offset, limit)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (api *API) GetRecipientsAsync(ctx context.Context, offset int, limit int) (ignored bool, err error) {
|
||||
func (api *API) GetRecipientsAsync(requestID int32, offset int, limit int) (ignored bool, err error) {
|
||||
log.Debug("wallet.api.GetRecipientsAsync", "offset", offset, "limit", limit)
|
||||
|
||||
ignored = api.s.activity.GetRecipientsAsync(ctx, offset, limit)
|
||||
ignored = api.s.activity.GetRecipientsAsync(requestID, offset, limit)
|
||||
return ignored, err
|
||||
}
|
||||
|
||||
func (api *API) GetOldestActivityTimestampAsync(ctx context.Context, addresses []common.Address) error {
|
||||
func (api *API) GetOldestActivityTimestampAsync(requestID int32, addresses []common.Address) error {
|
||||
log.Debug("wallet.api.GetOldestActivityTimestamp", "addresses.len", len(addresses))
|
||||
|
||||
api.s.activity.GetOldestTimestampAsync(ctx, addresses)
|
||||
api.s.activity.GetOldestTimestampAsync(requestID, addresses)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ const (
|
|||
)
|
||||
|
||||
type TaskType struct {
|
||||
ID int
|
||||
ID int64
|
||||
Policy ReplacementPolicy
|
||||
}
|
||||
|
||||
|
|
|
@ -17,4 +17,5 @@ type Event struct {
|
|||
Message string `json:"message"`
|
||||
At int64 `json:"at"`
|
||||
ChainID uint64 `json:"chainId"`
|
||||
RequestID *int `json:"requestId,omitempty"`
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue