Ivan Belyakov 2df9df10ab fix(tests): moved test db setup to a common place 't/helpers', created
interface for initializing db, which is implemented for appdatabase and
walletdatabase. TBD for multiaccounts DB.
Unified DB initializion for all tests using helpers and new interface.
Reduced sqlcipher kdf iterations for all tests to 1.
2023-08-18 09:00:56 +02:00

222 lines
6.8 KiB
Go

package activity
import (
"context"
"database/sql"
"encoding/json"
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/services/wallet/async"
w_common "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/walletevent"
)
const (
// FilterResponse json is sent as a message in the EventActivityFilteringDone event
EventActivityFilteringDone walletevent.EventType = "wallet-activity-filtering-done"
EventActivityGetRecipientsDone walletevent.EventType = "wallet-activity-get-recipients-result"
EventActivityGetOldestTimestampDone walletevent.EventType = "wallet-activity-get-oldest-timestamp-result"
)
var (
filterTask = async.TaskType{
ID: 1,
Policy: async.ReplacementPolicyCancelOld,
}
getRecipientsTask = async.TaskType{
ID: 2,
Policy: async.ReplacementPolicyIgnoreNew,
}
getOldestTimestampTask = async.TaskType{
ID: 3,
Policy: async.ReplacementPolicyCancelOld,
}
)
type Service struct {
db *sql.DB
accountsDB *accounts.Database
tokenManager *token.Manager
eventFeed *event.Feed
scheduler *async.MultiClientScheduler
}
func NewService(db *sql.DB, tokenManager *token.Manager, eventFeed *event.Feed, accountsDb *accounts.Database) *Service {
return &Service{
db: db,
accountsDB: accountsDb,
tokenManager: tokenManager,
eventFeed: eventFeed,
scheduler: async.NewMultiClientScheduler(),
}
}
type ErrorCode = int
const (
ErrorCodeSuccess ErrorCode = iota + 1
ErrorCodeTaskCanceled
ErrorCodeFailed
)
type FilterResponse struct {
Activities []Entry `json:"activities"`
Offset int `json:"offset"`
// Used to indicate that there might be more entries that were not returned
// based on a simple heuristic
HasMore bool `json:"hasMore"`
ErrorCode ErrorCode `json:"errorCode"`
}
// FilterActivityAsync allows only one filter task to run at a time
// and it cancels the current one if a new one is started
// All calls will trigger an EventActivityFilteringDone event with the result of the filtering
func (s *Service) FilterActivityAsync(requestID int32, addresses []common.Address, chainIDs []w_common.ChainID, filter Filter, offset int, limit int) {
s.scheduler.Enqueue(requestID, filterTask, func(ctx context.Context) (interface{}, error) {
activities, err := getActivityEntries(ctx, s.getDeps(), addresses, chainIDs, filter, offset, limit)
return activities, err
}, func(result interface{}, taskType async.TaskType, err error) {
res := FilterResponse{
ErrorCode: ErrorCodeFailed,
}
if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) {
res.ErrorCode = ErrorCodeTaskCanceled
} else if err == nil {
activities := result.([]Entry)
res.Activities = activities
res.Offset = offset
res.HasMore = len(activities) == limit
res.ErrorCode = ErrorCodeSuccess
}
s.sendResponseEvent(&requestID, EventActivityFilteringDone, res, err)
})
}
type GetRecipientsResponse struct {
Addresses []common.Address `json:"addresses"`
Offset int `json:"offset"`
// Used to indicate that there might be more entries that were not returned
// based on a simple heuristic
HasMore bool `json:"hasMore"`
ErrorCode ErrorCode `json:"errorCode"`
}
// GetRecipientsAsync returns true if a task is already running or scheduled due to a previous call; meaning that
// this call won't receive an answer but client should rely on the answer from the previous call.
// If no task is already scheduled false will be returned
func (s *Service) GetRecipientsAsync(requestID int32, offset int, limit int) bool {
return s.scheduler.Enqueue(requestID, getRecipientsTask, func(ctx context.Context) (interface{}, error) {
var err error
result := &GetRecipientsResponse{
Offset: offset,
ErrorCode: ErrorCodeSuccess,
}
result.Addresses, result.HasMore, err = GetRecipients(ctx, s.db, offset, limit)
return result, err
}, func(result interface{}, taskType async.TaskType, err error) {
res := result.(*GetRecipientsResponse)
if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) {
res.ErrorCode = ErrorCodeTaskCanceled
} else if err != nil {
res.ErrorCode = ErrorCodeFailed
}
s.sendResponseEvent(&requestID, EventActivityGetRecipientsDone, result, err)
})
}
type GetOldestTimestampResponse struct {
Timestamp int64 `json:"timestamp"`
ErrorCode ErrorCode `json:"errorCode"`
}
func (s *Service) GetOldestTimestampAsync(requestID int32, addresses []common.Address) {
s.scheduler.Enqueue(requestID, getOldestTimestampTask, func(ctx context.Context) (interface{}, error) {
timestamp, err := GetOldestTimestamp(ctx, s.db, addresses)
return timestamp, err
}, func(result interface{}, taskType async.TaskType, err error) {
res := GetOldestTimestampResponse{
ErrorCode: ErrorCodeFailed,
}
if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) {
res.ErrorCode = ErrorCodeTaskCanceled
} else if err == nil {
res.Timestamp = result.(int64)
res.ErrorCode = ErrorCodeSuccess
}
s.sendResponseEvent(&requestID, EventActivityGetOldestTimestampDone, res, err)
})
}
func (s *Service) Stop() {
s.scheduler.Stop()
}
func (s *Service) getDeps() FilterDependencies {
return FilterDependencies{
db: s.db,
accountsDb: s.accountsDB,
tokenSymbol: func(t Token) string {
info := s.tokenManager.LookupTokenIdentity(uint64(t.ChainID), t.Address, t.TokenType == Native)
if info == nil {
return ""
}
return info.Symbol
},
tokenFromSymbol: func(chainID *w_common.ChainID, symbol string) *Token {
var cID *uint64
if chainID != nil {
cID = new(uint64)
*cID = uint64(*chainID)
}
t, detectedNative := s.tokenManager.LookupToken(cID, symbol)
if t == nil {
return nil
}
tokenType := Native
if !detectedNative {
tokenType = Erc20
}
return &Token{
TokenType: tokenType,
ChainID: w_common.ChainID(t.ChainID),
Address: t.Address,
}
},
}
}
func (s *Service) sendResponseEvent(requestID *int32, eventType walletevent.EventType, payloadObj interface{}, resErr error) {
payload, err := json.Marshal(payloadObj)
if err != nil {
log.Error("Error marshaling response: %v; result error: %w", err, resErr)
} else {
err = resErr
}
log.Debug("wallet.api.activity.Service RESPONSE", "requestID", requestID, "eventType", eventType, "error", err, "payload.len", len(payload))
event := walletevent.Event{
Type: eventType,
Message: string(payload),
}
if requestID != nil {
event.RequestID = new(int)
*event.RequestID = int(*requestID)
}
s.eventFeed.Send(event)
}