130 lines
3.8 KiB
Go
130 lines
3.8 KiB
Go
package transfer
|
|
|
|
import (
|
|
"context"
|
|
"math/big"
|
|
"sync"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/status-im/status-go/logutils"
|
|
"github.com/status-im/status-go/multiaccounts/accounts"
|
|
"github.com/status-im/status-go/rpc/chain"
|
|
"github.com/status-im/status-go/services/wallet/async"
|
|
"github.com/status-im/status-go/services/wallet/balance"
|
|
"github.com/status-im/status-go/services/wallet/blockchainstate"
|
|
"github.com/status-im/status-go/services/wallet/token"
|
|
"github.com/status-im/status-go/services/wallet/walletevent"
|
|
"github.com/status-im/status-go/transactions"
|
|
)
|
|
|
|
func NewSequentialFetchStrategy(db *Database, blockDAO *BlockDAO, blockRangesSeqDAO *BlockRangeSequentialDAO, accountsDB *accounts.Database, feed *event.Feed,
|
|
pendingTxManager *transactions.PendingTxTracker,
|
|
tokenManager *token.Manager,
|
|
chainClients map[uint64]chain.ClientInterface,
|
|
accounts []common.Address,
|
|
balanceCacher balance.Cacher,
|
|
omitHistory bool,
|
|
blockChainState *blockchainstate.BlockChainState,
|
|
) *SequentialFetchStrategy {
|
|
|
|
return &SequentialFetchStrategy{
|
|
db: db,
|
|
blockDAO: blockDAO,
|
|
blockRangesSeqDAO: blockRangesSeqDAO,
|
|
accountsDB: accountsDB,
|
|
feed: feed,
|
|
pendingTxManager: pendingTxManager,
|
|
tokenManager: tokenManager,
|
|
chainClients: chainClients,
|
|
accounts: accounts,
|
|
balanceCacher: balanceCacher,
|
|
omitHistory: omitHistory,
|
|
blockChainState: blockChainState,
|
|
}
|
|
}
|
|
|
|
type SequentialFetchStrategy struct {
|
|
db *Database
|
|
blockDAO *BlockDAO
|
|
blockRangesSeqDAO *BlockRangeSequentialDAO
|
|
accountsDB *accounts.Database
|
|
feed *event.Feed
|
|
mu sync.Mutex
|
|
group *async.Group
|
|
pendingTxManager *transactions.PendingTxTracker
|
|
tokenManager *token.Manager
|
|
chainClients map[uint64]chain.ClientInterface
|
|
accounts []common.Address
|
|
balanceCacher balance.Cacher
|
|
omitHistory bool
|
|
blockChainState *blockchainstate.BlockChainState
|
|
}
|
|
|
|
func (s *SequentialFetchStrategy) newCommand(chainClient chain.ClientInterface,
|
|
accounts []common.Address) async.Commander {
|
|
|
|
return newLoadBlocksAndTransfersCommand(accounts, s.db, s.accountsDB, s.blockDAO, s.blockRangesSeqDAO, chainClient, s.feed,
|
|
s.pendingTxManager, s.tokenManager, s.balanceCacher, s.omitHistory, s.blockChainState)
|
|
}
|
|
|
|
func (s *SequentialFetchStrategy) start() error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if s.group != nil {
|
|
return errAlreadyRunning
|
|
}
|
|
s.group = async.NewGroup(context.Background())
|
|
|
|
if s.feed != nil {
|
|
s.feed.Send(walletevent.Event{
|
|
Type: EventFetchingRecentHistory,
|
|
Accounts: s.accounts,
|
|
})
|
|
}
|
|
|
|
for _, chainClient := range s.chainClients {
|
|
ctl := s.newCommand(chainClient, s.accounts)
|
|
s.group.Add(ctl.Command())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop stops reactor loop and waits till it exits.
|
|
func (s *SequentialFetchStrategy) stop() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.group == nil {
|
|
return
|
|
}
|
|
s.group.Stop()
|
|
s.group.Wait()
|
|
s.group = nil
|
|
}
|
|
|
|
func (s *SequentialFetchStrategy) kind() FetchStrategyType {
|
|
return SequentialFetchStrategyType
|
|
}
|
|
|
|
func (s *SequentialFetchStrategy) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,
|
|
limit int64) ([]Transfer, error) {
|
|
|
|
logutils.ZapLogger().Debug("[WalletAPI:: GetTransfersByAddress] get transfers for an address",
|
|
zap.Stringer("address", address),
|
|
zap.Uint64("chainID", chainID),
|
|
zap.Stringer("toBlock", toBlock),
|
|
zap.Int64("limit", limit))
|
|
|
|
rst, err := s.db.GetTransfersByAddress(chainID, address, toBlock, limit)
|
|
if err != nil {
|
|
logutils.ZapLogger().Error("[WalletAPI:: GetTransfersByAddress] can't fetch transfers", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
return rst, nil
|
|
}
|