mirror of
https://github.com/status-im/status-go.git
synced 2025-01-18 18:55:47 +00:00
30af25198e
fetch strategy: Before: - block fetching commands for different accounts were in the same wait group, making them dependent on each iteration. - transfers loading command was checking database for new unloaded blocks on timeout and was in the same wait group with block fetching, so it was often blocked until all block fetching commands finish for iteration. Now: - block fetching commands run independently for each account - transfers fetching command is run once on startup for unloaded blocks from DB - fetch history blocks commands are launched once on startup for accounts with no full history loaded - transfers are loaded on each iteration of block range check without waiting for all ranges to be checked
110 lines
3.0 KiB
Go
110 lines
3.0 KiB
Go
package transfer
|
|
|
|
import (
|
|
"context"
|
|
"math/big"
|
|
"sync"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/status-im/status-go/rpc/chain"
|
|
"github.com/status-im/status-go/services/wallet/async"
|
|
"github.com/status-im/status-go/services/wallet/token"
|
|
"github.com/status-im/status-go/services/wallet/walletevent"
|
|
)
|
|
|
|
func NewSequentialFetchStrategy(db *Database, blockDAO *BlockDAO, feed *event.Feed,
|
|
transactionManager *TransactionManager,
|
|
tokenManager *token.Manager,
|
|
chainClients map[uint64]*chain.ClientWithFallback,
|
|
accounts []common.Address) *SequentialFetchStrategy {
|
|
|
|
return &SequentialFetchStrategy{
|
|
db: db,
|
|
blockDAO: blockDAO,
|
|
feed: feed,
|
|
transactionManager: transactionManager,
|
|
tokenManager: tokenManager,
|
|
chainClients: chainClients,
|
|
accounts: accounts,
|
|
}
|
|
}
|
|
|
|
type SequentialFetchStrategy struct {
|
|
db *Database
|
|
blockDAO *BlockDAO
|
|
feed *event.Feed
|
|
mu sync.Mutex
|
|
group *async.Group
|
|
transactionManager *TransactionManager
|
|
tokenManager *token.Manager
|
|
chainClients map[uint64]*chain.ClientWithFallback
|
|
accounts []common.Address
|
|
}
|
|
|
|
func (s *SequentialFetchStrategy) newCommand(chainClient *chain.ClientWithFallback,
|
|
account common.Address) async.Commander {
|
|
|
|
return newLoadBlocksAndTransfersCommand(account, s.db, s.blockDAO, chainClient, s.feed,
|
|
s.transactionManager, s.tokenManager)
|
|
}
|
|
|
|
func (s *SequentialFetchStrategy) start() error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if s.group != nil {
|
|
return errAlreadyRunning
|
|
}
|
|
s.group = async.NewGroup(context.Background())
|
|
|
|
if s.feed != nil {
|
|
s.feed.Send(walletevent.Event{
|
|
Type: EventFetchingRecentHistory,
|
|
Accounts: s.accounts,
|
|
})
|
|
}
|
|
|
|
for _, chainClient := range s.chainClients {
|
|
for _, address := range s.accounts {
|
|
ctl := s.newCommand(chainClient, address)
|
|
s.group.Add(ctl.Command())
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop stops reactor loop and waits till it exits.
|
|
func (s *SequentialFetchStrategy) stop() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.group == nil {
|
|
return
|
|
}
|
|
s.group.Stop()
|
|
s.group.Wait()
|
|
s.group = nil
|
|
}
|
|
|
|
func (s *SequentialFetchStrategy) kind() FetchStrategyType {
|
|
return SequentialFetchStrategyType
|
|
}
|
|
|
|
// TODO: remove fetchMore parameter from here and interface, it is used by OnDemandFetchStrategy only
|
|
func (s *SequentialFetchStrategy) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
|
|
limit int64, fetchMore bool) ([]Transfer, error) {
|
|
|
|
log.Info("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address, "fetchMore", fetchMore,
|
|
"chainID", chainID, "toBlock", toBlock, "limit", limit)
|
|
|
|
rst, err := s.db.GetTransfersByAddress(chainID, address, toBlock, limit)
|
|
if err != nil {
|
|
log.Error("[WalletAPI:: GetTransfersByAddress] can't fetch transfers", "err", err)
|
|
return nil, err
|
|
}
|
|
|
|
return rst, nil
|
|
}
|