From d74d930b7043289cd1959c71b45564b349a629e2 Mon Sep 17 00:00:00 2001 From: Ivan Belyakov Date: Mon, 5 Jun 2023 17:05:50 +0200 Subject: [PATCH] feat(wallet): enable sequential transfers fetching by default. "EventNewTransfers' event is sent on each block processed, otherwise it could take minutes or longer before first 'EventNewTransfers' is sent. --- params/config.go | 2 ++ services/wallet/service.go | 2 +- services/wallet/transfer/commands.go | 28 +++++++++++++++---- .../wallet/transfer/commands_sequential.go | 18 +----------- services/wallet/transfer/controller.go | 16 +++++------ services/wallet/transfer/reactor.go | 12 ++++---- 6 files changed, 41 insertions(+), 37 deletions(-) diff --git a/params/config.go b/params/config.go index 76cd2783f..5859e97b4 100644 --- a/params/config.go +++ b/params/config.go @@ -520,6 +520,8 @@ type WalletConfig struct { AlchemyAPIKeys map[uint64]string `json:"AlchemyAPIKeys"` InfuraAPIKey string `json:"InfuraAPIKey"` InfuraAPIKeySecret string `json:"InfuraAPIKeySecret"` + // LoadAllTransfers should be false to reduce network traffic and harddrive space consumption when loading tranfers + LoadAllTransfers bool `json:"LoadAllTransfers"` } // LocalNotificationsConfig extra configuration for localnotifications.Service. diff --git a/services/wallet/service.go b/services/wallet/service.go index c7c45786c..83ea92ad1 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -86,7 +86,7 @@ func NewService( tokenManager := token.NewTokenManager(db, rpcClient, rpcClient.NetworkManager) savedAddressesManager := &SavedAddressesManager{db: db} transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB) - transferController := transfer.NewTransferController(db, rpcClient, accountFeed, walletFeed, transactionManager, tokenManager, transfer.OnDemandFetchStrategyType) + transferController := transfer.NewTransferController(db, rpcClient, accountFeed, walletFeed, transactionManager, tokenManager, config.WalletConfig.LoadAllTransfers) cryptoCompare := cryptocompare.NewClient() coingecko := coingecko.NewClient() marketManager := market.NewManager(cryptoCompare, coingecko, walletFeed) diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index 5af5c1596..8b71111a7 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -354,6 +354,7 @@ type transfersCommand struct { blocksLimit int transactionManager *TransactionManager tokenManager *token.Manager + feed *event.Feed // result fetchedTransfers []Transfer @@ -370,6 +371,9 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) { // Take blocks from cache if available and disrespect the limit // If no blocks are available in cache, take blocks from DB respecting the limit // If no limit is set, take all blocks from DB + log.Info("start transfersCommand", "chain", c.chainClient.ChainID, "address", c.address, "blockNums", c.blockNums) + startTs := time.Now() + for { blocks := c.blockNums if blocks == nil { @@ -377,9 +381,7 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) { } for _, blockNum := range blocks { - log.Info("start transfersCommand", "chain", c.chainClient.ChainID, "address", c.address, "block", blockNum) - - startTs := time.Now() + log.Debug("transfersCommand block start", "chain", c.chainClient.ChainID, "address", c.address, "block", blockNum) allTransfers, err := c.eth.GetTransfersByNumber(ctx, blockNum) if err != nil { @@ -411,8 +413,10 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) { c.fetchedTransfers = append(c.fetchedTransfers, allTransfers...) - log.Debug("end transfersCommand", "chain", c.chainClient.ChainID, "address", c.address, - "block", blockNum, "len", len(allTransfers), "in", time.Since(startTs)) + c.notifyOfNewTransfers(allTransfers) + + log.Debug("transfersCommand block end", "chain", c.chainClient.ChainID, "address", c.address, + "block", blockNum, "tranfers.len", len(allTransfers), "fetchedTransfers.len", len(c.fetchedTransfers)) } if c.blockNums != nil || len(blocks) == 0 || @@ -423,6 +427,9 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) { } } + log.Info("end transfersCommand", "chain", c.chainClient.ChainID, "address", c.address, + "blocks.len", len(c.blockNums), "transfers.len", len(c.fetchedTransfers), "in", time.Since(startTs)) + return nil } @@ -503,6 +510,17 @@ func (c *transfersCommand) processMultiTransactions(ctx context.Context, allTran return nil } +func (c *transfersCommand) notifyOfNewTransfers(transfers []Transfer) { + if c.feed != nil { + if len(transfers) > 0 { + c.feed.Send(walletevent.Event{ + Type: EventNewTransfers, + Accounts: []common.Address{c.address}, + }) + } + } +} + type loadTransfersCommand struct { accounts []common.Address db *Database diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index 602224318..33105b0cc 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -284,7 +284,6 @@ func (c *loadAllTransfersCommand) Run(parent context.Context) error { start := time.Now() group := async.NewGroup(parent) - commands := []*transfersCommand{} for _, address := range c.accounts { transfers := &transfersCommand{ db: c.db, @@ -301,8 +300,8 @@ func (c *loadAllTransfersCommand) Run(parent context.Context) error { blocksLimit: c.blocksLimit, transactionManager: c.transactionManager, tokenManager: c.tokenManager, + feed: c.feed, } - commands = append(commands, transfers) group.Add(transfers.Command()) } @@ -312,26 +311,11 @@ func (c *loadAllTransfersCommand) Run(parent context.Context) error { return parent.Err() case <-group.WaitAsync(): log.Debug("loadTransfers finished for account", "in", time.Since(start), "chain", c.chainClient.ChainID, "limit", c.blocksLimit) - - c.notifyOfNewTransfers(commands) } return nil } -func (c *loadAllTransfersCommand) notifyOfNewTransfers(commands []*transfersCommand) { - if c.feed != nil { - for _, command := range commands { - if len(command.fetchedTransfers) > 0 { - c.feed.Send(walletevent.Event{ - Type: EventNewTransfers, - Accounts: []common.Address{command.address}, - }) - } - } - } -} - func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, blockDAO *BlockDAO, chainClient *chain.ClientWithFallback, feed *event.Feed, transactionManager *TransactionManager, tokenManager *token.Manager) *loadBlocksAndTransfersCommand { diff --git a/services/wallet/transfer/controller.go b/services/wallet/transfer/controller.go index 21a2bbae3..4197d436b 100644 --- a/services/wallet/transfer/controller.go +++ b/services/wallet/transfer/controller.go @@ -27,11 +27,11 @@ type Controller struct { group *async.Group transactionManager *TransactionManager tokenManager *token.Manager - fetchStrategyType FetchStrategyType + loadAllTransfers bool } func NewTransferController(db *sql.DB, rpcClient *rpc.Client, accountFeed *event.Feed, transferFeed *event.Feed, - transactionManager *TransactionManager, tokenManager *token.Manager, fetchStrategyType FetchStrategyType) *Controller { + transactionManager *TransactionManager, tokenManager *token.Manager, loadAllTransfers bool) *Controller { blockDAO := &BlockDAO{db} return &Controller{ @@ -42,7 +42,7 @@ func NewTransferController(db *sql.DB, rpcClient *rpc.Client, accountFeed *event TransferFeed: transferFeed, transactionManager: transactionManager, tokenManager: tokenManager, - fetchStrategyType: fetchStrategyType, + loadAllTransfers: loadAllTransfers, } } @@ -110,20 +110,20 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add } if c.reactor != nil { - err := c.reactor.restart(chainClients, accounts, c.fetchStrategyType) + err := c.reactor.restart(chainClients, accounts, c.loadAllTransfers) if err != nil { return err } } else { c.reactor = NewReactor(c.db, c.blockDAO, c.TransferFeed, c.transactionManager, c.tokenManager) - err = c.reactor.start(chainClients, accounts, c.fetchStrategyType) + err = c.reactor.start(chainClients, accounts, c.loadAllTransfers) if err != nil { return err } c.group.Add(func(ctx context.Context) error { - return watchAccountsChanges(ctx, c.accountFeed, c.reactor, chainClients, accounts, c.fetchStrategyType) + return watchAccountsChanges(ctx, c.accountFeed, c.reactor, chainClients, accounts, c.loadAllTransfers) }) } return nil @@ -132,7 +132,7 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add // watchAccountsChanges subscribes to a feed and watches for changes in accounts list. If there are new or removed accounts // reactor will be restarted. func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor *Reactor, - chainClients map[uint64]*chain.ClientWithFallback, initial []common.Address, fetchStrategyType FetchStrategyType) error { + chainClients map[uint64]*chain.ClientWithFallback, initial []common.Address, loadAllTransfers bool) error { ch := make(chan accountsevent.Event, 1) // it may block if the rate of updates will be significantly higher sub := accountFeed.Subscribe(ch) @@ -170,7 +170,7 @@ func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor listenList := mapToList(listen) log.Debug("list of accounts was changed from a previous version. reactor will be restarted", "new", listenList) - err := reactor.restart(chainClients, listenList, fetchStrategyType) + err := reactor.restart(chainClients, listenList, loadAllTransfers) if err != nil { log.Error("failed to restart reactor with new accounts", "error", err) } diff --git a/services/wallet/transfer/reactor.go b/services/wallet/transfer/reactor.go index 8f362caba..2733ba817 100644 --- a/services/wallet/transfer/reactor.go +++ b/services/wallet/transfer/reactor.go @@ -251,9 +251,9 @@ func NewReactor(db *Database, blockDAO *BlockDAO, feed *event.Feed, tm *Transact // Start runs reactor loop in background. func (r *Reactor) start(chainClients map[uint64]*chain.ClientWithFallback, accounts []common.Address, - fetchStrategyType FetchStrategyType) error { + loadAllTransfers bool) error { - r.strategy = r.createFetchStrategy(chainClients, accounts, fetchStrategyType) + r.strategy = r.createFetchStrategy(chainClients, accounts, loadAllTransfers) return r.strategy.start() } @@ -265,16 +265,16 @@ func (r *Reactor) stop() { } func (r *Reactor) restart(chainClients map[uint64]*chain.ClientWithFallback, accounts []common.Address, - fetchStrategyType FetchStrategyType) error { + loadAllTransfers bool) error { r.stop() - return r.start(chainClients, accounts, fetchStrategyType) + return r.start(chainClients, accounts, loadAllTransfers) } func (r *Reactor) createFetchStrategy(chainClients map[uint64]*chain.ClientWithFallback, - accounts []common.Address, fetchType FetchStrategyType) HistoryFetcher { + accounts []common.Address, loadAllTransfers bool) HistoryFetcher { - if fetchType == SequentialFetchStrategyType { + if loadAllTransfers { return NewSequentialFetchStrategy( r.db, r.blockDAO,