chore: move multi-client scheduler implementation to separate file

This commit is contained in:
Dario Gabriel Lipicar 2023-08-10 16:30:17 -03:00 committed by dlipicar
parent 9d0acc2265
commit d6aae82566
3 changed files with 61 additions and 28 deletions

View File

@ -23,28 +23,27 @@ const (
EventActivityGetOldestTimestampDone walletevent.EventType = "wallet-activity-get-oldest-timestamp-result" EventActivityGetOldestTimestampDone walletevent.EventType = "wallet-activity-get-oldest-timestamp-result"
) )
const ( var (
filterTaskID = int32(1) filterTask = async.TaskType{
filterTaskPolicy = async.ReplacementPolicyCancelOld ID: 1,
getRecipientsTaskID = int32(2) Policy: async.ReplacementPolicyCancelOld,
getRecipientsTaskPolicy = async.ReplacementPolicyIgnoreNew
getOldestTimestampTaskID = int32(3)
getOldestTimestampTaskPolicy = async.ReplacementPolicyCancelOld
)
func makeTaskType(requestID int32, originalID int32, policy async.ReplacementPolicy) async.TaskType {
return async.TaskType{
ID: int64(requestID)<<32 | int64(originalID),
Policy: policy,
} }
} getRecipientsTask = async.TaskType{
ID: 2,
Policy: async.ReplacementPolicyIgnoreNew,
}
getOldestTimestampTask = async.TaskType{
ID: 3,
Policy: async.ReplacementPolicyCancelOld,
}
)
type Service struct { type Service struct {
db *sql.DB db *sql.DB
tokenManager *token.Manager tokenManager *token.Manager
eventFeed *event.Feed eventFeed *event.Feed
scheduler *async.Scheduler scheduler *async.MultiClientScheduler
} }
func NewService(db *sql.DB, tokenManager *token.Manager, eventFeed *event.Feed) *Service { func NewService(db *sql.DB, tokenManager *token.Manager, eventFeed *event.Feed) *Service {
@ -52,7 +51,7 @@ func NewService(db *sql.DB, tokenManager *token.Manager, eventFeed *event.Feed)
db: db, db: db,
tokenManager: tokenManager, tokenManager: tokenManager,
eventFeed: eventFeed, eventFeed: eventFeed,
scheduler: async.NewScheduler(), scheduler: async.NewMultiClientScheduler(),
} }
} }
@ -77,7 +76,7 @@ type FilterResponse struct {
// and it cancels the current one if a new one is started // and it cancels the current one if a new one is started
// All calls will trigger an EventActivityFilteringDone event with the result of the filtering // All calls will trigger an EventActivityFilteringDone event with the result of the filtering
func (s *Service) FilterActivityAsync(requestID int32, addresses []common.Address, chainIDs []w_common.ChainID, filter Filter, offset int, limit int) { func (s *Service) FilterActivityAsync(requestID int32, addresses []common.Address, chainIDs []w_common.ChainID, filter Filter, offset int, limit int) {
s.scheduler.Enqueue(makeTaskType(requestID, filterTaskID, filterTaskPolicy), func(ctx context.Context) (interface{}, error) { s.scheduler.Enqueue(requestID, filterTask, func(ctx context.Context) (interface{}, error) {
activities, err := getActivityEntries(ctx, s.getDeps(), addresses, chainIDs, filter, offset, limit) activities, err := getActivityEntries(ctx, s.getDeps(), addresses, chainIDs, filter, offset, limit)
return activities, err return activities, err
}, func(result interface{}, taskType async.TaskType, err error) { }, func(result interface{}, taskType async.TaskType, err error) {
@ -112,7 +111,7 @@ type GetRecipientsResponse struct {
// this call won't receive an answer but client should rely on the answer from the previous call. // this call won't receive an answer but client should rely on the answer from the previous call.
// If no task is already scheduled false will be returned // If no task is already scheduled false will be returned
func (s *Service) GetRecipientsAsync(requestID int32, offset int, limit int) bool { func (s *Service) GetRecipientsAsync(requestID int32, offset int, limit int) bool {
return s.scheduler.Enqueue(makeTaskType(requestID, getRecipientsTaskID, getRecipientsTaskPolicy), func(ctx context.Context) (interface{}, error) { return s.scheduler.Enqueue(requestID, getRecipientsTask, func(ctx context.Context) (interface{}, error) {
var err error var err error
result := &GetRecipientsResponse{ result := &GetRecipientsResponse{
Offset: offset, Offset: offset,
@ -138,7 +137,7 @@ type GetOldestTimestampResponse struct {
} }
func (s *Service) GetOldestTimestampAsync(requestID int32, addresses []common.Address) { func (s *Service) GetOldestTimestampAsync(requestID int32, addresses []common.Address) {
s.scheduler.Enqueue(makeTaskType(requestID, getOldestTimestampTaskID, getOldestTimestampTaskPolicy), func(ctx context.Context) (interface{}, error) { s.scheduler.Enqueue(requestID, getOldestTimestampTask, func(ctx context.Context) (interface{}, error) {
timestamp, err := GetOldestTimestamp(ctx, s.db, addresses) timestamp, err := GetOldestTimestamp(ctx, s.db, addresses)
return timestamp, err return timestamp, err
}, func(result interface{}, taskType async.TaskType, err error) { }, func(result interface{}, taskType async.TaskType, err error) {

View File

@ -0,0 +1,26 @@
package async
type MultiClientScheduler struct {
scheduler *Scheduler
}
func NewMultiClientScheduler() *MultiClientScheduler {
return &MultiClientScheduler{
scheduler: NewScheduler(),
}
}
func (s *MultiClientScheduler) Stop() {
s.scheduler.Stop()
}
func makeTaskType(requestID int32, origTaskType TaskType) TaskType {
return TaskType{
ID: int64(requestID)<<32 | origTaskType.ID,
Policy: origTaskType.Policy,
}
}
func (s *MultiClientScheduler) Enqueue(requestID int32, taskType TaskType, taskFn taskFunction, resFn resultFunction) (ignored bool) {
return s.scheduler.Enqueue(makeTaskType(requestID, taskType), taskFn, resFn)
}

View File

@ -1,10 +1,8 @@
package activity package async
import ( import (
"testing" "testing"
"github.com/status-im/status-go/services/wallet/async"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -14,7 +12,7 @@ func Test_makeTaskType(t *testing.T) {
secondRequestID int32 secondRequestID int32
firstOriginalID int32 firstOriginalID int32
secondOriginalID int32 secondOriginalID int32
policy async.ReplacementPolicy policy ReplacementPolicy
} }
tests := []struct { tests := []struct {
name string name string
@ -28,7 +26,7 @@ func Test_makeTaskType(t *testing.T) {
secondRequestID: 2, secondRequestID: 2,
firstOriginalID: 1, firstOriginalID: 1,
secondOriginalID: 1, secondOriginalID: 1,
policy: async.ReplacementPolicyCancelOld, policy: ReplacementPolicyCancelOld,
}, },
wantDifferentIDs: true, wantDifferentIDs: true,
}, },
@ -39,7 +37,7 @@ func Test_makeTaskType(t *testing.T) {
secondRequestID: 1, secondRequestID: 1,
firstOriginalID: 2, firstOriginalID: 2,
secondOriginalID: 3, secondOriginalID: 3,
policy: async.ReplacementPolicyCancelOld, policy: ReplacementPolicyCancelOld,
}, },
wantDifferentIDs: true, wantDifferentIDs: true,
}, },
@ -50,15 +48,25 @@ func Test_makeTaskType(t *testing.T) {
secondRequestID: 1, secondRequestID: 1,
firstOriginalID: 1, firstOriginalID: 1,
secondOriginalID: 1, secondOriginalID: 1,
policy: async.ReplacementPolicyCancelOld, policy: ReplacementPolicyCancelOld,
}, },
wantDifferentIDs: false, wantDifferentIDs: false,
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
firstTT := makeTaskType(tt.args.firstRequestID, tt.args.firstOriginalID, tt.args.policy) firstTT := makeTaskType(
secondTT := makeTaskType(tt.args.secondRequestID, tt.args.secondOriginalID, tt.args.policy) tt.args.firstRequestID,
TaskType{
ID: int64(tt.args.firstOriginalID),
Policy: tt.args.policy,
})
secondTT := makeTaskType(
tt.args.secondRequestID,
TaskType{
ID: int64(tt.args.secondOriginalID),
Policy: tt.args.policy,
})
if tt.wantDifferentIDs { if tt.wantDifferentIDs {
require.NotEqual(t, firstTT.ID, secondTT.ID) require.NotEqual(t, firstTT.ID, secondTT.ID)
} else { } else {