feat: add wallet ticker

This commit is contained in:
Anthony Laibe 2022-12-01 10:19:32 +01:00 committed by Anthony Laibe
parent 0d1837f858
commit c735e2a6bb
5 changed files with 68 additions and 46 deletions

View File

@ -19,9 +19,8 @@ import (
)
func NewAPI(s *Service) *API {
reader := NewReader(s)
router := NewRouter(s)
return &API{s, reader, router}
return &API{s, s.reader, router}
}
// API is class with methods available over RPC.
@ -31,8 +30,8 @@ type API struct {
router *Router
}
func (api *API) StartWallet(ctx context.Context, chainIDs []uint64) error {
return api.reader.Start(ctx, chainIDs)
func (api *API) StartWallet(ctx context.Context) error {
return api.reader.Start()
}
func (api *API) GetWalletToken(ctx context.Context) (map[common.Address][]Token, error) {

View File

@ -8,7 +8,9 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/chain"
"github.com/status-im/status-go/services/wallet/token"
@ -18,12 +20,16 @@ import (
// WalletTickReload emitted every 15mn to reload the wallet balance and history
const EventWalletTickReload walletevent.EventType = "wallet-tick-reload"
func NewReader(s *Service) *Reader {
return &Reader{s}
func NewReader(rpcClient *rpc.Client, tokenManager *token.Manager, accountsDB *accounts.Database, walletFeed *event.Feed) *Reader {
return &Reader{rpcClient, tokenManager, accountsDB, walletFeed, nil}
}
type Reader struct {
s *Service
rpcClient *rpc.Client
tokenManager *token.Manager
accountsDB *accounts.Database
walletFeed *event.Feed
cancel context.CancelFunc
}
type ChainBalance struct {
@ -94,23 +100,35 @@ func getTokenAddresses(tokens []*token.Token) []common.Address {
return res
}
func (r *Reader) Start(ctx context.Context, chainIDs []uint64) error {
run := func(context.Context) error {
r.s.feed.Send(walletevent.Event{
func (r *Reader) Start() error {
ctx, cancel := context.WithCancel(context.Background())
r.cancel = cancel
go func() {
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
r.walletFeed.Send(walletevent.Event{
Type: EventWalletTickReload,
})
return nil
}
command := async.FiniteCommand{
Interval: 10 * time.Second,
Runable: run,
}
go command.Run(ctx)
}()
return nil
}
func (r *Reader) Stop() {
if r.cancel != nil {
r.cancel()
}
}
func (r *Reader) GetWalletToken(ctx context.Context) (map[common.Address][]Token, error) {
networks, err := r.s.rpcClient.NetworkManager.Get(false)
networks, err := r.rpcClient.NetworkManager.Get(false)
if err != nil {
return nil, err
}
@ -120,23 +138,23 @@ func (r *Reader) GetWalletToken(ctx context.Context) (map[common.Address][]Token
chainIDs = append(chainIDs, network.ChainID)
}
currency, err := r.s.accountsDB.GetCurrency()
currency, err := r.accountsDB.GetCurrency()
if err != nil {
return nil, err
}
allTokens, err := r.s.tokenManager.GetAllTokens()
allTokens, err := r.tokenManager.GetAllTokens()
if err != nil {
return nil, err
}
for _, network := range networks {
allTokens = append(allTokens, r.s.tokenManager.ToToken(network))
allTokens = append(allTokens, r.tokenManager.ToToken(network))
}
tokenSymbols := getTokenSymbols(allTokens)
tokenAddresses := getTokenAddresses(allTokens)
accounts, err := r.s.accountsDB.GetAccounts()
accounts, err := r.accountsDB.GetAccounts()
if err != nil {
return nil, err
}
@ -174,12 +192,12 @@ func (r *Reader) GetWalletToken(ctx context.Context) (map[common.Address][]Token
})
group.Add(func(parent context.Context) error {
clients, err := chain.NewClients(r.s.rpcClient, chainIDs)
clients, err := chain.NewClients(r.rpcClient, chainIDs)
if err != nil {
return err
}
balances, err = r.s.tokenManager.GetBalancesByChain(ctx, clients, getAddresses(accounts), tokenAddresses)
balances, err = r.tokenManager.GetBalancesByChain(ctx, clients, getAddresses(accounts), tokenAddresses)
if err != nil {
return err
}

View File

@ -14,8 +14,10 @@ import (
"github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/services/ens"
"github.com/status-im/status-go/services/stickers"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/transfer"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/transactions"
)
@ -35,11 +37,15 @@ func NewService(
cryptoOnRampManager := NewCryptoOnRampManager(&CryptoOnRampOptions{
dataSourceType: DataSourceStatic,
})
walletFeed := &event.Feed{}
signals := &walletevent.SignalsTransmitter{
Publisher: walletFeed,
}
tokenManager := token.NewTokenManager(db, rpcClient, rpcClient.NetworkManager)
savedAddressesManager := &SavedAddressesManager{db: db}
transactionManager := &TransactionManager{db: db, transactor: transactor, gethManager: gethManager, config: config, accountsDB: accountsDB}
transferController := transfer.NewTransferController(db, rpcClient, accountFeed)
transferController := transfer.NewTransferController(db, rpcClient, accountFeed, walletFeed)
reader := NewReader(rpcClient, tokenManager, accountsDB, walletFeed)
return &Service{
db: db,
accountsDB: accountsDB,
@ -56,6 +62,8 @@ func NewService(
ens: ens,
stickers: stickers,
feed: accountFeed,
signals: signals,
reader: reader,
}
}
@ -77,11 +85,15 @@ type Service struct {
ens *ens.Service
stickers *stickers.Service
feed *event.Feed
group *async.Group
signals *walletevent.SignalsTransmitter
reader *Reader
}
// Start signals transmitter.
func (s *Service) Start() error {
err := s.transferController.Start()
s.transferController.Start()
err := s.signals.Start()
s.started = true
return err
}
@ -91,10 +103,12 @@ func (s *Service) GetFeed() *event.Feed {
return s.transferController.TransferFeed
}
// Stop reactor, signals transmitter and close db.
// Stop reactor and close db.
func (s *Service) Stop() error {
log.Info("wallet will be stopped")
s.signals.Stop()
s.transferController.Stop()
s.reader.Stop()
s.started = false
log.Info("wallet stopped")
return nil

View File

@ -21,7 +21,6 @@ import (
type Controller struct {
db *Database
rpcClient *rpc.Client
signals *SignalsTransmitter
block *Block
reactor *Reactor
accountFeed *event.Feed
@ -30,29 +29,22 @@ type Controller struct {
balanceCache *balanceCache
}
func NewTransferController(db *sql.DB, rpcClient *rpc.Client, accountFeed *event.Feed) *Controller {
transferFeed := &event.Feed{}
signals := &SignalsTransmitter{
publisher: transferFeed,
}
func NewTransferController(db *sql.DB, rpcClient *rpc.Client, accountFeed *event.Feed, transferFeed *event.Feed) *Controller {
block := &Block{db}
return &Controller{
db: NewDB(db),
block: block,
rpcClient: rpcClient,
signals: signals,
accountFeed: accountFeed,
TransferFeed: transferFeed,
}
}
func (c *Controller) Start() error {
func (c *Controller) Start() {
c.group = async.NewGroup(context.Background())
return c.signals.Start()
}
func (c *Controller) Stop() {
c.signals.Stop()
if c.reactor != nil {
c.reactor.stop()
}

View File

@ -1,4 +1,4 @@
package transfer
package walletevent
import (
"sync"
@ -6,17 +6,16 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/signal"
)
type publisher interface {
type Publisher interface {
Subscribe(interface{}) event.Subscription
}
// SignalsTransmitter transmits received events as wallet signals.
type SignalsTransmitter struct {
publisher
Publisher
wg sync.WaitGroup
quit chan struct{}
@ -29,8 +28,8 @@ func (tmr *SignalsTransmitter) Start() error {
return nil
}
tmr.quit = make(chan struct{})
events := make(chan walletevent.Event, 10)
sub := tmr.publisher.Subscribe(events)
events := make(chan Event, 10)
sub := tmr.Publisher.Subscribe(events)
tmr.wg.Add(1)
go func() {