2023-06-08 23:52:45 +00:00
package activity
import (
"context"
"database/sql"
"encoding/json"
"errors"
2023-08-11 17:28:46 +00:00
"strconv"
2024-01-08 21:24:30 +00:00
"sync"
"sync/atomic"
2023-09-20 08:30:31 +00:00
"time"
2023-06-08 23:52:45 +00:00
2024-10-28 20:54:17 +00:00
"go.uber.org/zap"
2023-06-08 23:52:45 +00:00
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
2024-10-28 20:54:17 +00:00
"github.com/status-im/status-go/logutils"
2024-03-29 12:44:50 +00:00
"github.com/status-im/status-go/multiaccounts/accounts"
2023-07-05 13:59:39 +00:00
"github.com/status-im/status-go/services/wallet/async"
2023-08-11 17:28:46 +00:00
"github.com/status-im/status-go/services/wallet/collectibles"
2023-06-08 23:52:45 +00:00
w_common "github.com/status-im/status-go/services/wallet/common"
2023-08-11 17:28:46 +00:00
"github.com/status-im/status-go/services/wallet/thirdparty"
2023-06-13 09:25:23 +00:00
"github.com/status-im/status-go/services/wallet/token"
2023-06-08 23:52:45 +00:00
"github.com/status-im/status-go/services/wallet/walletevent"
2024-01-08 21:24:30 +00:00
"github.com/status-im/status-go/transactions"
2023-06-08 23:52:45 +00:00
)
const (
2024-01-08 21:24:30 +00:00
// EventActivityFilteringDone contains a FilterResponse payload
2023-06-22 11:28:35 +00:00
EventActivityFilteringDone walletevent . EventType = "wallet-activity-filtering-done"
2023-08-11 17:28:46 +00:00
EventActivityFilteringUpdate walletevent . EventType = "wallet-activity-filtering-entries-updated"
2023-06-22 11:28:35 +00:00
EventActivityGetRecipientsDone walletevent . EventType = "wallet-activity-get-recipients-result"
EventActivityGetOldestTimestampDone walletevent . EventType = "wallet-activity-get-oldest-timestamp-result"
2023-10-03 10:49:04 +00:00
EventActivityGetCollectibles walletevent . EventType = "wallet-activity-get-collectibles"
2024-01-08 21:24:30 +00:00
// EventActivitySessionUpdated contains a SessionUpdate payload
EventActivitySessionUpdated walletevent . EventType = "wallet-activity-session-updated"
2023-06-22 11:28:35 +00:00
)
2023-08-10 19:30:17 +00:00
var (
filterTask = async . TaskType {
ID : 1 ,
Policy : async . ReplacementPolicyCancelOld ,
2023-07-24 22:54:53 +00:00
}
2023-08-10 19:30:17 +00:00
getRecipientsTask = async . TaskType {
ID : 2 ,
Policy : async . ReplacementPolicyIgnoreNew ,
}
getOldestTimestampTask = async . TaskType {
ID : 3 ,
Policy : async . ReplacementPolicyCancelOld ,
}
2023-10-03 10:49:04 +00:00
getCollectiblesTask = async . TaskType {
ID : 4 ,
Policy : async . ReplacementPolicyCancelOld ,
}
2023-08-10 19:30:17 +00:00
)
2023-07-24 22:54:53 +00:00
2023-09-12 10:19:15 +00:00
// Service provides an async interface, ensuring only one filter request, of each type, is running at a time. It also provides lazy load of NFT info and token mapping
2023-06-08 23:52:45 +00:00
type Service struct {
2023-06-13 09:25:23 +00:00
db * sql . DB
2024-03-29 12:44:50 +00:00
accountsDB * accounts . Database
2023-08-11 17:28:46 +00:00
tokenManager token . ManagerInterface
collectibles collectibles . ManagerInterface
2023-06-13 09:25:23 +00:00
eventFeed * event . Feed
2023-08-10 19:30:17 +00:00
scheduler * async . MultiClientScheduler
2024-01-08 21:24:30 +00:00
2024-11-12 18:28:40 +00:00
sessions map [ SessionID ] * Session
lastSessionID atomic . Int32
subscriptions event . Subscription
subscriptionsCancelFn context . CancelFunc
ch chan walletevent . Event
2024-01-08 21:24:30 +00:00
// sessionsRWMutex is used to protect all sessions related members
2024-02-17 02:41:44 +00:00
sessionsRWMutex sync . RWMutex
debounceDuration time . Duration
2024-01-08 21:24:30 +00:00
pendingTracker * transactions . PendingTxTracker
2023-06-08 23:52:45 +00:00
}
2024-01-08 21:24:30 +00:00
// nextSessionID atomically increments the session counter and returns the new
// value as a SessionID.
func (s *Service) nextSessionID() SessionID {
	next := s.lastSessionID.Add(1)
	return SessionID(next)
}
2024-03-29 12:44:50 +00:00
func NewService ( db * sql . DB , accountsDB * accounts . Database , tokenManager token . ManagerInterface , collectibles collectibles . ManagerInterface , eventFeed * event . Feed , pendingTracker * transactions . PendingTxTracker ) * Service {
2023-06-08 23:52:45 +00:00
return & Service {
2023-06-13 09:25:23 +00:00
db : db ,
2024-03-29 12:44:50 +00:00
accountsDB : accountsDB ,
2023-06-13 09:25:23 +00:00
tokenManager : tokenManager ,
2023-08-11 17:28:46 +00:00
collectibles : collectibles ,
2023-06-13 09:25:23 +00:00
eventFeed : eventFeed ,
2023-08-10 19:30:17 +00:00
scheduler : async . NewMultiClientScheduler ( ) ,
2024-01-08 21:24:30 +00:00
sessions : make ( map [ SessionID ] * Session ) ,
2024-02-17 02:41:44 +00:00
// here to be overwritten by tests
debounceDuration : 1 * time . Second ,
2024-01-08 21:24:30 +00:00
pendingTracker : pendingTracker ,
2023-06-08 23:52:45 +00:00
}
}
// ErrorCode is the status reported in the "errorCode" field of response payloads.
// It is an alias of int.
type ErrorCode = int

// Status values start at 1.
const (
	// ErrorCodeSuccess marks a request that completed without error.
	ErrorCodeSuccess ErrorCode = 1 + iota
	// ErrorCodeTaskCanceled marks a request that was canceled or overwritten.
	ErrorCodeTaskCanceled
	// ErrorCodeFailed marks a request that ended with any other error.
	ErrorCodeFailed
)
type FilterResponse struct {
2023-06-11 14:22:25 +00:00
Activities [ ] Entry ` json:"activities" `
Offset int ` json:"offset" `
// Used to indicate that there might be more entries that were not returned
// based on a simple heuristic
HasMore bool ` json:"hasMore" `
ErrorCode ErrorCode ` json:"errorCode" `
2023-06-08 23:52:45 +00:00
}
// FilterActivityAsync allows only one filter task to run at a time
2023-09-12 10:19:15 +00:00
// it cancels the current one if a new one is started
// and should not expect other owners to have data in one of the queried tables
//
2023-06-08 23:52:45 +00:00
// All calls will trigger an EventActivityFilteringDone event with the result of the filtering
2024-01-08 21:24:30 +00:00
// TODO #12120: replace with session based APIs
2024-03-29 12:44:50 +00:00
func ( s * Service ) FilterActivityAsync ( requestID int32 , addresses [ ] common . Address , chainIDs [ ] w_common . ChainID , filter Filter , offset int , limit int ) {
2023-08-10 19:30:17 +00:00
s . scheduler . Enqueue ( requestID , filterTask , func ( ctx context . Context ) ( interface { } , error ) {
2024-03-29 12:44:50 +00:00
allAddresses := s . areAllAddresses ( addresses )
2023-09-12 10:19:15 +00:00
activities , err := getActivityEntries ( ctx , s . getDeps ( ) , addresses , allAddresses , chainIDs , filter , offset , limit )
2023-06-22 11:28:35 +00:00
return activities , err
2023-07-05 13:59:39 +00:00
} , func ( result interface { } , taskType async . TaskType , err error ) {
2023-06-22 11:28:35 +00:00
res := FilterResponse {
ErrorCode : ErrorCodeFailed ,
}
2023-06-08 23:52:45 +00:00
2023-07-05 13:59:39 +00:00
if errors . Is ( err , context . Canceled ) || errors . Is ( err , async . ErrTaskOverwritten ) {
2023-06-22 11:28:35 +00:00
res . ErrorCode = ErrorCodeTaskCanceled
} else if err == nil {
activities := result . ( [ ] Entry )
res . Activities = activities
res . Offset = offset
res . HasMore = len ( activities ) == limit
res . ErrorCode = ErrorCodeSuccess
}
2024-01-08 21:24:30 +00:00
sendResponseEvent ( s . eventFeed , & requestID , EventActivityFilteringDone , res , err )
2023-08-11 17:28:46 +00:00
2023-11-14 17:16:39 +00:00
s . getActivityDetailsAsync ( requestID , res . Activities )
2023-06-22 11:28:35 +00:00
} )
}
2023-06-08 23:52:45 +00:00
2023-10-03 10:49:04 +00:00
// CollectibleHeader is the reduced view of a collectible returned to clients:
// its unique ID, display name and image URL.
type CollectibleHeader struct {
	// ID uniquely identifies the collectible.
	ID thirdparty.CollectibleUniqueID `json:"id"`
	// Name is the collectible's name.
	Name string `json:"name"`
	// ImageURL is the collectible's image URL.
	ImageURL string `json:"image_url"`
}
// GetollectiblesResponse is the payload sent with EventActivityGetCollectibles.
type GetollectiblesResponse struct {
	// Collectibles holds the collectible headers of the requested page.
	Collectibles []CollectibleHeader `json:"collectibles"`
	// Offset echoes the offset of the request.
	Offset int `json:"offset"`
	// HasMore hints that further collectibles may exist beyond this page; it is
	// a simple heuristic, not a guarantee.
	HasMore bool `json:"hasMore"`
	// ErrorCode reports success, cancellation or failure.
	ErrorCode ErrorCode `json:"errorCode"`
}
func ( s * Service ) GetActivityCollectiblesAsync ( requestID int32 , chainIDs [ ] w_common . ChainID , addresses [ ] common . Address , offset int , limit int ) {
s . scheduler . Enqueue ( requestID , getCollectiblesTask , func ( ctx context . Context ) ( interface { } , error ) {
collectibles , err := GetActivityCollectibles ( ctx , s . db , chainIDs , addresses , offset , limit )
if err != nil {
return nil , err
}
2023-12-13 12:19:25 +00:00
data , err := s . collectibles . FetchAssetsByCollectibleUniqueID ( ctx , collectibles , true )
2023-10-03 10:49:04 +00:00
if err != nil {
return nil , err
}
res := make ( [ ] CollectibleHeader , 0 , len ( data ) )
for _ , c := range data {
res = append ( res , CollectibleHeader {
ID : c . CollectibleData . ID ,
Name : c . CollectibleData . Name ,
ImageURL : c . CollectibleData . ImageURL ,
} )
}
return res , err
} , func ( result interface { } , taskType async . TaskType , err error ) {
res := GetollectiblesResponse {
ErrorCode : ErrorCodeFailed ,
}
if errors . Is ( err , context . Canceled ) || errors . Is ( err , async . ErrTaskOverwritten ) {
res . ErrorCode = ErrorCodeTaskCanceled
} else if err == nil {
collectibles := result . ( [ ] CollectibleHeader )
res . Collectibles = collectibles
res . Offset = offset
res . HasMore = len ( collectibles ) == limit
res . ErrorCode = ErrorCodeSuccess
}
2024-01-08 21:24:30 +00:00
sendResponseEvent ( s . eventFeed , & requestID , EventActivityGetCollectibles , res , err )
2023-10-03 10:49:04 +00:00
} )
}
2023-08-24 12:23:40 +00:00
// GetMultiTxDetails returns the details of the multi-transaction identified by
// multiTxID, read from the service database.
func (s *Service) GetMultiTxDetails(ctx context.Context, multiTxID int) (*EntryDetails, error) {
	details, err := getMultiTxDetails(ctx, s.db, multiTxID)
	return details, err
}
// GetTxDetails returns the details of the transaction identified by id, read
// from the service database.
func (s *Service) GetTxDetails(ctx context.Context, id string) (*EntryDetails, error) {
	details, err := getTxDetails(ctx, s.db, id)
	return details, err
}
2023-11-14 17:16:39 +00:00
// getActivityDetails check if any of the entries have details that are not loaded then fetch and emit result
func ( s * Service ) getActivityDetails ( ctx context . Context , entries [ ] Entry ) ( [ ] * EntryData , error ) {
2023-08-11 17:28:46 +00:00
res := make ( [ ] * EntryData , 0 )
var err error
ids := make ( [ ] thirdparty . CollectibleUniqueID , 0 )
2024-07-14 16:34:43 +00:00
entriesForIds := make ( map [ string ] [ ] * Entry )
2024-07-04 11:51:43 +00:00
idExists := func ( ids [ ] thirdparty . CollectibleUniqueID , id * thirdparty . CollectibleUniqueID ) bool {
for _ , existingID := range ids {
if existingID . Same ( id ) {
return true
}
}
return false
}
2023-08-11 17:28:46 +00:00
for i := range entries {
if ! entries [ i ] . isNFT ( ) {
continue
}
id := entries [ i ] . anyIdentity ( )
2024-07-14 16:34:43 +00:00
if id == nil {
2023-08-11 17:28:46 +00:00
continue
}
2024-07-14 16:34:43 +00:00
entriesForIds [ id . HashKey ( ) ] = append ( entriesForIds [ id . HashKey ( ) ] , & entries [ i ] )
if ! idExists ( ids , id ) {
ids = append ( ids , * id )
}
2023-08-11 17:28:46 +00:00
}
if len ( ids ) == 0 {
2023-11-14 17:16:39 +00:00
return nil , nil
2023-08-11 17:28:46 +00:00
}
2024-10-28 20:54:17 +00:00
logutils . ZapLogger ( ) . Debug ( "wallet.activity.Service lazyLoadDetails" ,
zap . Int ( "entries.len" , len ( entries ) ) ,
zap . Int ( "ids.len" , len ( ids ) ) ,
)
2023-08-11 17:28:46 +00:00
2023-12-13 12:19:25 +00:00
colData , err := s . collectibles . FetchAssetsByCollectibleUniqueID ( ctx , ids , true )
2023-08-11 17:28:46 +00:00
if err != nil {
2024-10-28 20:54:17 +00:00
logutils . ZapLogger ( ) . Error ( "Error fetching collectible details" , zap . Error ( err ) )
2023-11-14 17:16:39 +00:00
return nil , err
2023-08-11 17:28:46 +00:00
}
for _ , col := range colData {
2024-07-14 16:34:43 +00:00
nftName := w_common . NewAndSet ( col . CollectibleData . Name )
nftURL := w_common . NewAndSet ( col . CollectibleData . ImageURL )
2023-08-11 17:28:46 +00:00
for i := range ids {
2024-07-14 16:34:43 +00:00
if ! col . CollectibleData . ID . Same ( & ids [ i ] ) {
continue
}
entryList , ok := entriesForIds [ ids [ i ] . HashKey ( ) ]
if ! ok {
continue
}
for _ , e := range entryList {
data := & EntryData {
NftName : nftName ,
NftURL : nftURL ,
}
if e . payloadType == MultiTransactionPT {
data . ID = w_common . NewAndSet ( e . id )
2023-08-11 17:28:46 +00:00
} else {
2024-07-14 16:34:43 +00:00
data . Transaction = e . transaction
2023-08-11 17:28:46 +00:00
}
2024-07-14 16:34:43 +00:00
data . PayloadType = e . payloadType
res = append ( res , data )
2023-08-11 17:28:46 +00:00
}
}
}
2023-11-14 17:16:39 +00:00
return res , nil
2023-08-11 17:28:46 +00:00
}
2023-06-22 11:28:35 +00:00
// GetRecipientsResponse is the payload sent with EventActivityGetRecipientsDone.
type GetRecipientsResponse struct {
	// Addresses holds the recipient addresses of the requested page.
	Addresses []common.Address `json:"addresses"`
	// Offset echoes the offset of the request.
	Offset int `json:"offset"`
	// HasMore hints that further entries may exist beyond this page; it is a
	// simple heuristic, not a guarantee.
	HasMore bool `json:"hasMore"`
	// ErrorCode reports success, cancellation or failure.
	ErrorCode ErrorCode `json:"errorCode"`
}
2023-06-08 23:52:45 +00:00
2023-06-22 11:28:35 +00:00
// GetRecipientsAsync returns true if a task is already running or scheduled due to a previous call; meaning that
// this call won't receive an answer but client should rely on the answer from the previous call.
// If no task is already scheduled false will be returned
2023-10-02 11:46:05 +00:00
func ( s * Service ) GetRecipientsAsync ( requestID int32 , chainIDs [ ] w_common . ChainID , addresses [ ] common . Address , offset int , limit int ) bool {
2023-08-10 19:30:17 +00:00
return s . scheduler . Enqueue ( requestID , getRecipientsTask , func ( ctx context . Context ) ( interface { } , error ) {
2023-06-22 11:28:35 +00:00
var err error
result := & GetRecipientsResponse {
Offset : offset ,
ErrorCode : ErrorCodeSuccess ,
}
2023-10-02 11:46:05 +00:00
result . Addresses , result . HasMore , err = GetRecipients ( ctx , s . db , chainIDs , addresses , offset , limit )
2023-06-22 11:28:35 +00:00
return result , err
2023-07-05 13:59:39 +00:00
} , func ( result interface { } , taskType async . TaskType , err error ) {
2023-06-22 11:28:35 +00:00
res := result . ( * GetRecipientsResponse )
2023-07-05 13:59:39 +00:00
if errors . Is ( err , context . Canceled ) || errors . Is ( err , async . ErrTaskOverwritten ) {
2023-06-22 11:28:35 +00:00
res . ErrorCode = ErrorCodeTaskCanceled
} else if err != nil {
res . ErrorCode = ErrorCodeFailed
}
2023-06-11 14:22:25 +00:00
2024-01-08 21:24:30 +00:00
sendResponseEvent ( s . eventFeed , & requestID , EventActivityGetRecipientsDone , result , err )
2023-06-22 11:28:35 +00:00
} )
}
2023-06-08 23:52:45 +00:00
2023-06-22 11:28:35 +00:00
// GetOldestTimestampResponse is the payload sent with
// EventActivityGetOldestTimestampDone.
type GetOldestTimestampResponse struct {
	// Timestamp is the oldest timestamp found; it stays zero unless the request
	// succeeded.
	Timestamp int64 `json:"timestamp"`
	// ErrorCode reports success, cancellation or failure.
	ErrorCode ErrorCode `json:"errorCode"`
}
2023-06-08 23:52:45 +00:00
2023-07-24 22:54:53 +00:00
func ( s * Service ) GetOldestTimestampAsync ( requestID int32 , addresses [ ] common . Address ) {
2023-08-10 19:30:17 +00:00
s . scheduler . Enqueue ( requestID , getOldestTimestampTask , func ( ctx context . Context ) ( interface { } , error ) {
2023-06-22 11:28:35 +00:00
timestamp , err := GetOldestTimestamp ( ctx , s . db , addresses )
return timestamp , err
2023-07-05 13:59:39 +00:00
} , func ( result interface { } , taskType async . TaskType , err error ) {
2023-06-22 11:28:35 +00:00
res := GetOldestTimestampResponse {
ErrorCode : ErrorCodeFailed ,
2023-06-08 23:52:45 +00:00
}
2023-07-05 13:59:39 +00:00
if errors . Is ( err , context . Canceled ) || errors . Is ( err , async . ErrTaskOverwritten ) {
2023-06-22 11:28:35 +00:00
res . ErrorCode = ErrorCodeTaskCanceled
2023-06-08 23:52:45 +00:00
} else if err == nil {
2024-06-10 13:56:06 +00:00
res . Timestamp = int64 ( result . ( uint64 ) )
2023-06-08 23:52:45 +00:00
res . ErrorCode = ErrorCodeSuccess
}
2024-01-08 21:24:30 +00:00
sendResponseEvent ( s . eventFeed , & requestID , EventActivityGetOldestTimestampDone , res , err )
2023-06-22 11:28:35 +00:00
} )
2023-06-08 23:52:45 +00:00
}
2023-09-19 20:58:13 +00:00
// CancelFilterTask enqueues a no-op task of the filter type for requestID.
// Because filter tasks use the cancel-old replacement policy, this displaces
// any filter task currently scheduled for that request.
func (s *Service) CancelFilterTask(requestID int32) {
	// The task does nothing.
	noop := func(ctx context.Context) (interface{}, error) {
		return nil, nil
	}
	// Its result is ignored.
	discard := func(result interface{}, taskType async.TaskType, err error) {}

	s.scheduler.Enqueue(requestID, filterTask, noop, discard)
}
2023-06-08 23:52:45 +00:00
func ( s * Service ) Stop ( ) {
2023-06-22 11:28:35 +00:00
s . scheduler . Stop ( )
2023-06-08 23:52:45 +00:00
}
2023-06-13 09:25:23 +00:00
func ( s * Service ) getDeps ( ) FilterDependencies {
return FilterDependencies {
2023-09-12 10:19:15 +00:00
db : s . db ,
2023-06-13 09:25:23 +00:00
tokenSymbol : func ( t Token ) string {
info := s . tokenManager . LookupTokenIdentity ( uint64 ( t . ChainID ) , t . Address , t . TokenType == Native )
if info == nil {
return ""
}
return info . Symbol
} ,
tokenFromSymbol : func ( chainID * w_common . ChainID , symbol string ) * Token {
var cID * uint64
if chainID != nil {
cID = new ( uint64 )
* cID = uint64 ( * chainID )
}
t , detectedNative := s . tokenManager . LookupToken ( cID , symbol )
if t == nil {
return nil
}
tokenType := Native
if ! detectedNative {
tokenType = Erc20
}
return & Token {
TokenType : tokenType ,
ChainID : w_common . ChainID ( t . ChainID ) ,
Address : t . Address ,
}
} ,
2023-09-20 08:30:31 +00:00
currentTimestamp : func ( ) int64 {
return time . Now ( ) . Unix ( )
} ,
2023-06-13 09:25:23 +00:00
}
}
2024-01-08 21:24:30 +00:00
func sendResponseEvent ( eventFeed * event . Feed , requestID * int32 , eventType walletevent . EventType , payloadObj interface { } , resErr error ) {
2023-06-22 11:28:35 +00:00
payload , err := json . Marshal ( payloadObj )
2023-06-08 23:52:45 +00:00
if err != nil {
2024-10-28 20:54:17 +00:00
logutils . ZapLogger ( ) . Error ( "Error marshaling" , zap . NamedError ( "response" , err ) , zap . NamedError ( "result" , resErr ) )
2023-07-04 12:01:45 +00:00
} else {
err = resErr
2023-06-08 23:52:45 +00:00
}
2024-01-26 04:31:18 +00:00
requestIDStr := nilStr
2023-08-11 17:28:46 +00:00
if requestID != nil {
requestIDStr = strconv . Itoa ( int ( * requestID ) )
}
2024-10-28 20:54:17 +00:00
logutils . ZapLogger ( ) . Debug ( "wallet.api.activity.Service RESPONSE" ,
zap . String ( "requestID" , requestIDStr ) ,
zap . String ( "eventType" , string ( eventType ) ) ,
zap . Error ( err ) ,
zap . Int ( "payload.len" , len ( payload ) ) ,
)
2023-06-13 09:25:23 +00:00
2023-07-24 22:54:53 +00:00
event := walletevent . Event {
2023-06-22 11:28:35 +00:00
Type : eventType ,
2023-06-08 23:52:45 +00:00
Message : string ( payload ) ,
2023-07-24 22:54:53 +00:00
}
if requestID != nil {
event . RequestID = new ( int )
* event . RequestID = int ( * requestID )
}
2024-01-08 21:24:30 +00:00
eventFeed . Send ( event )
2023-06-08 23:52:45 +00:00
}
2024-03-29 12:44:50 +00:00
// getWalletAddreses reads the wallet addresses from the accounts database and
// converts them to common.Address values.
func (s *Service) getWalletAddreses() ([]common.Address, error) {
	stored, err := s.accountsDB.GetWalletAddresses()
	if err != nil {
		return nil, err
	}

	converted := make([]common.Address, len(stored))
	for i, addr := range stored {
		converted[i] = common.Address(addr)
	}
	return converted, nil
}
func ( s * Service ) areAllAddresses ( addresses [ ] common . Address ) bool {
// Compare with addresses in accountsDB
walletAddresses , err := s . getWalletAddreses ( )
if err != nil {
2024-10-28 20:54:17 +00:00
logutils . ZapLogger ( ) . Error ( "Error getting wallet addresses" , zap . Error ( err ) )
2024-03-29 12:44:50 +00:00
return false
}
// Check if passed addresses are the same as in the accountsDB ignoring the order
return areSlicesEqual ( walletAddresses , addresses )
}
// areSlicesEqual reports whether a and b contain the same addresses with the
// same multiplicities, ignoring order.
//
// Occurrences are counted rather than only checked for membership, so inputs
// with duplicates compare correctly: [A, B] and [A, A] are not equal, and the
// result does not depend on the argument order.
func areSlicesEqual(a, b []common.Address) bool {
	if len(a) != len(b) {
		return false
	}

	// Count how many times each address occurs in a.
	remaining := make(map[common.Address]int, len(a))
	for _, address := range a {
		remaining[address]++
	}

	// Consume one occurrence per element of b. Because the lengths match,
	// consuming all of b without running out means the multisets are equal.
	for _, address := range b {
		if remaining[address] == 0 {
			return false
		}
		remaining[address]--
	}

	return true
}