From d39ca7fea451198a04e5caaeac6163a277c7ced2 Mon Sep 17 00:00:00 2001 From: Roman Volosovskyi Date: Tue, 8 Dec 2020 16:39:27 +0200 Subject: [PATCH] [wallet] Fix multiple notifications on a single erc20 transfer --- VERSION | 2 +- services/local-notifications/core.go | 30 ++++++++++++------ services/wallet/commands.go | 26 +++++++++++++--- services/wallet/database.go | 46 +++++++++++++++------------- services/wallet/downloader.go | 6 ++-- 5 files changed, 70 insertions(+), 40 deletions(-) diff --git a/VERSION b/VERSION index 91d98abbb..a1b8a7d1a 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.64.5 +0.64.6 diff --git a/services/local-notifications/core.go b/services/local-notifications/core.go index cc7807fe9..56b993dfe 100644 --- a/services/local-notifications/core.go +++ b/services/local-notifications/core.go @@ -166,7 +166,7 @@ func (s *Service) transactionsHandler(payload TransactionEvent) { limit := 20 if payload.BlockNumber != nil { for _, address := range payload.Accounts { - if payload.BlockNumber.Cmp(payload.MaxKnownBlocks[address]) == 1 { + if payload.BlockNumber.Cmp(payload.MaxKnownBlocks[address]) >= 0 { log.Info("Handled transfer for address", "info", address) transfers, err := s.walletDB.GetTransfersByAddressAndBlock(address, payload.BlockNumber, int64(limit)) if err != nil { @@ -235,14 +235,26 @@ func (s *Service) StartWalletWatcher() { return case event := <-events: if event.Type == wallet.EventNewBlock && len(maxKnownBlocks) > 0 { - s.transmitter.publisher.Send(TransactionEvent{ - Type: string(event.Type), - BlockNumber: event.BlockNumber, - Accounts: event.Accounts, - NewTransactionsPerAccount: event.NewTransactionsPerAccount, - ERC20: event.ERC20, - MaxKnownBlocks: maxKnownBlocks, - }) + newBlocks := false + for _, address := range event.Accounts { + if _, ok := maxKnownBlocks[address]; !ok { + newBlocks = true + maxKnownBlocks[address] = event.BlockNumber + } else if event.BlockNumber.Cmp(maxKnownBlocks[address]) == 1 { + maxKnownBlocks[address] = event.BlockNumber + newBlocks = true + } + } + if newBlocks { + s.transmitter.publisher.Send(TransactionEvent{ + Type: string(event.Type), + BlockNumber: event.BlockNumber, + Accounts: event.Accounts, + NewTransactionsPerAccount: event.NewTransactionsPerAccount, + ERC20: event.ERC20, + MaxKnownBlocks: maxKnownBlocks, + }) + } } else if event.Type == wallet.EventMaxKnownBlock { for _, address := range event.Accounts { if _, ok := maxKnownBlocks[address]; !ok { diff --git a/services/wallet/commands.go b/services/wallet/commands.go index 59efd3ac4..346cbd5e8 100644 --- a/services/wallet/commands.go +++ b/services/wallet/commands.go @@ -990,9 +990,27 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error) for _, address := range c.accounts { ethHeaders := ethHeadersByAddress[address] erc20Headers := erc20HeadersByAddress[address] - allHeaders := append(ethHeaders, erc20Headers...) - foundHeaders[address] = allHeaders + + uniqHeadersByHash := map[common.Hash]*DBHeader{} + for _, header := range allHeaders { + uniqHeader, ok := uniqHeadersByHash[header.Hash] + if ok { + if len(header.Erc20Transfers) > 0 { + uniqHeader.Erc20Transfers = append(uniqHeader.Erc20Transfers, header.Erc20Transfers...) + } + uniqHeadersByHash[header.Hash] = uniqHeader + } else { + uniqHeadersByHash[header.Hash] = header + } + } + + uniqHeaders := []*DBHeader{} + for _, header := range uniqHeadersByHash { + uniqHeaders = append(uniqHeaders, header) + } + + foundHeaders[address] = uniqHeaders for _, header := range allHeaders { if header.Number.Cmp(maxBlockNumber) == 1 { @@ -1000,8 +1018,8 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error) } } - log.Debug("saving headers", "len", len(allHeaders), "address") - err = c.db.ProcessBlocks(address, newFromByAddress[address], c.toByAddress[address], allHeaders) + log.Debug("saving headers", "len", len(uniqHeaders), "address") + err = c.db.ProcessBlocks(address, newFromByAddress[address], c.toByAddress[address], uniqHeaders) if err != nil { return err } diff --git a/services/wallet/database.go b/services/wallet/database.go index 7ba13bcb9..af9ec2f3c 100644 --- a/services/wallet/database.go +++ b/services/wallet/database.go @@ -15,12 +15,12 @@ import ( // DBHeader fields from header that are stored in database. type DBHeader struct { - Number *big.Int - Hash common.Hash - Timestamp uint64 - Erc20Transfer *Transfer - Network uint64 - Address common.Address + Number *big.Int + Hash common.Hash + Timestamp uint64 + Erc20Transfers []*Transfer + Network uint64 + Address common.Address // Head is true if the block was a head at the time it was pulled from chain. Head bool // Loaded is true if trasfers from this block has been already fetched @@ -765,23 +765,25 @@ func insertBlocksWithTransactions(creator statementCreator, account common.Addre if err != nil { return err } - if header.Erc20Transfer != nil { - res, err := updateTx.Exec(&JSONBlob{header.Erc20Transfer.Log}, network, account, header.Erc20Transfer.ID) - if err != nil { - return err - } - affected, err := res.RowsAffected() - if err != nil { - return err - } - if affected > 0 { - continue - } + if len(header.Erc20Transfers) > 0 { + for _, transfer := range header.Erc20Transfers { + res, err := updateTx.Exec(&JSONBlob{transfer.Log}, network, account, transfer.ID) + if err != nil { + return err + } + affected, err := res.RowsAffected() + if err != nil { + return err + } + if affected > 0 { + continue + } - _, err = insertTx.Exec(network, account, account, header.Erc20Transfer.ID, (*SQLBigInt)(header.Number), header.Hash, erc20Transfer, header.Erc20Transfer.Timestamp, &JSONBlob{header.Erc20Transfer.Log}) - if err != nil { - log.Error("error saving erc20transfer", "err", err) - return err + _, err = insertTx.Exec(network, account, account, transfer.ID, (*SQLBigInt)(header.Number), header.Hash, erc20Transfer, transfer.Timestamp, &JSONBlob{transfer.Log}) + if err != nil { + log.Error("error saving erc20transfer", "err", err) + return err + } } } } diff --git a/services/wallet/downloader.go b/services/wallet/downloader.go index cb67306ab..10e616ac4 100644 --- a/services/wallet/downloader.go +++ b/services/wallet/downloader.go @@ -296,7 +296,7 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [ header := &DBHeader{ Number: big.NewInt(int64(l.BlockNumber)), Hash: l.BlockHash, - Erc20Transfer: &Transfer{ + Erc20Transfers: []*Transfer{{ Address: address, BlockNumber: big.NewInt(int64(l.BlockNumber)), BlockHash: l.BlockHash, @@ -304,9 +304,7 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [ From: address, Loaded: false, Type: erc20Transfer, - Log: &l, - }, - } + Log: &l}}} concurrent.Add(func(ctx context.Context) error { concurrent.PushHeader(header)