feat: add ticker in backend rather than in clients

This commit is contained in:
Anthony Laibe 2022-11-29 14:43:18 +01:00 committed by Anthony Laibe
parent eff02a79a9
commit 0d1837f858
9 changed files with 65 additions and 47 deletions

View File

@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/status-im/status-go/services/wallet/transfer"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/signal"
"github.com/status-im/status-go/t/utils"
@ -115,13 +116,13 @@ func TestTransactionNotification(t *testing.T) {
require.NoError(t, walletDb.ProcessBlocks(1777, header.Address, big.NewInt(1), lastBlock, []*transfer.DBHeader{header}))
require.NoError(t, walletDb.ProcessTranfers(1777, transfers, []*transfer.DBHeader{}))
feed.Send(transfer.Event{
feed.Send(walletevent.Event{
Type: transfer.EventRecentHistoryReady,
BlockNumber: big.NewInt(0),
Accounts: []common.Address{header.Address},
})
feed.Send(transfer.Event{
feed.Send(walletevent.Event{
Type: transfer.EventNewTransfers,
BlockNumber: header.Number,
Accounts: []common.Address{header.Address},

View File

@ -12,6 +12,7 @@ import (
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/services/wallet/transfer"
"github.com/status-im/status-go/services/wallet/walletevent"
)
type transactionState string
@ -158,7 +159,7 @@ func (s *Service) StartWalletWatcher() {
}
s.walletTransmitter.quit = make(chan struct{})
events := make(chan transfer.Event, 10)
events := make(chan walletevent.Event, 10)
sub := s.walletTransmitter.publisher.Subscribe(events)
s.walletTransmitter.wg.Add(1)

View File

@ -4,6 +4,7 @@ import (
"context"
"math"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -11,8 +12,12 @@ import (
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/chain"
"github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/walletevent"
)
// WalletTickReload emitted every 15mn to reload the wallet balance and history
const EventWalletTickReload walletevent.EventType = "wallet-tick-reload"
func NewReader(s *Service) *Reader {
return &Reader{s}
}
@ -90,12 +95,18 @@ func getTokenAddresses(tokens []*token.Token) []common.Address {
}
func (r *Reader) Start(ctx context.Context, chainIDs []uint64) error {
accounts, err := r.s.accountsDB.GetAccounts()
if err != nil {
return err
run := func(context.Context) error {
r.s.feed.Send(walletevent.Event{
Type: EventWalletTickReload,
})
return nil
}
return r.s.transferController.CheckRecentHistory(chainIDs, getAddresses(accounts))
command := async.FiniteCommand{
Interval: 10 * time.Second,
Runable: run,
}
go command.Run(ctx)
return nil
}
func (r *Reader) GetWalletToken(ctx context.Context) (map[common.Address][]Token, error) {

View File

@ -55,6 +55,7 @@ func NewService(
transactor: transactor,
ens: ens,
stickers: stickers,
feed: accountFeed,
}
}
@ -75,6 +76,7 @@ type Service struct {
transactor *transactions.Transactor
ens *ens.Service
stickers *stickers.Service
feed *event.Feed
}
// Start signals transmitter.

View File

@ -12,6 +12,20 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/chain"
"github.com/status-im/status-go/services/wallet/walletevent"
)
const (
// EventNewTransfers emitted when new block was added to the same canonical chan.
EventNewTransfers walletevent.EventType = "new-transfers"
// EventFetchingRecentHistory emitted when fetching of lastest tx history is started
EventFetchingRecentHistory walletevent.EventType = "recent-history-fetching"
// EventRecentHistoryReady emitted when fetching of lastest tx history is started
EventRecentHistoryReady walletevent.EventType = "recent-history-ready"
// EventFetchingHistoryError emitted when fetching of tx history failed
EventFetchingHistoryError walletevent.EventType = "fetching-history-error"
// EventNonArchivalNodeDetected emitted when a connection to a non archival node is detected
EventNonArchivalNodeDetected walletevent.EventType = "non-archival-node-detected"
)
var (
@ -172,7 +186,7 @@ func (c *controlCommand) Run(parent context.Context) error {
return err
}
c.feed.Send(Event{
c.feed.Send(walletevent.Event{
Type: EventFetchingRecentHistory,
Accounts: c.accounts,
})
@ -256,9 +270,9 @@ func (c *controlCommand) Run(parent context.Context) error {
return err
}
events := map[common.Address]Event{}
events := map[common.Address]walletevent.Event{}
for _, address := range c.accounts {
event := Event{
event := walletevent.Event{
Type: EventNewTransfers,
Accounts: []common.Address{address},
}
@ -276,7 +290,7 @@ func (c *controlCommand) Run(parent context.Context) error {
c.feed.Send(event)
}
c.feed.Send(Event{
c.feed.Send(walletevent.Event{
Type: EventRecentHistoryReady,
Accounts: c.accounts,
BlockNumber: target,
@ -297,12 +311,12 @@ func (c *controlCommand) NewError(err error) bool {
if nonArchivalNodeError(err) {
log.Info("Non archival node detected")
c.nonArchivalRPCNode = true
c.feed.Send(Event{
c.feed.Send(walletevent.Event{
Type: EventNonArchivalNodeDetected,
})
}
if c.errorsCount >= 3 {
c.feed.Send(Event{
c.feed.Send(walletevent.Event{
Type: EventFetchingHistoryError,
Message: err.Error(),
})

View File

@ -15,6 +15,7 @@ import (
"github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/chain"
"github.com/status-im/status-go/services/wallet/walletevent"
)
type Controller struct {
@ -230,7 +231,7 @@ func (c *Controller) GetTransfersByAddress(ctx context.Context, chainID uint64,
from, err := findFirstRange(ctx, address, block, chainClient)
if err != nil {
if nonArchivalNodeError(err) {
c.TransferFeed.Send(Event{
c.TransferFeed.Send(walletevent.Event{
Type: EventNonArchivalNodeDetected,
})
from = big.NewInt(0).Sub(block, big.NewInt(100))

View File

@ -1,31 +0,0 @@
package transfer
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
)
// EventType type for event types.
type EventType string
const (
// EventNewTransfers emitted when new block was added to the same canonical chan.
EventNewTransfers EventType = "new-transfers"
// EventFetchingRecentHistory emitted when fetching of lastest tx history is started
EventFetchingRecentHistory EventType = "recent-history-fetching"
// EventRecentHistoryReady emitted when fetching of lastest tx history is started
EventRecentHistoryReady EventType = "recent-history-ready"
// EventFetchingHistoryError emitted when fetching of tx history failed
EventFetchingHistoryError EventType = "fetching-history-error"
// EventNonArchivalNodeDetected emitted when a connection to a non archival node is detected
EventNonArchivalNodeDetected EventType = "non-archival-node-detected"
)
// Event is a type for transfer events.
type Event struct {
Type EventType `json:"type"`
BlockNumber *big.Int `json:"blockNumber"`
Accounts []common.Address `json:"accounts"`
Message string `json:"message"`
}

View File

@ -6,6 +6,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/signal"
)
@ -28,7 +29,7 @@ func (tmr *SignalsTransmitter) Start() error {
return nil
}
tmr.quit = make(chan struct{})
events := make(chan Event, 10)
events := make(chan walletevent.Event, 10)
sub := tmr.publisher.Subscribe(events)
tmr.wg.Add(1)

View File

@ -0,0 +1,18 @@
package walletevent
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
)
// EventType type for event types.
type EventType string
// Event is a type for transfer events.
type Event struct {
Type EventType `json:"type"`
BlockNumber *big.Int `json:"blockNumber"`
Accounts []common.Address `json:"accounts"`
Message string `json:"message"`
}