feat(wallet): separated finite and infinite commands in transfers for

proper handling of errors and commands restart.
Now:
- Infinite commands started only once and never restarted, stoped on
context.Done.
- Finite commands are joined into AtomicGroup to stop the rest in the
  group in case one command fails. Otherwise other commands in the group
  will continue running and the failed command is not retried to
  restart. Fixed goroutine leakage in case of failure of some commands
This commit is contained in:
Ivan Belyakov 2023-12-10 15:31:30 +01:00 committed by IvanBelyakoff
parent a6df2be92e
commit 670954b71b
3 changed files with 119 additions and 71 deletions

View File

@ -502,14 +502,9 @@ func (c *loadTransfersCommand) Command() async.Command {
}.Run
}
func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, limit int, blocksByAddress map[common.Address][]*big.Int) error {
return loadTransfers(ctx, c.blockDAO, c.db, c.chainClient, limit, blocksByAddress,
c.transactionManager, c.pendingTxManager, c.tokenManager, c.feed)
}
func (c *loadTransfersCommand) Run(parent context.Context) (err error) {
err = c.LoadTransfers(parent, c.blocksLimit, c.blocksByAddress)
return
return loadTransfers(parent, c.blockDAO, c.db, c.chainClient, c.blocksLimit, c.blocksByAddress,
c.transactionManager, c.pendingTxManager, c.tokenManager, c.feed)
}
func loadTransfers(ctx context.Context, blockDAO *BlockDAO, db *Database,
@ -544,13 +539,14 @@ func loadTransfers(ctx context.Context, blockDAO *BlockDAO, db *Database,
group.Add(transfers.Command())
}
// loadTransfers command will be restarted in case of error, but if context is cancelled, we should stop
select {
case <-ctx.Done():
return ctx.Err()
log.Debug("loadTransfers cancelled", "chain", chainClient.NetworkID(), "error", ctx.Err())
case <-group.WaitAsync():
log.Debug("loadTransfers finished for account", "in", time.Since(start), "chain", chainClient.NetworkID())
return nil
}
return nil
}
func isBinanceChain(chainID uint64) bool {

View File

@ -19,6 +19,16 @@ import (
"github.com/status-im/status-go/transactions"
)
var trLoopCnt int = 0
var fetchNewBlocksCnt int = 0
func verifyOnce(cnt *int, msg string) {
if *cnt > 0 {
panic("verifyOnce, function: " + msg)
}
*cnt++
}
type findNewBlocksCommand struct {
*findBlocksCommand
}
@ -36,6 +46,7 @@ func (c *findNewBlocksCommand) Command() async.Command {
func (c *findNewBlocksCommand) Run(parent context.Context) (err error) {
headNum, err := getHeadBlockNumber(parent, c.chainClient)
if err != nil {
log.Error("findNewBlocksCommand getHeadBlockNumber", "error", err, "chain", c.chainClient.NetworkID())
return err
}
@ -668,6 +679,8 @@ func loadTransfersLoop(ctx context.Context, blockDAO *BlockDAO, db *Database,
log.Debug("loadTransfersLoop start", "chain", chainClient.NetworkID())
verifyOnce(&trLoopCnt, "loadTransfersLoop")
for {
select {
case <-ctx.Done():
@ -730,17 +743,20 @@ type loadBlocksAndTransfersCommand struct {
// Not to be set by the caller
transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime
started bool
}
func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
// func (c *loadBlocksAndTransfersCommand) Run(ctx context.Context) error {
func (c *loadBlocksAndTransfersCommand) Run(ctx context.Context) error {
log.Debug("start load all transfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts)
ctx := parent
// Finite processes (to be restarted on error, but stopped on success):
// fetching transfers for loaded blocks
// fetching history blocks
// This wait group is used to wait for all the async commands to finish
// but fetchNewBlocksCommand, which is infinite, never finishes, can only be stopped
// by canceling the context which does not happen here, as we don't call group.Stop().
group := async.NewGroup(ctx)
// Infinite processes (to be restarted on error):
// fetching new blocks
// fetching transfers for new blocks
fromNum := big.NewInt(0)
headNum, err := getHeadBlockNumber(ctx, c.chainClient)
@ -748,27 +764,31 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
return err
}
group := async.NewAtomicGroup(ctx)
defer func() {
group.Stop()
group.Wait()
}()
// It will start loadTransfersCommand which will run until success when all transfers from DB are loaded
err = c.fetchTransfersForLoadedBlocks(group)
for err != nil {
err = c.startFetchingTransfersForLoadedBlocks(group)
if err != nil {
log.Error("loadBlocksAndTransfersCommand fetchTransfersForLoadedBlocks", "error", err)
return err
}
// Start transfers loop to load transfers for new blocks
c.startTransfersLoop(ctx)
// This will start findBlocksCommand which will run until success when all blocks are loaded
// Iterate over all accounts and load blocks for each account
for _, account := range c.accounts {
err = c.fetchHistoryBlocks(parent, group, account, fromNum, headNum, c.blocksLoadedCh)
for err != nil {
group.Stop()
group.Wait()
return err
}
if !c.started {
c.started = true
c.startTransfersLoop(ctx)
c.startFetchingNewBlocks(ctx, c.accounts, headNum, c.blocksLoadedCh)
}
c.startFetchingNewBlocks(group, c.accounts, headNum, c.blocksLoadedCh)
// It will start findBlocksCommands which will run until success when all blocks are loaded
err = c.fetchHistoryBlocks(group, c.accounts, fromNum, headNum, c.blocksLoadedCh)
if err != nil {
log.Error("loadBlocksAndTransfersCommand fetchHistoryBlocks", "error", err)
return err
}
select {
case <-ctx.Done():
@ -777,6 +797,7 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts)
return nil
}
return nil
}
func (c *loadBlocksAndTransfersCommand) Command() async.Command {
@ -786,12 +807,23 @@ func (c *loadBlocksAndTransfersCommand) Command() async.Command {
}.Run
}
// Start transfers loop to load transfers for new blocks
func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context) {
go loadTransfersLoop(ctx, c.blockDAO, c.db, c.chainClient, c.transactionManager,
c.pendingTxManager, c.tokenManager, c.feed, c.blocksLoadedCh)
}
func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, account common.Address, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) error {
func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(group *async.AtomicGroup, accounts []common.Address, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) (err error) {
for _, account := range accounts {
err = c.fetchHistoryBlocksForAccount(group, account, fromNum, toNum, c.blocksLoadedCh)
if err != nil {
return err
}
}
return nil
}
func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *async.AtomicGroup, account common.Address, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) error {
log.Debug("fetchHistoryBlocks start", "chainID", c.chainClient.NetworkID(), "account", account, "omit", c.omitHistory)
@ -804,7 +836,7 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context,
blockRange, err := loadBlockRangeInfo(c.chainClient.NetworkID(), account, c.blockRangeDAO)
if err != nil {
log.Error("findBlocksCommand loadBlockRangeInfo", "error", err)
log.Error("fetchHistoryBlocks loadBlockRangeInfo", "error", err)
// c.error = err
return err // Will keep spinning forever nomatter what
}
@ -871,40 +903,46 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context,
return nil
}
func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(group *async.Group, addresses []common.Address, fromNum *big.Int, blocksLoadedCh chan<- []*DBHeader) {
func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(ctx context.Context, addresses []common.Address, fromNum *big.Int, blocksLoadedCh chan<- []*DBHeader) {
log.Debug("startFetchingNewBlocks start", "chainID", c.chainClient.NetworkID(), "accounts", addresses)
log.Debug("startFetchingNewBlocks", "chainID", c.chainClient.NetworkID(), "accounts", addresses, "db", c.accountsDB)
verifyOnce(&fetchNewBlocksCnt, "startFetchingNewBlocks")
newBlocksCmd := &findNewBlocksCommand{
findBlocksCommand: &findBlocksCommand{
accounts: addresses,
db: c.db,
accountsDB: c.accountsDB,
blockRangeDAO: c.blockRangeDAO,
chainClient: c.chainClient,
balanceCacher: c.balanceCacher,
feed: c.feed,
noLimit: false,
fromBlockNumber: fromNum,
transactionManager: c.transactionManager,
tokenManager: c.tokenManager,
blocksLoadedCh: blocksLoadedCh,
defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize,
},
}
group.Add(newBlocksCmd.Command())
go func() {
newBlocksCmd := &findNewBlocksCommand{
findBlocksCommand: &findBlocksCommand{
accounts: addresses,
db: c.db,
accountsDB: c.accountsDB,
blockRangeDAO: c.blockRangeDAO,
chainClient: c.chainClient,
balanceCacher: c.balanceCacher,
feed: c.feed,
noLimit: false,
fromBlockNumber: fromNum,
transactionManager: c.transactionManager,
tokenManager: c.tokenManager,
blocksLoadedCh: blocksLoadedCh,
defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize,
},
}
group := async.NewGroup(ctx)
group.Add(newBlocksCmd.Command())
// No need to wait for the group since it is infinite
<-ctx.Done()
}()
log.Debug("startFetchingNewBlocks end", "chainID", c.chainClient.NetworkID(), "accounts", addresses, "error", ctx.Err())
}
func (c *loadBlocksAndTransfersCommand) fetchTransfersForLoadedBlocks(group *async.Group) error {
log.Debug("fetchTransfers start", "chainID", c.chainClient.NetworkID(), "accounts", c.accounts)
func (c *loadBlocksAndTransfersCommand) getBlocksToLoad() (map[common.Address][]*big.Int, error) {
blocksMap := make(map[common.Address][]*big.Int)
for _, account := range c.accounts {
blocks, err := c.blockDAO.GetBlocksToLoadByAddress(c.chainClient.NetworkID(), account, numberOfBlocksCheckedPerIteration)
if err != nil {
log.Error("loadBlocksAndTransfersCommand GetBlocksToLoadByAddress", "error", err)
return err
return nil, err
}
if len(blocks) == 0 {
@ -917,22 +955,36 @@ func (c *loadBlocksAndTransfersCommand) fetchTransfersForLoadedBlocks(group *asy
if len(blocksMap) == 0 {
log.Debug("fetchTransfers no blocks to load", "chainID", c.chainClient.NetworkID())
return nil
}
txCommand := &loadTransfersCommand{
accounts: c.accounts,
db: c.db,
blockDAO: c.blockDAO,
chainClient: c.chainClient,
transactionManager: c.transactionManager,
pendingTxManager: c.pendingTxManager,
tokenManager: c.tokenManager,
blocksByAddress: blocksMap,
feed: c.feed,
return blocksMap, nil
}
func (c *loadBlocksAndTransfersCommand) startFetchingTransfersForLoadedBlocks(group *async.AtomicGroup) error {
log.Debug("fetchTransfers start", "chainID", c.chainClient.NetworkID(), "accounts", c.accounts)
blocksMap, err := c.getBlocksToLoad()
if err != nil {
return err
}
group.Add(txCommand.Command())
go func() {
txCommand := &loadTransfersCommand{
accounts: c.accounts,
db: c.db,
blockDAO: c.blockDAO,
chainClient: c.chainClient,
transactionManager: c.transactionManager,
pendingTxManager: c.pendingTxManager,
tokenManager: c.tokenManager,
blocksByAddress: blocksMap,
feed: c.feed,
}
group.Add(txCommand.Command())
log.Debug("fetchTransfers end", "chainID", c.chainClient.NetworkID(), "accounts", c.accounts)
}()
return nil
}

View File

@ -1100,12 +1100,12 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) {
tc.traceAPICalls = true
ctx := context.Background()
group := async.NewGroup(ctx)
group := async.NewAtomicGroup(ctx)
fromNum := big.NewInt(0)
toNum, err := getHeadBlockNumber(ctx, cmd.chainClient)
require.NoError(t, err)
err = cmd.fetchHistoryBlocks(ctx, group, address, fromNum, toNum, blockChannel)
err = cmd.fetchHistoryBlocksForAccount(group, address, fromNum, toNum, blockChannel)
require.NoError(t, err)
select {