From 670954b71b1a41e9020b4863b95ad3d1435418b6 Mon Sep 17 00:00:00 2001 From: Ivan Belyakov Date: Sun, 10 Dec 2023 15:31:30 +0100 Subject: [PATCH] feat(wallet): separated finite and infinite commands in transfers for proper handling of errors and commands restart. Now: - Infinite commands started only once and never restarted, stoped on context.Done. - Finite commands are joined into AtomicGroup to stop the rest in the group in case one command fails. Otherwise other commands in the group will continue running and the failed command is not retried to restart. Fixed goroutine leakage in case of failure of some commands --- services/wallet/transfer/commands.go | 14 +- .../wallet/transfer/commands_sequential.go | 172 ++++++++++++------ .../transfer/commands_sequential_test.go | 4 +- 3 files changed, 119 insertions(+), 71 deletions(-) diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index f00c528b5..4d742edbe 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -502,14 +502,9 @@ func (c *loadTransfersCommand) Command() async.Command { }.Run } -func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, limit int, blocksByAddress map[common.Address][]*big.Int) error { - return loadTransfers(ctx, c.blockDAO, c.db, c.chainClient, limit, blocksByAddress, - c.transactionManager, c.pendingTxManager, c.tokenManager, c.feed) -} - func (c *loadTransfersCommand) Run(parent context.Context) (err error) { - err = c.LoadTransfers(parent, c.blocksLimit, c.blocksByAddress) - return + return loadTransfers(parent, c.blockDAO, c.db, c.chainClient, c.blocksLimit, c.blocksByAddress, + c.transactionManager, c.pendingTxManager, c.tokenManager, c.feed) } func loadTransfers(ctx context.Context, blockDAO *BlockDAO, db *Database, @@ -544,13 +539,14 @@ func loadTransfers(ctx context.Context, blockDAO *BlockDAO, db *Database, group.Add(transfers.Command()) } + // loadTransfers command will be restarted in case of error, but if context is cancelled, we should stop select { case <-ctx.Done(): - return ctx.Err() + log.Debug("loadTransfers cancelled", "chain", chainClient.NetworkID(), "error", ctx.Err()) case <-group.WaitAsync(): log.Debug("loadTransfers finished for account", "in", time.Since(start), "chain", chainClient.NetworkID()) - return nil } + return nil } func isBinanceChain(chainID uint64) bool { diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index f9f98f988..040d4fdd3 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -19,6 +19,16 @@ import ( "github.com/status-im/status-go/transactions" ) +var trLoopCnt int = 0 +var fetchNewBlocksCnt int = 0 + +func verifyOnce(cnt *int, msg string) { + if *cnt > 0 { + panic("verifyOnce, function: " + msg) + } + *cnt++ +} + type findNewBlocksCommand struct { *findBlocksCommand } @@ -36,6 +46,7 @@ func (c *findNewBlocksCommand) Command() async.Command { func (c *findNewBlocksCommand) Run(parent context.Context) (err error) { headNum, err := getHeadBlockNumber(parent, c.chainClient) if err != nil { + log.Error("findNewBlocksCommand getHeadBlockNumber", "error", err, "chain", c.chainClient.NetworkID()) return err } @@ -668,6 +679,8 @@ func loadTransfersLoop(ctx context.Context, blockDAO *BlockDAO, db *Database, log.Debug("loadTransfersLoop start", "chain", chainClient.NetworkID()) + verifyOnce(&trLoopCnt, "loadTransfersLoop") + for { select { case <-ctx.Done(): @@ -730,17 +743,20 @@ type loadBlocksAndTransfersCommand struct { // Not to be set by the caller transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime + started bool } -func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error { +// func (c *loadBlocksAndTransfersCommand) Run(ctx context.Context) error { +func (c *loadBlocksAndTransfersCommand) Run(ctx context.Context) error { log.Debug("start load all transfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts) - ctx := parent + // Finite processes (to be restarted on error, but stopped on success): + // fetching transfers for loaded blocks + // fetching history blocks - // This wait group is used to wait for all the async commands to finish - // but fetchNewBlocksCommand, which is infinite, never finishes, can only be stopped - // by canceling the context which does not happen here, as we don't call group.Stop(). - group := async.NewGroup(ctx) + // Infinite processes (to be restarted on error): + // fetching new blocks + // fetching transfers for new blocks fromNum := big.NewInt(0) headNum, err := getHeadBlockNumber(ctx, c.chainClient) @@ -748,27 +764,31 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error { return err } + group := async.NewAtomicGroup(ctx) + defer func() { + group.Stop() + group.Wait() + }() + // It will start loadTransfersCommand which will run until success when all transfers from DB are loaded - err = c.fetchTransfersForLoadedBlocks(group) - for err != nil { + err = c.startFetchingTransfersForLoadedBlocks(group) + if err != nil { + log.Error("loadBlocksAndTransfersCommand fetchTransfersForLoadedBlocks", "error", err) return err } - // Start transfers loop to load transfers for new blocks - c.startTransfersLoop(ctx) - - // This will start findBlocksCommand which will run until success when all blocks are loaded - // Iterate over all accounts and load blocks for each account - for _, account := range c.accounts { - err = c.fetchHistoryBlocks(parent, group, account, fromNum, headNum, c.blocksLoadedCh) - for err != nil { - group.Stop() - group.Wait() - return err - } + if !c.started { + c.started = true + c.startTransfersLoop(ctx) + c.startFetchingNewBlocks(ctx, c.accounts, headNum, c.blocksLoadedCh) } - c.startFetchingNewBlocks(group, c.accounts, headNum, c.blocksLoadedCh) + // It will start findBlocksCommands which will run until success when all blocks are loaded + err = c.fetchHistoryBlocks(group, c.accounts, fromNum, headNum, c.blocksLoadedCh) + if err != nil { + log.Error("loadBlocksAndTransfersCommand fetchHistoryBlocks", "error", err) + return err + } select { case <-ctx.Done(): @@ -777,6 +797,7 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error { log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts) return nil } + return nil } func (c *loadBlocksAndTransfersCommand) Command() async.Command { @@ -786,12 +807,23 @@ func (c *loadBlocksAndTransfersCommand) Command() async.Command { }.Run } +// Start transfers loop to load transfers for new blocks func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context) { go loadTransfersLoop(ctx, c.blockDAO, c.db, c.chainClient, c.transactionManager, c.pendingTxManager, c.tokenManager, c.feed, c.blocksLoadedCh) } -func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, account common.Address, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) error { +func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(group *async.AtomicGroup, accounts []common.Address, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) (err error) { + for _, account := range accounts { + err = c.fetchHistoryBlocksForAccount(group, account, fromNum, toNum, c.blocksLoadedCh) + if err != nil { + return err + } + } + return nil +} + +func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *async.AtomicGroup, account common.Address, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) error { log.Debug("fetchHistoryBlocks start", "chainID", c.chainClient.NetworkID(), "account", account, "omit", c.omitHistory) @@ -804,7 +836,7 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, blockRange, err := loadBlockRangeInfo(c.chainClient.NetworkID(), account, c.blockRangeDAO) if err != nil { - log.Error("findBlocksCommand loadBlockRangeInfo", "error", err) + log.Error("fetchHistoryBlocks loadBlockRangeInfo", "error", err) // c.error = err return err // Will keep spinning forever nomatter what } @@ -871,40 +903,46 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, return nil } -func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(group *async.Group, addresses []common.Address, fromNum *big.Int, blocksLoadedCh chan<- []*DBHeader) { +func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(ctx context.Context, addresses []common.Address, fromNum *big.Int, blocksLoadedCh chan<- []*DBHeader) { + log.Debug("startFetchingNewBlocks start", "chainID", c.chainClient.NetworkID(), "accounts", addresses) - log.Debug("startFetchingNewBlocks", "chainID", c.chainClient.NetworkID(), "accounts", addresses, "db", c.accountsDB) + verifyOnce(&fetchNewBlocksCnt, "startFetchingNewBlocks") - newBlocksCmd := &findNewBlocksCommand{ - findBlocksCommand: &findBlocksCommand{ - accounts: addresses, - db: c.db, - accountsDB: c.accountsDB, - blockRangeDAO: c.blockRangeDAO, - chainClient: c.chainClient, - balanceCacher: c.balanceCacher, - feed: c.feed, - noLimit: false, - fromBlockNumber: fromNum, - transactionManager: c.transactionManager, - tokenManager: c.tokenManager, - blocksLoadedCh: blocksLoadedCh, - defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, - }, - } - group.Add(newBlocksCmd.Command()) + go func() { + newBlocksCmd := &findNewBlocksCommand{ + findBlocksCommand: &findBlocksCommand{ + accounts: addresses, + db: c.db, + accountsDB: c.accountsDB, + blockRangeDAO: c.blockRangeDAO, + chainClient: c.chainClient, + balanceCacher: c.balanceCacher, + feed: c.feed, + noLimit: false, + fromBlockNumber: fromNum, + transactionManager: c.transactionManager, + tokenManager: c.tokenManager, + blocksLoadedCh: blocksLoadedCh, + defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, + }, + } + group := async.NewGroup(ctx) + group.Add(newBlocksCmd.Command()) + + // No need to wait for the group since it is infinite + <-ctx.Done() + }() + + log.Debug("startFetchingNewBlocks end", "chainID", c.chainClient.NetworkID(), "accounts", addresses, "error", ctx.Err()) } -func (c *loadBlocksAndTransfersCommand) fetchTransfersForLoadedBlocks(group *async.Group) error { - - log.Debug("fetchTransfers start", "chainID", c.chainClient.NetworkID(), "accounts", c.accounts) - +func (c *loadBlocksAndTransfersCommand) getBlocksToLoad() (map[common.Address][]*big.Int, error) { blocksMap := make(map[common.Address][]*big.Int) for _, account := range c.accounts { blocks, err := c.blockDAO.GetBlocksToLoadByAddress(c.chainClient.NetworkID(), account, numberOfBlocksCheckedPerIteration) if err != nil { log.Error("loadBlocksAndTransfersCommand GetBlocksToLoadByAddress", "error", err) - return err + return nil, err } if len(blocks) == 0 { @@ -917,22 +955,36 @@ func (c *loadBlocksAndTransfersCommand) fetchTransfersForLoadedBlocks(group *asy if len(blocksMap) == 0 { log.Debug("fetchTransfers no blocks to load", "chainID", c.chainClient.NetworkID()) - return nil } - txCommand := &loadTransfersCommand{ - accounts: c.accounts, - db: c.db, - blockDAO: c.blockDAO, - chainClient: c.chainClient, - transactionManager: c.transactionManager, - pendingTxManager: c.pendingTxManager, - tokenManager: c.tokenManager, - blocksByAddress: blocksMap, - feed: c.feed, + return blocksMap, nil +} + +func (c *loadBlocksAndTransfersCommand) startFetchingTransfersForLoadedBlocks(group *async.AtomicGroup) error { + + log.Debug("fetchTransfers start", "chainID", c.chainClient.NetworkID(), "accounts", c.accounts) + + blocksMap, err := c.getBlocksToLoad() + if err != nil { + return err } - group.Add(txCommand.Command()) + go func() { + txCommand := &loadTransfersCommand{ + accounts: c.accounts, + db: c.db, + blockDAO: c.blockDAO, + chainClient: c.chainClient, + transactionManager: c.transactionManager, + pendingTxManager: c.pendingTxManager, + tokenManager: c.tokenManager, + blocksByAddress: blocksMap, + feed: c.feed, + } + + group.Add(txCommand.Command()) + log.Debug("fetchTransfers end", "chainID", c.chainClient.NetworkID(), "accounts", c.accounts) + }() return nil } diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index 1eff042f7..98fabb0e1 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -1100,12 +1100,12 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) { tc.traceAPICalls = true ctx := context.Background() - group := async.NewGroup(ctx) + group := async.NewAtomicGroup(ctx) fromNum := big.NewInt(0) toNum, err := getHeadBlockNumber(ctx, cmd.chainClient) require.NoError(t, err) - err = cmd.fetchHistoryBlocks(ctx, group, address, fromNum, toNum, blockChannel) + err = cmd.fetchHistoryBlocksForAccount(group, address, fromNum, toNum, blockChannel) require.NoError(t, err) select {