mirror of
https://github.com/status-im/status-go.git
synced 2025-01-25 05:58:59 +00:00
c61a4000d8
Implement activity.Scheduler to serialize and limit the number of calls on the activity service. This way we protect form inefficient parallel queries and easy support async and rate limiting based on the API requirements. Refactor the activity APIs async and use the Scheduler for managing the activity service calls configured with one of the two rules: cancel ignore. Updates status-desktop #11170
208 lines
6.1 KiB
Go
208 lines
6.1 KiB
Go
package activity
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
|
|
w_common "github.com/status-im/status-go/services/wallet/common"
|
|
"github.com/status-im/status-go/services/wallet/token"
|
|
"github.com/status-im/status-go/services/wallet/walletevent"
|
|
)
|
|
|
|
const (
|
|
// FilterResponse json is sent as a message in the EventActivityFilteringDone event
|
|
EventActivityFilteringDone walletevent.EventType = "wallet-activity-filtering-done"
|
|
EventActivityGetRecipientsDone walletevent.EventType = "wallet-activity-get-recipients-result"
|
|
EventActivityGetOldestTimestampDone walletevent.EventType = "wallet-activity-get-oldest-timestamp-result"
|
|
)
|
|
|
|
var (
|
|
filterTask = TaskType{
|
|
ID: 1,
|
|
Policy: ReplacementPolicyCancelOld,
|
|
}
|
|
getRecipientsTask = TaskType{
|
|
ID: 2,
|
|
Policy: ReplacementPolicyIgnoreNew,
|
|
}
|
|
getOldestTimestampTask = TaskType{
|
|
ID: 3,
|
|
Policy: ReplacementPolicyCancelOld,
|
|
}
|
|
)
|
|
|
|
type Service struct {
|
|
db *sql.DB
|
|
tokenManager *token.Manager
|
|
eventFeed *event.Feed
|
|
|
|
scheduler *Scheduler
|
|
}
|
|
|
|
func NewService(db *sql.DB, tokenManager *token.Manager, eventFeed *event.Feed) *Service {
|
|
return &Service{
|
|
db: db,
|
|
tokenManager: tokenManager,
|
|
eventFeed: eventFeed,
|
|
scheduler: NewScheduler(),
|
|
}
|
|
}
|
|
|
|
type ErrorCode = int
|
|
|
|
const (
|
|
ErrorCodeSuccess ErrorCode = iota + 1
|
|
ErrorCodeTaskCanceled
|
|
ErrorCodeFailed
|
|
)
|
|
|
|
type FilterResponse struct {
|
|
Activities []Entry `json:"activities"`
|
|
Offset int `json:"offset"`
|
|
// Used to indicate that there might be more entries that were not returned
|
|
// based on a simple heuristic
|
|
HasMore bool `json:"hasMore"`
|
|
ErrorCode ErrorCode `json:"errorCode"`
|
|
}
|
|
|
|
// FilterActivityAsync allows only one filter task to run at a time
|
|
// and it cancels the current one if a new one is started
|
|
// All calls will trigger an EventActivityFilteringDone event with the result of the filtering
|
|
func (s *Service) FilterActivityAsync(ctx context.Context, addresses []common.Address, chainIDs []w_common.ChainID, filter Filter, offset int, limit int) {
|
|
s.scheduler.Enqueue(filterTask, func(ctx context.Context) (interface{}, error) {
|
|
activities, err := getActivityEntries(ctx, s.getDeps(), addresses, chainIDs, filter, offset, limit)
|
|
return activities, err
|
|
}, func(result interface{}, taskType TaskType, err error) {
|
|
res := FilterResponse{
|
|
ErrorCode: ErrorCodeFailed,
|
|
}
|
|
|
|
if errors.Is(err, context.Canceled) || errors.Is(err, ErrTaskOverwritten) {
|
|
res.ErrorCode = ErrorCodeTaskCanceled
|
|
} else if err == nil {
|
|
activities := result.([]Entry)
|
|
res.Activities = activities
|
|
res.Offset = offset
|
|
res.HasMore = len(activities) == limit
|
|
res.ErrorCode = ErrorCodeSuccess
|
|
}
|
|
|
|
s.sendResponseEvent(EventActivityFilteringDone, res)
|
|
})
|
|
}
|
|
|
|
type GetRecipientsResponse struct {
|
|
Addresses []common.Address `json:"addresses"`
|
|
Offset int `json:"offset"`
|
|
// Used to indicate that there might be more entries that were not returned
|
|
// based on a simple heuristic
|
|
HasMore bool `json:"hasMore"`
|
|
ErrorCode ErrorCode `json:"errorCode"`
|
|
}
|
|
|
|
// GetRecipientsAsync returns true if a task is already running or scheduled due to a previous call; meaning that
|
|
// this call won't receive an answer but client should rely on the answer from the previous call.
|
|
// If no task is already scheduled false will be returned
|
|
func (s *Service) GetRecipientsAsync(ctx context.Context, offset int, limit int) bool {
|
|
return s.scheduler.Enqueue(getRecipientsTask, func(ctx context.Context) (interface{}, error) {
|
|
var err error
|
|
result := &GetRecipientsResponse{
|
|
Offset: offset,
|
|
ErrorCode: ErrorCodeSuccess,
|
|
}
|
|
result.Addresses, result.HasMore, err = GetRecipients(ctx, s.db, offset, limit)
|
|
return result, err
|
|
}, func(result interface{}, taskType TaskType, err error) {
|
|
res := result.(*GetRecipientsResponse)
|
|
if errors.Is(err, context.Canceled) || errors.Is(err, ErrTaskOverwritten) {
|
|
res.ErrorCode = ErrorCodeTaskCanceled
|
|
} else if err != nil {
|
|
res.ErrorCode = ErrorCodeFailed
|
|
}
|
|
|
|
s.sendResponseEvent(EventActivityGetRecipientsDone, result)
|
|
})
|
|
}
|
|
|
|
type GetOldestTimestampResponse struct {
|
|
Timestamp int64 `json:"timestamp"`
|
|
ErrorCode ErrorCode `json:"errorCode"`
|
|
}
|
|
|
|
func (s *Service) GetOldestTimestampAsync(ctx context.Context, addresses []common.Address) {
|
|
s.scheduler.Enqueue(getOldestTimestampTask, func(ctx context.Context) (interface{}, error) {
|
|
timestamp, err := GetOldestTimestamp(ctx, s.db, addresses)
|
|
return timestamp, err
|
|
}, func(result interface{}, taskType TaskType, err error) {
|
|
res := GetOldestTimestampResponse{
|
|
ErrorCode: ErrorCodeFailed,
|
|
}
|
|
|
|
if errors.Is(err, context.Canceled) || errors.Is(err, ErrTaskOverwritten) {
|
|
res.ErrorCode = ErrorCodeTaskCanceled
|
|
} else if err == nil {
|
|
res.Timestamp = result.(int64)
|
|
res.ErrorCode = ErrorCodeSuccess
|
|
}
|
|
|
|
s.sendResponseEvent(EventActivityGetOldestTimestampDone, res)
|
|
})
|
|
}
|
|
|
|
func (s *Service) Stop() {
|
|
s.scheduler.Stop()
|
|
}
|
|
|
|
func (s *Service) getDeps() FilterDependencies {
|
|
return FilterDependencies{
|
|
db: s.db,
|
|
tokenSymbol: func(t Token) string {
|
|
info := s.tokenManager.LookupTokenIdentity(uint64(t.ChainID), t.Address, t.TokenType == Native)
|
|
if info == nil {
|
|
return ""
|
|
}
|
|
return info.Symbol
|
|
},
|
|
tokenFromSymbol: func(chainID *w_common.ChainID, symbol string) *Token {
|
|
var cID *uint64
|
|
if chainID != nil {
|
|
cID = new(uint64)
|
|
*cID = uint64(*chainID)
|
|
}
|
|
t, detectedNative := s.tokenManager.LookupToken(cID, symbol)
|
|
if t == nil {
|
|
return nil
|
|
}
|
|
tokenType := Native
|
|
if !detectedNative {
|
|
tokenType = Erc20
|
|
}
|
|
return &Token{
|
|
TokenType: tokenType,
|
|
ChainID: w_common.ChainID(t.ChainID),
|
|
Address: t.Address,
|
|
}
|
|
},
|
|
}
|
|
}
|
|
|
|
func (s *Service) sendResponseEvent(eventType walletevent.EventType, payloadObj interface{}) {
|
|
payload, err := json.Marshal(payloadObj)
|
|
if err != nil {
|
|
log.Error("Error marshaling response: %v", err)
|
|
}
|
|
|
|
log.Debug("wallet.api.activity.Service RESPONSE", "eventType", eventType, "error", err, "payload.len", len(payload))
|
|
|
|
s.eventFeed.Send(walletevent.Event{
|
|
Type: eventType,
|
|
Message: string(payload),
|
|
})
|
|
}
|