diff --git a/VERSION b/VERSION index 72a8a6313..9ed317fb4 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.41.0 +0.41.1 diff --git a/services/wallet/commands.go b/services/wallet/commands.go index e1842cfc6..3344a43a6 100644 --- a/services/wallet/commands.go +++ b/services/wallet/commands.go @@ -14,6 +14,7 @@ import ( ) var numberOfBlocksCheckedPerIteration = 40 +var blocksDelayThreshhold = 40 * time.Second type ethHistoricalCommand struct { db *Database @@ -23,6 +24,7 @@ type ethHistoricalCommand struct { balanceCache *balanceCache feed *event.Feed foundHeaders []*DBHeader + noLimit bool from, to, resultingFrom *big.Int } @@ -38,7 +40,7 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { start := time.Now() totalRequests, cacheHits := c.balanceCache.getStats(c.address) log.Info("balance cache before checking range", "total", totalRequests, "cached", totalRequests-cacheHits) - from, headers, err := findBlocksWithEthTransfers(ctx, c.client, c.balanceCache, c.eth, c.address, c.from, c.to) + from, headers, err := findBlocksWithEthTransfers(ctx, c.client, c.balanceCache, c.eth, c.address, c.from, c.to, c.noLimit) if err != nil { return err @@ -111,13 +113,14 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { } type newBlocksTransfersCommand struct { - db *Database - accounts []common.Address - chain *big.Int - erc20 *ERC20TransfersDownloader - eth *ETHTransferDownloader - client reactorClient - feed *event.Feed + db *Database + accounts []common.Address + chain *big.Int + erc20 *ERC20TransfersDownloader + eth *ETHTransferDownloader + client reactorClient + feed *event.Feed + lastFetchedBlockTime time.Time initialFrom, from, to *DBHeader } @@ -150,36 +153,101 @@ func (c *newBlocksTransfersCommand) Verify(parent context.Context) (err error) { return nil } -func (c *newBlocksTransfersCommand) getAllTransfers(parent context.Context, from, to uint64) ([]Transfer, map[common.Address][]*DBHeader, error) { - newHeadersByAddress := map[common.Address][]*DBHeader{} - all := []Transfer{} - for n := from; n <= to; n++ { - ctx, cancel := context.WithTimeout(parent, 10*time.Second) - header, err := c.client.HeaderByNumber(ctx, big.NewInt(int64(n))) - cancel() - if err != nil { - return nil, nil, err +func (c *newBlocksTransfersCommand) getAllTransfers(parent context.Context, from, to uint64) (map[common.Address][]Transfer, error) { + transfersByAddress := map[common.Address][]Transfer{} + if to-from > reorgSafetyDepth(c.chain).Uint64() { + fromByAddress := map[common.Address]*big.Int{} + toByAddress := map[common.Address]*big.Int{} + for _, account := range c.accounts { + fromByAddress[account] = new(big.Int).SetUint64(from) + toByAddress[account] = new(big.Int).SetUint64(to) } - dbHeader := toDBHeader(header) - log.Info("reactor get transfers", "block", dbHeader.Hash, "number", dbHeader.Number) - transfers, err := c.getTransfers(parent, dbHeader) - if err != nil { - log.Error("failed to get transfers", "header", dbHeader.Hash, "error", err) - return nil, nil, err + + balanceCache := newBalanceCache() + blocksCommand := &findAndCheckBlockRangeCommand{ + accounts: c.accounts, + db: c.db, + chain: c.chain, + client: c.eth.client, + balanceCache: balanceCache, + feed: c.feed, + fromByAddress: fromByAddress, + toByAddress: toByAddress, + noLimit: true, } - if len(transfers) > 0 { - for _, transfer := range transfers { - headers, ok := newHeadersByAddress[transfer.Address] - if !ok { - headers = []*DBHeader{} - } - newHeadersByAddress[transfer.Address] = append(headers, dbHeader) + + if err := blocksCommand.Command()(parent); err != nil { + return nil, err + } + + for address, headers := range blocksCommand.foundHeaders { + blocks := make([]*big.Int, len(headers)) + for i, header := range headers { + blocks[i] = header.Number } + txCommand := &loadTransfersCommand{ + accounts: []common.Address{address}, + db: c.db, + chain: c.chain, + client: c.erc20.client, + blocksByAddress: map[common.Address][]*big.Int{address: blocks}, + } + + err := txCommand.Command()(parent) + if err != nil { + return nil, err + } + + transfersByAddress[address] = txCommand.foundTransfersByAddress[address] + } + } else { + all := []Transfer{} + newHeadersByAddress := map[common.Address][]*DBHeader{} + for n := from; n <= to; n++ { + ctx, cancel := context.WithTimeout(parent, 10*time.Second) + header, err := c.client.HeaderByNumber(ctx, big.NewInt(int64(n))) + cancel() + if err != nil { + return nil, err + } + dbHeader := toDBHeader(header) + log.Info("reactor get transfers", "block", dbHeader.Hash, "number", dbHeader.Number) + transfers, err := c.getTransfers(parent, dbHeader) + if err != nil { + log.Error("failed to get transfers", "header", dbHeader.Hash, "error", err) + return nil, err + } + if len(transfers) > 0 { + for _, transfer := range transfers { + headers, ok := newHeadersByAddress[transfer.Address] + if !ok { + headers = []*DBHeader{} + } + + transfers, ok := transfersByAddress[transfer.Address] + if !ok { + transfers = []Transfer{} + } + transfersByAddress[transfer.Address] = append(transfers, transfer) + newHeadersByAddress[transfer.Address] = append(headers, dbHeader) + } + } + all = append(all, transfers...) + } + + err := c.saveHeaders(parent, newHeadersByAddress) + if err != nil { + return nil, err + } + + err = c.db.ProcessTranfers(all, nil) + if err != nil { + log.Error("failed to persist transfers", "error", err) + return nil, err } - all = append(all, transfers...) } - return all, newHeadersByAddress, nil + return transfersByAddress, nil } func (c *newBlocksTransfersCommand) saveHeaders(parent context.Context, newHeadersByAddress map[common.Address][]*DBHeader) (err error) { @@ -197,6 +265,32 @@ func (c *newBlocksTransfersCommand) saveHeaders(parent context.Context, newHeade return nil } +func (c *newBlocksTransfersCommand) checkDelay(parent context.Context, nextHeader *types.Header) (*types.Header, error) { + if time.Since(c.lastFetchedBlockTime) > blocksDelayThreshhold { + log.Info("There was a delay before loading next block", "time since previous successful fetching", time.Since(c.lastFetchedBlockTime)) + ctx, cancel := context.WithTimeout(parent, 5*time.Second) + latestHeader, err := c.client.HeaderByNumber(ctx, nil) + cancel() + if err != nil { + log.Warn("failed to get latest block", "number", nextHeader.Number, "error", err) + return nil, err + } + diff := new(big.Int).Sub(latestHeader.Number, nextHeader.Number) + if diff.Cmp(reorgSafetyDepth(c.chain)) >= 0 { + num := new(big.Int).Sub(latestHeader.Number, reorgSafetyDepth(c.chain)) + ctx, cancel := context.WithTimeout(parent, 5*time.Second) + nextHeader, err = c.client.HeaderByNumber(ctx, num) + cancel() + if err != nil { + log.Warn("failed to get next block", "number", num, "error", err) + return nil, err + } + } + } + + return nextHeader, nil +} + func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { if c.from == nil { ctx, cancel := context.WithTimeout(parent, 3*time.Second) @@ -210,28 +304,35 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { } num := new(big.Int).Add(c.from.Number, one) ctx, cancel := context.WithTimeout(parent, 5*time.Second) - latest, err := c.client.HeaderByNumber(ctx, num) + nextHeader, err := c.client.HeaderByNumber(ctx, num) cancel() if err != nil { - log.Warn("failed to get latest block", "number", num, "error", err) + log.Warn("failed to get next block", "number", num, "error", err) return err } log.Info("reactor received new block", "header", num) - ctx, cancel = context.WithTimeout(parent, 10*time.Second) - latestHeader, removed, latestValidSavedBlock, reorgSpotted, err := c.onNewBlock(ctx, c.from, latest) - cancel() + + nextHeader, err = c.checkDelay(parent, nextHeader) if err != nil { - log.Error("failed to process new header", "header", latest, "error", err) return err } - if latestHeader == nil && len(removed) == 0 { - log.Info("new block already in the database", "block", latest.Number) - return nil + ctx, cancel = context.WithTimeout(parent, 10*time.Second) + latestHeader, removed, latestValidSavedBlock, reorgSpotted, err := c.onNewBlock(ctx, c.from, nextHeader) + cancel() + if err != nil { + log.Error("failed to process new header", "header", nextHeader, "error", err) + return err } + + err = c.db.ProcessTranfers(nil, removed) + if err != nil { + return err + } + latestHeader.Loaded = true - fromN := latest.Number.Uint64() + fromN := nextHeader.Number.Uint64() if reorgSpotted { if latestValidSavedBlock != nil { @@ -242,38 +343,30 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { } } toN := latestHeader.Number.Uint64() - all, newHeadersByAddress, err := c.getAllTransfers(parent, fromN, toN) + all, err := c.getAllTransfers(parent, fromN, toN) if err != nil { return err } - err = c.saveHeaders(parent, newHeadersByAddress) - if err != nil { - return err - } - - err = c.db.ProcessTranfers(all, removed) - if err != nil { - log.Error("failed to persist transfers", "error", err) - return err - } - c.from = toDBHeader(latest) + c.from = toDBHeader(nextHeader) + c.lastFetchedBlockTime = time.Now() if len(removed) != 0 { lth := len(removed) c.feed.Send(Event{ Type: EventReorg, BlockNumber: removed[lth-1].Number, - Accounts: uniqueAccountsFromTransfers(all), + Accounts: uniqueAccountsFromHeaders(removed), }) } log.Info("before sending new block event", "latest", latestHeader != nil, "removed", len(removed), "len", len(uniqueAccountsFromTransfers(all))) - if latestHeader != nil && len(removed) == 0 { - c.feed.Send(Event{ - Type: EventNewBlock, - BlockNumber: latestHeader.Number, - Accounts: uniqueAccountsFromTransfers(all), - }) - } + + c.feed.Send(Event{ + Type: EventNewBlock, + BlockNumber: latestHeader.Number, + Accounts: uniqueAccountsFromTransfers(all), + NewTransactionsPerAccount: transfersPerAccount(all), + }) + return nil } @@ -378,9 +471,10 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCache *b signer: types.NewEIP155Signer(c.chain), db: c.db, }, - feed: c.feed, - from: fromByAddress[address], - to: toByAddress[address], + feed: c.feed, + from: fromByAddress[address], + to: toByAddress[address], + noLimit: c.noLimit, } commands[i] = eth group.Add(eth.Command()) @@ -451,10 +545,11 @@ func getTransfersByBlocks(ctx context.Context, db *Database, downloader *ETHTran return allTransfers, nil } -func loadTransfers(ctx context.Context, accounts []common.Address, db *Database, client *ethclient.Client, chain *big.Int, limit int, blocksByAddress map[common.Address][]*big.Int) error { +func loadTransfers(ctx context.Context, accounts []common.Address, db *Database, client *ethclient.Client, chain *big.Int, limit int, blocksByAddress map[common.Address][]*big.Int) (map[common.Address][]Transfer, error) { start := time.Now() group := NewGroup(ctx) + commands := []*transfersCommand{} for _, address := range accounts { blocks, ok := blocksByAddress[address] @@ -475,19 +570,35 @@ func loadTransfers(ctx context.Context, accounts []common.Address, db *Database, }, block: block, } + commands = append(commands, erc20) group.Add(erc20.Command()) } } select { case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() case <-group.WaitAsync(): + transfersByAddress := map[common.Address][]Transfer{} + for _, command := range commands { + if len(command.fetchedTransfers) == 0 { + continue + } + + transfers, ok := transfersByAddress[command.address] + if !ok { + transfers = []Transfer{} + } + + for _, transfer := range command.fetchedTransfers { + transfersByAddress[command.address] = append(transfers, transfer) + } + } log.Info("loadTransfers finished", "in", time.Since(start)) - return nil + return transfersByAddress, nil } } -func (c *controlCommand) LoadTransfers(ctx context.Context, downloader *ETHTransferDownloader, limit int) error { +func (c *controlCommand) LoadTransfers(ctx context.Context, downloader *ETHTransferDownloader, limit int) (map[common.Address][]Transfer, error) { return loadTransfers(ctx, c.accounts, c.db, c.client, c.chain, limit, make(map[common.Address][]*big.Int)) } @@ -663,7 +774,7 @@ func (c *controlCommand) Run(parent context.Context) error { signer: types.NewEIP155Signer(c.chain), db: c.db, } - err = c.LoadTransfers(parent, downloader, 40) + _, err = c.LoadTransfers(parent, downloader, 40) if err != nil { return err } @@ -676,15 +787,16 @@ func (c *controlCommand) Run(parent context.Context) error { log.Info("watching new blocks", "start from", head.Number) cmd := &newBlocksTransfersCommand{ - db: c.db, - chain: c.chain, - client: c.client, - accounts: c.accounts, - eth: c.eth, - erc20: c.erc20, - feed: c.feed, - initialFrom: toDBHeader(head), - from: toDBHeader(head), + db: c.db, + chain: c.chain, + client: c.client, + accounts: c.accounts, + eth: c.eth, + erc20: c.erc20, + feed: c.feed, + initialFrom: toDBHeader(head), + from: toDBHeader(head), + lastFetchedBlockTime: time.Now(), } err = cmd.Command()(parent) @@ -704,26 +816,54 @@ func (c *controlCommand) Command() Command { }.Run } -func uniqueAccountsFromTransfers(transfers []Transfer) []common.Address { +func uniqueAccountsFromTransfers(allTransfers map[common.Address][]Transfer) []common.Address { accounts := []common.Address{} unique := map[common.Address]struct{}{} - for i := range transfers { - _, exist := unique[transfers[i].Address] + for address, transfers := range allTransfers { + if len(transfers) == 0 { + continue + } + + _, exist := unique[address] if exist { continue } - unique[transfers[i].Address] = struct{}{} - accounts = append(accounts, transfers[i].Address) + unique[address] = struct{}{} + accounts = append(accounts, address) + } + return accounts +} + +func transfersPerAccount(allTransfers map[common.Address][]Transfer) map[common.Address]int { + res := map[common.Address]int{} + for address, transfers := range allTransfers { + res[address] = len(transfers) + } + + return res +} + +func uniqueAccountsFromHeaders(headers []*DBHeader) []common.Address { + accounts := []common.Address{} + unique := map[common.Address]struct{}{} + for i := range headers { + _, exist := unique[headers[i].Address] + if exist { + continue + } + unique[headers[i].Address] = struct{}{} + accounts = append(accounts, headers[i].Address) } return accounts } type transfersCommand struct { - db *Database - eth *ETHTransferDownloader - block *big.Int - address common.Address - client reactorClient + db *Database + eth *ETHTransferDownloader + block *big.Int + address common.Address + client reactorClient + fetchedTransfers []Transfer } func (c *transfersCommand) Command() Command { @@ -746,16 +886,18 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) { return err } + c.fetchedTransfers = allTransfers log.Debug("transfers loaded", "address", c.address, "len", len(allTransfers)) return nil } type loadTransfersCommand struct { - accounts []common.Address - db *Database - chain *big.Int - client *ethclient.Client - blocksByAddress map[common.Address][]*big.Int + accounts []common.Address + db *Database + chain *big.Int + client *ethclient.Client + blocksByAddress map[common.Address][]*big.Int + foundTransfersByAddress map[common.Address][]Transfer } func (c *loadTransfersCommand) Command() Command { @@ -765,7 +907,7 @@ func (c *loadTransfersCommand) Command() Command { }.Run } -func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, downloader *ETHTransferDownloader, limit int, blocksByAddress map[common.Address][]*big.Int) error { +func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, downloader *ETHTransferDownloader, limit int, blocksByAddress map[common.Address][]*big.Int) (map[common.Address][]Transfer, error) { return loadTransfers(ctx, c.accounts, c.db, c.client, c.chain, limit, blocksByAddress) } @@ -776,10 +918,11 @@ func (c *loadTransfersCommand) Run(parent context.Context) (err error) { signer: types.NewEIP155Signer(c.chain), db: c.db, } - err = c.LoadTransfers(parent, downloader, 40, c.blocksByAddress) + transfersByAddress, err := c.LoadTransfers(parent, downloader, 40, c.blocksByAddress) if err != nil { return err } + c.foundTransfersByAddress = transfersByAddress return } @@ -793,6 +936,8 @@ type findAndCheckBlockRangeCommand struct { feed *event.Feed fromByAddress map[common.Address]*big.Int toByAddress map[common.Address]*big.Int + foundHeaders map[common.Address][]*DBHeader + noLimit bool } func (c *findAndCheckBlockRangeCommand) Command() Command { @@ -808,16 +953,24 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error) if err != nil { return err } + if c.noLimit { + newFromByAddress = map[common.Address]*big.Int{} + for _, address := range c.accounts { + newFromByAddress[address] = c.fromByAddress[address] + } + } erc20HeadersByAddress, err := c.fastIndexErc20(parent, newFromByAddress, c.toByAddress) if err != nil { return err } + foundHeaders := map[common.Address][]*DBHeader{} for _, address := range c.accounts { ethHeaders := ethHeadersByAddress[address] erc20Headers := erc20HeadersByAddress[address] allHeaders := append(ethHeaders, erc20Headers...) + foundHeaders[address] = allHeaders log.Debug("saving headers", "len", len(allHeaders), "address") err = c.db.ProcessBlocks(address, newFromByAddress[address], c.toByAddress[address], allHeaders) @@ -826,5 +979,7 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error) } } + c.foundHeaders = foundHeaders + return } diff --git a/services/wallet/commands_test.go b/services/wallet/commands_test.go index c4c05302d..7b38a9492 100644 --- a/services/wallet/commands_test.go +++ b/services/wallet/commands_test.go @@ -54,6 +54,7 @@ func (s *NewBlocksSuite) SetupTest() { }, feed: s.feed, client: s.backend.Client, + chain: big.NewInt(1777), } } @@ -153,7 +154,11 @@ func (s *NewBlocksSuite) TestReorg() { s.Require().EqualError(s.runCmdUntilError(ctx), "not found") close(events) - expected := []Event{{Type: EventReorg, BlockNumber: big.NewInt(21)}, {Type: EventNewBlock, BlockNumber: big.NewInt(25)}} + expected := []Event{ + {Type: EventReorg, BlockNumber: big.NewInt(21)}, + {Type: EventNewBlock, BlockNumber: big.NewInt(24)}, + {Type: EventNewBlock, BlockNumber: big.NewInt(25)}, + } i := 0 for ev := range events { s.Require().Equal(expected[i].Type, ev.Type) @@ -161,7 +166,7 @@ func (s *NewBlocksSuite) TestReorg() { i++ } - transfers, err = s.db.GetTransfers(big.NewInt(0), nil) + transfers, err = s.db.GetTransfers(nil, nil) s.Require().NoError(err) s.Require().Len(transfers, 10) } diff --git a/services/wallet/concurrent.go b/services/wallet/concurrent.go index ed6167070..8a3c150f4 100644 --- a/services/wallet/concurrent.go +++ b/services/wallet/concurrent.go @@ -166,7 +166,7 @@ func checkRanges(parent context.Context, client reactorClient, cache BalanceCach return c.GetRanges(), c.GetHeaders(), nil } -func findBlocksWithEthTransfers(parent context.Context, client reactorClient, cache BalanceCache, downloader TransferDownloader, account common.Address, low, high *big.Int) (from *big.Int, headers []*DBHeader, err error) { +func findBlocksWithEthTransfers(parent context.Context, client reactorClient, cache BalanceCache, downloader TransferDownloader, account common.Address, low, high *big.Int, noLimit bool) (from *big.Int, headers []*DBHeader, err error) { ranges := [][]*big.Int{{low, high}} minBlock := big.NewInt(low.Int64()) headers = []*DBHeader{} @@ -184,7 +184,7 @@ func findBlocksWithEthTransfers(parent context.Context, client reactorClient, ca if len(newRanges) > 0 { log.Debug("found new ranges", "account", account, "lvl", lvl, "new ranges len", len(newRanges)) } - if len(newRanges) > 60 { + if len(newRanges) > 60 && !noLimit { sort.SliceStable(newRanges, func(i, j int) bool { return newRanges[i][0].Cmp(newRanges[j][0]) == 1 }) diff --git a/services/wallet/concurrent_test.go b/services/wallet/concurrent_test.go index dd46cbe14..fa9b6b75e 100644 --- a/services/wallet/concurrent_test.go +++ b/services/wallet/concurrent_test.go @@ -129,7 +129,7 @@ func TestConcurrentEthDownloader(t *testing.T) { concurrent := NewConcurrentDownloader(ctx) _, headers, _ := findBlocksWithEthTransfers( ctx, tc.options.balances, newBalanceCache(), tc.options.batches, - common.Address{}, zero, tc.options.last) + common.Address{}, zero, tc.options.last, false) concurrent.Wait() require.NoError(t, concurrent.Error()) rst := concurrent.Get() diff --git a/services/wallet/events.go b/services/wallet/events.go index 1a5e6f311..9b1d41a53 100644 --- a/services/wallet/events.go +++ b/services/wallet/events.go @@ -24,8 +24,9 @@ const ( // Event is a type for wallet events. type Event struct { - Type EventType `json:"type"` - BlockNumber *big.Int `json:"blockNumber"` - Accounts []common.Address `json:"accounts"` - ERC20 bool `json:"erc20"` + Type EventType `json:"type"` + BlockNumber *big.Int `json:"blockNumber"` + Accounts []common.Address `json:"accounts"` + NewTransactionsPerAccount map[common.Address]int `json:"newTransactions"` + ERC20 bool `json:"erc20"` } diff --git a/services/wallet/reactor.go b/services/wallet/reactor.go index 5dcbba535..3293289d4 100644 --- a/services/wallet/reactor.go +++ b/services/wallet/reactor.go @@ -23,14 +23,24 @@ func pollingPeriodByChain(chain *big.Int) time.Duration { case int64(params.MainNetworkID): return 10 * time.Second case int64(params.RopstenNetworkID): - return 2 * time.Second + return 4 * time.Second default: return 500 * time.Millisecond } } +func reorgSafetyDepth(chain *big.Int) *big.Int { + switch chain.Int64() { + case int64(params.MainNetworkID): + return big.NewInt(5) + case int64(params.RopstenNetworkID): + return big.NewInt(15) + default: + return big.NewInt(15) + } +} + var ( - reorgSafetyDepth = big.NewInt(15) erc20BatchSize = big.NewInt(100000) errAlreadyRunning = errors.New("already running") ) @@ -88,7 +98,7 @@ func (r *Reactor) newControlCommand(accounts []common.Address) *controlCommand { }, erc20: NewERC20TransfersDownloader(r.client, accounts, signer), feed: r.feed, - safetyDepth: reorgSafetyDepth, + safetyDepth: reorgSafetyDepth(r.chain), } return ctl