124 lines
2.8 KiB
Go
124 lines
2.8 KiB
Go
|
package activity
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"database/sql"
|
||
|
"encoding/json"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
|
||
|
"github.com/ethereum/go-ethereum/common"
|
||
|
"github.com/ethereum/go-ethereum/event"
|
||
|
"github.com/ethereum/go-ethereum/log"
|
||
|
|
||
|
w_common "github.com/status-im/status-go/services/wallet/common"
|
||
|
"github.com/status-im/status-go/services/wallet/walletevent"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
// FilterResponse json is sent as a message in the EventActivityFilteringDone event
|
||
|
EventActivityFilteringDone walletevent.EventType = "wallet-activity-filtering-done"
|
||
|
)
|
||
|
|
||
|
type Service struct {
|
||
|
db *sql.DB
|
||
|
eventFeed *event.Feed
|
||
|
|
||
|
context context.Context
|
||
|
cancelFn context.CancelFunc
|
||
|
wg sync.WaitGroup
|
||
|
mu sync.Mutex
|
||
|
}
|
||
|
|
||
|
func NewService(db *sql.DB, eventFeed *event.Feed) *Service {
|
||
|
return &Service{
|
||
|
db: db,
|
||
|
eventFeed: eventFeed,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type ErrorCode = int
|
||
|
|
||
|
const (
|
||
|
ErrorCodeSuccess ErrorCode = iota + 1
|
||
|
ErrorCodeFilterCanceled
|
||
|
ErrorCodeFilterFailed
|
||
|
)
|
||
|
|
||
|
type FilterResponse struct {
|
||
|
Activities []Entry `json:"activities"`
|
||
|
ThereMightBeMore bool `json:"thereMightBeMore"`
|
||
|
ErrorCode ErrorCode `json:"errorCode"`
|
||
|
}
|
||
|
|
||
|
// FilterActivityAsync allows only one filter task to run at a time
|
||
|
// and it cancels the current one if a new one is started
|
||
|
// All calls will trigger an EventActivityFilteringDone event with the result of the filtering
|
||
|
func (s *Service) FilterActivityAsync(ctx context.Context, addresses []common.Address, chainIDs []w_common.ChainID, filter Filter, offset int, limit int) error {
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
|
||
|
// If a previous task is running, cancel it and wait to finish
|
||
|
if s.cancelFn != nil {
|
||
|
s.cancelFn()
|
||
|
s.wg.Wait()
|
||
|
}
|
||
|
|
||
|
if ctx.Err() != nil {
|
||
|
return fmt.Errorf("context error: %w", ctx.Err())
|
||
|
}
|
||
|
|
||
|
s.context, s.cancelFn = context.WithCancel(context.Background())
|
||
|
|
||
|
s.wg.Add(1)
|
||
|
go func() {
|
||
|
defer s.wg.Done()
|
||
|
defer func() {
|
||
|
s.cancelFn = nil
|
||
|
}()
|
||
|
|
||
|
activities, err := getActivityEntries(s.context, s.db, addresses, chainIDs, filter, offset, limit)
|
||
|
|
||
|
res := FilterResponse{
|
||
|
ErrorCode: ErrorCodeFilterFailed,
|
||
|
}
|
||
|
|
||
|
if errors.Is(err, context.Canceled) {
|
||
|
res.ErrorCode = ErrorCodeFilterCanceled
|
||
|
} else if err == nil {
|
||
|
res.Activities = activities
|
||
|
res.ThereMightBeMore = len(activities) == limit
|
||
|
res.ErrorCode = ErrorCodeSuccess
|
||
|
}
|
||
|
|
||
|
s.sendResponseEvent(res)
|
||
|
}()
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *Service) Stop() {
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
|
||
|
// If a previous task is running, cancel it and wait to finish
|
||
|
if s.cancelFn != nil {
|
||
|
s.cancelFn()
|
||
|
s.wg.Wait()
|
||
|
s.cancelFn = nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *Service) sendResponseEvent(response FilterResponse) {
|
||
|
payload, err := json.Marshal(response)
|
||
|
if err != nil {
|
||
|
log.Error("Error marshaling response: %v", err)
|
||
|
}
|
||
|
|
||
|
s.eventFeed.Send(walletevent.Event{
|
||
|
Type: EventActivityFilteringDone,
|
||
|
Message: string(payload),
|
||
|
})
|
||
|
}
|