chore: move scheduler to async dir

This commit is contained in:
Dario Gabriel Lipicar 2023-07-05 10:59:39 -03:00 committed by dlipicar
parent b5224b3cc5
commit f138964a9c
3 changed files with 17 additions and 16 deletions

View File

@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/async"
w_common "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/walletevent"
@ -23,17 +24,17 @@ const (
)
var (
filterTask = TaskType{
filterTask = async.TaskType{
ID: 1,
Policy: ReplacementPolicyCancelOld,
Policy: async.ReplacementPolicyCancelOld,
}
getRecipientsTask = TaskType{
getRecipientsTask = async.TaskType{
ID: 2,
Policy: ReplacementPolicyIgnoreNew,
Policy: async.ReplacementPolicyIgnoreNew,
}
getOldestTimestampTask = TaskType{
getOldestTimestampTask = async.TaskType{
ID: 3,
Policy: ReplacementPolicyCancelOld,
Policy: async.ReplacementPolicyCancelOld,
}
)
@ -42,7 +43,7 @@ type Service struct {
tokenManager *token.Manager
eventFeed *event.Feed
scheduler *Scheduler
scheduler *async.Scheduler
}
func NewService(db *sql.DB, tokenManager *token.Manager, eventFeed *event.Feed) *Service {
@ -50,7 +51,7 @@ func NewService(db *sql.DB, tokenManager *token.Manager, eventFeed *event.Feed)
db: db,
tokenManager: tokenManager,
eventFeed: eventFeed,
scheduler: NewScheduler(),
scheduler: async.NewScheduler(),
}
}
@ -78,12 +79,12 @@ func (s *Service) FilterActivityAsync(ctx context.Context, addresses []common.Ad
s.scheduler.Enqueue(filterTask, func(ctx context.Context) (interface{}, error) {
activities, err := getActivityEntries(ctx, s.getDeps(), addresses, chainIDs, filter, offset, limit)
return activities, err
}, func(result interface{}, taskType TaskType, err error) {
}, func(result interface{}, taskType async.TaskType, err error) {
res := FilterResponse{
ErrorCode: ErrorCodeFailed,
}
if errors.Is(err, context.Canceled) || errors.Is(err, ErrTaskOverwritten) {
if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) {
res.ErrorCode = ErrorCodeTaskCanceled
} else if err == nil {
activities := result.([]Entry)
@ -118,9 +119,9 @@ func (s *Service) GetRecipientsAsync(ctx context.Context, offset int, limit int)
}
result.Addresses, result.HasMore, err = GetRecipients(ctx, s.db, offset, limit)
return result, err
}, func(result interface{}, taskType TaskType, err error) {
}, func(result interface{}, taskType async.TaskType, err error) {
res := result.(*GetRecipientsResponse)
if errors.Is(err, context.Canceled) || errors.Is(err, ErrTaskOverwritten) {
if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) {
res.ErrorCode = ErrorCodeTaskCanceled
} else if err != nil {
res.ErrorCode = ErrorCodeFailed
@ -139,12 +140,12 @@ func (s *Service) GetOldestTimestampAsync(ctx context.Context, addresses []commo
s.scheduler.Enqueue(getOldestTimestampTask, func(ctx context.Context) (interface{}, error) {
timestamp, err := GetOldestTimestamp(ctx, s.db, addresses)
return timestamp, err
}, func(result interface{}, taskType TaskType, err error) {
}, func(result interface{}, taskType async.TaskType, err error) {
res := GetOldestTimestampResponse{
ErrorCode: ErrorCodeFailed,
}
if errors.Is(err, context.Canceled) || errors.Is(err, ErrTaskOverwritten) {
if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) {
res.ErrorCode = ErrorCodeTaskCanceled
} else if err == nil {
res.Timestamp = result.(int64)

View File

@ -1,4 +1,4 @@
package activity
package async
import (
"context"

View File

@ -1,4 +1,4 @@
package activity
package async
import (
"context"