feat(wallet): enable sequential transfers fetching by default.

"EventNewTransfers' event is sent on each block processed, otherwise
it could take minutes or longer before first 'EventNewTransfers'
is sent.
This commit is contained in:
Ivan Belyakov 2023-06-05 17:05:50 +02:00 committed by IvanBelyakoff
parent e8c4b7647f
commit d74d930b70
6 changed files with 41 additions and 37 deletions

View File

@ -520,6 +520,8 @@ type WalletConfig struct {
AlchemyAPIKeys map[uint64]string `json:"AlchemyAPIKeys"` AlchemyAPIKeys map[uint64]string `json:"AlchemyAPIKeys"`
InfuraAPIKey string `json:"InfuraAPIKey"` InfuraAPIKey string `json:"InfuraAPIKey"`
InfuraAPIKeySecret string `json:"InfuraAPIKeySecret"` InfuraAPIKeySecret string `json:"InfuraAPIKeySecret"`
// LoadAllTransfers should be false to reduce network traffic and harddrive space consumption when loading tranfers
LoadAllTransfers bool `json:"LoadAllTransfers"`
} }
// LocalNotificationsConfig extra configuration for localnotifications.Service. // LocalNotificationsConfig extra configuration for localnotifications.Service.

View File

@ -86,7 +86,7 @@ func NewService(
tokenManager := token.NewTokenManager(db, rpcClient, rpcClient.NetworkManager) tokenManager := token.NewTokenManager(db, rpcClient, rpcClient.NetworkManager)
savedAddressesManager := &SavedAddressesManager{db: db} savedAddressesManager := &SavedAddressesManager{db: db}
transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB) transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB)
transferController := transfer.NewTransferController(db, rpcClient, accountFeed, walletFeed, transactionManager, tokenManager, transfer.OnDemandFetchStrategyType) transferController := transfer.NewTransferController(db, rpcClient, accountFeed, walletFeed, transactionManager, tokenManager, config.WalletConfig.LoadAllTransfers)
cryptoCompare := cryptocompare.NewClient() cryptoCompare := cryptocompare.NewClient()
coingecko := coingecko.NewClient() coingecko := coingecko.NewClient()
marketManager := market.NewManager(cryptoCompare, coingecko, walletFeed) marketManager := market.NewManager(cryptoCompare, coingecko, walletFeed)

View File

@ -354,6 +354,7 @@ type transfersCommand struct {
blocksLimit int blocksLimit int
transactionManager *TransactionManager transactionManager *TransactionManager
tokenManager *token.Manager tokenManager *token.Manager
feed *event.Feed
// result // result
fetchedTransfers []Transfer fetchedTransfers []Transfer
@ -370,6 +371,9 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) {
// Take blocks from cache if available and disrespect the limit // Take blocks from cache if available and disrespect the limit
// If no blocks are available in cache, take blocks from DB respecting the limit // If no blocks are available in cache, take blocks from DB respecting the limit
// If no limit is set, take all blocks from DB // If no limit is set, take all blocks from DB
log.Info("start transfersCommand", "chain", c.chainClient.ChainID, "address", c.address, "blockNums", c.blockNums)
startTs := time.Now()
for { for {
blocks := c.blockNums blocks := c.blockNums
if blocks == nil { if blocks == nil {
@ -377,9 +381,7 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) {
} }
for _, blockNum := range blocks { for _, blockNum := range blocks {
log.Info("start transfersCommand", "chain", c.chainClient.ChainID, "address", c.address, "block", blockNum) log.Debug("transfersCommand block start", "chain", c.chainClient.ChainID, "address", c.address, "block", blockNum)
startTs := time.Now()
allTransfers, err := c.eth.GetTransfersByNumber(ctx, blockNum) allTransfers, err := c.eth.GetTransfersByNumber(ctx, blockNum)
if err != nil { if err != nil {
@ -411,8 +413,10 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) {
c.fetchedTransfers = append(c.fetchedTransfers, allTransfers...) c.fetchedTransfers = append(c.fetchedTransfers, allTransfers...)
log.Debug("end transfersCommand", "chain", c.chainClient.ChainID, "address", c.address, c.notifyOfNewTransfers(allTransfers)
"block", blockNum, "len", len(allTransfers), "in", time.Since(startTs))
log.Debug("transfersCommand block end", "chain", c.chainClient.ChainID, "address", c.address,
"block", blockNum, "tranfers.len", len(allTransfers), "fetchedTransfers.len", len(c.fetchedTransfers))
} }
if c.blockNums != nil || len(blocks) == 0 || if c.blockNums != nil || len(blocks) == 0 ||
@ -423,6 +427,9 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) {
} }
} }
log.Info("end transfersCommand", "chain", c.chainClient.ChainID, "address", c.address,
"blocks.len", len(c.blockNums), "transfers.len", len(c.fetchedTransfers), "in", time.Since(startTs))
return nil return nil
} }
@ -503,6 +510,17 @@ func (c *transfersCommand) processMultiTransactions(ctx context.Context, allTran
return nil return nil
} }
func (c *transfersCommand) notifyOfNewTransfers(transfers []Transfer) {
if c.feed != nil {
if len(transfers) > 0 {
c.feed.Send(walletevent.Event{
Type: EventNewTransfers,
Accounts: []common.Address{c.address},
})
}
}
}
type loadTransfersCommand struct { type loadTransfersCommand struct {
accounts []common.Address accounts []common.Address
db *Database db *Database

View File

@ -284,7 +284,6 @@ func (c *loadAllTransfersCommand) Run(parent context.Context) error {
start := time.Now() start := time.Now()
group := async.NewGroup(parent) group := async.NewGroup(parent)
commands := []*transfersCommand{}
for _, address := range c.accounts { for _, address := range c.accounts {
transfers := &transfersCommand{ transfers := &transfersCommand{
db: c.db, db: c.db,
@ -301,8 +300,8 @@ func (c *loadAllTransfersCommand) Run(parent context.Context) error {
blocksLimit: c.blocksLimit, blocksLimit: c.blocksLimit,
transactionManager: c.transactionManager, transactionManager: c.transactionManager,
tokenManager: c.tokenManager, tokenManager: c.tokenManager,
feed: c.feed,
} }
commands = append(commands, transfers)
group.Add(transfers.Command()) group.Add(transfers.Command())
} }
@ -312,26 +311,11 @@ func (c *loadAllTransfersCommand) Run(parent context.Context) error {
return parent.Err() return parent.Err()
case <-group.WaitAsync(): case <-group.WaitAsync():
log.Debug("loadTransfers finished for account", "in", time.Since(start), "chain", c.chainClient.ChainID, "limit", c.blocksLimit) log.Debug("loadTransfers finished for account", "in", time.Since(start), "chain", c.chainClient.ChainID, "limit", c.blocksLimit)
c.notifyOfNewTransfers(commands)
} }
return nil return nil
} }
func (c *loadAllTransfersCommand) notifyOfNewTransfers(commands []*transfersCommand) {
if c.feed != nil {
for _, command := range commands {
if len(command.fetchedTransfers) > 0 {
c.feed.Send(walletevent.Event{
Type: EventNewTransfers,
Accounts: []common.Address{command.address},
})
}
}
}
}
func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database,
blockDAO *BlockDAO, chainClient *chain.ClientWithFallback, feed *event.Feed, blockDAO *BlockDAO, chainClient *chain.ClientWithFallback, feed *event.Feed,
transactionManager *TransactionManager, tokenManager *token.Manager) *loadBlocksAndTransfersCommand { transactionManager *TransactionManager, tokenManager *token.Manager) *loadBlocksAndTransfersCommand {

View File

@ -27,11 +27,11 @@ type Controller struct {
group *async.Group group *async.Group
transactionManager *TransactionManager transactionManager *TransactionManager
tokenManager *token.Manager tokenManager *token.Manager
fetchStrategyType FetchStrategyType loadAllTransfers bool
} }
func NewTransferController(db *sql.DB, rpcClient *rpc.Client, accountFeed *event.Feed, transferFeed *event.Feed, func NewTransferController(db *sql.DB, rpcClient *rpc.Client, accountFeed *event.Feed, transferFeed *event.Feed,
transactionManager *TransactionManager, tokenManager *token.Manager, fetchStrategyType FetchStrategyType) *Controller { transactionManager *TransactionManager, tokenManager *token.Manager, loadAllTransfers bool) *Controller {
blockDAO := &BlockDAO{db} blockDAO := &BlockDAO{db}
return &Controller{ return &Controller{
@ -42,7 +42,7 @@ func NewTransferController(db *sql.DB, rpcClient *rpc.Client, accountFeed *event
TransferFeed: transferFeed, TransferFeed: transferFeed,
transactionManager: transactionManager, transactionManager: transactionManager,
tokenManager: tokenManager, tokenManager: tokenManager,
fetchStrategyType: fetchStrategyType, loadAllTransfers: loadAllTransfers,
} }
} }
@ -110,20 +110,20 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add
} }
if c.reactor != nil { if c.reactor != nil {
err := c.reactor.restart(chainClients, accounts, c.fetchStrategyType) err := c.reactor.restart(chainClients, accounts, c.loadAllTransfers)
if err != nil { if err != nil {
return err return err
} }
} else { } else {
c.reactor = NewReactor(c.db, c.blockDAO, c.TransferFeed, c.transactionManager, c.tokenManager) c.reactor = NewReactor(c.db, c.blockDAO, c.TransferFeed, c.transactionManager, c.tokenManager)
err = c.reactor.start(chainClients, accounts, c.fetchStrategyType) err = c.reactor.start(chainClients, accounts, c.loadAllTransfers)
if err != nil { if err != nil {
return err return err
} }
c.group.Add(func(ctx context.Context) error { c.group.Add(func(ctx context.Context) error {
return watchAccountsChanges(ctx, c.accountFeed, c.reactor, chainClients, accounts, c.fetchStrategyType) return watchAccountsChanges(ctx, c.accountFeed, c.reactor, chainClients, accounts, c.loadAllTransfers)
}) })
} }
return nil return nil
@ -132,7 +132,7 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add
// watchAccountsChanges subscribes to a feed and watches for changes in accounts list. If there are new or removed accounts // watchAccountsChanges subscribes to a feed and watches for changes in accounts list. If there are new or removed accounts
// reactor will be restarted. // reactor will be restarted.
func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor *Reactor, func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor *Reactor,
chainClients map[uint64]*chain.ClientWithFallback, initial []common.Address, fetchStrategyType FetchStrategyType) error { chainClients map[uint64]*chain.ClientWithFallback, initial []common.Address, loadAllTransfers bool) error {
ch := make(chan accountsevent.Event, 1) // it may block if the rate of updates will be significantly higher ch := make(chan accountsevent.Event, 1) // it may block if the rate of updates will be significantly higher
sub := accountFeed.Subscribe(ch) sub := accountFeed.Subscribe(ch)
@ -170,7 +170,7 @@ func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor
listenList := mapToList(listen) listenList := mapToList(listen)
log.Debug("list of accounts was changed from a previous version. reactor will be restarted", "new", listenList) log.Debug("list of accounts was changed from a previous version. reactor will be restarted", "new", listenList)
err := reactor.restart(chainClients, listenList, fetchStrategyType) err := reactor.restart(chainClients, listenList, loadAllTransfers)
if err != nil { if err != nil {
log.Error("failed to restart reactor with new accounts", "error", err) log.Error("failed to restart reactor with new accounts", "error", err)
} }

View File

@ -251,9 +251,9 @@ func NewReactor(db *Database, blockDAO *BlockDAO, feed *event.Feed, tm *Transact
// Start runs reactor loop in background. // Start runs reactor loop in background.
func (r *Reactor) start(chainClients map[uint64]*chain.ClientWithFallback, accounts []common.Address, func (r *Reactor) start(chainClients map[uint64]*chain.ClientWithFallback, accounts []common.Address,
fetchStrategyType FetchStrategyType) error { loadAllTransfers bool) error {
r.strategy = r.createFetchStrategy(chainClients, accounts, fetchStrategyType) r.strategy = r.createFetchStrategy(chainClients, accounts, loadAllTransfers)
return r.strategy.start() return r.strategy.start()
} }
@ -265,16 +265,16 @@ func (r *Reactor) stop() {
} }
func (r *Reactor) restart(chainClients map[uint64]*chain.ClientWithFallback, accounts []common.Address, func (r *Reactor) restart(chainClients map[uint64]*chain.ClientWithFallback, accounts []common.Address,
fetchStrategyType FetchStrategyType) error { loadAllTransfers bool) error {
r.stop() r.stop()
return r.start(chainClients, accounts, fetchStrategyType) return r.start(chainClients, accounts, loadAllTransfers)
} }
func (r *Reactor) createFetchStrategy(chainClients map[uint64]*chain.ClientWithFallback, func (r *Reactor) createFetchStrategy(chainClients map[uint64]*chain.ClientWithFallback,
accounts []common.Address, fetchType FetchStrategyType) HistoryFetcher { accounts []common.Address, loadAllTransfers bool) HistoryFetcher {
if fetchType == SequentialFetchStrategyType { if loadAllTransfers {
return NewSequentialFetchStrategy( return NewSequentialFetchStrategy(
r.db, r.db,
r.blockDAO, r.blockDAO,