[wallet] Fix multiple notifications on a single erc20 transfer

This commit is contained in:
Roman Volosovskyi 2020-12-08 16:39:27 +02:00
parent d65946e9c0
commit d39ca7fea4
No known key found for this signature in database
GPG Key ID: 0238A4B5ECEE70DE
5 changed files with 70 additions and 40 deletions

View File

@ -1 +1 @@
0.64.5
0.64.6

View File

@ -166,7 +166,7 @@ func (s *Service) transactionsHandler(payload TransactionEvent) {
limit := 20
if payload.BlockNumber != nil {
for _, address := range payload.Accounts {
if payload.BlockNumber.Cmp(payload.MaxKnownBlocks[address]) == 1 {
if payload.BlockNumber.Cmp(payload.MaxKnownBlocks[address]) >= 0 {
log.Info("Handled transfer for address", "info", address)
transfers, err := s.walletDB.GetTransfersByAddressAndBlock(address, payload.BlockNumber, int64(limit))
if err != nil {
@ -235,14 +235,26 @@ func (s *Service) StartWalletWatcher() {
return
case event := <-events:
if event.Type == wallet.EventNewBlock && len(maxKnownBlocks) > 0 {
s.transmitter.publisher.Send(TransactionEvent{
Type: string(event.Type),
BlockNumber: event.BlockNumber,
Accounts: event.Accounts,
NewTransactionsPerAccount: event.NewTransactionsPerAccount,
ERC20: event.ERC20,
MaxKnownBlocks: maxKnownBlocks,
})
newBlocks := false
for _, address := range event.Accounts {
if _, ok := maxKnownBlocks[address]; !ok {
newBlocks = true
maxKnownBlocks[address] = event.BlockNumber
} else if event.BlockNumber.Cmp(maxKnownBlocks[address]) == 1 {
maxKnownBlocks[address] = event.BlockNumber
newBlocks = true
}
}
if newBlocks {
s.transmitter.publisher.Send(TransactionEvent{
Type: string(event.Type),
BlockNumber: event.BlockNumber,
Accounts: event.Accounts,
NewTransactionsPerAccount: event.NewTransactionsPerAccount,
ERC20: event.ERC20,
MaxKnownBlocks: maxKnownBlocks,
})
}
} else if event.Type == wallet.EventMaxKnownBlock {
for _, address := range event.Accounts {
if _, ok := maxKnownBlocks[address]; !ok {

View File

@ -990,9 +990,27 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error)
for _, address := range c.accounts {
ethHeaders := ethHeadersByAddress[address]
erc20Headers := erc20HeadersByAddress[address]
allHeaders := append(ethHeaders, erc20Headers...)
foundHeaders[address] = allHeaders
uniqHeadersByHash := map[common.Hash]*DBHeader{}
for _, header := range allHeaders {
uniqHeader, ok := uniqHeadersByHash[header.Hash]
if ok {
if len(header.Erc20Transfers) > 0 {
uniqHeader.Erc20Transfers = append(uniqHeader.Erc20Transfers, header.Erc20Transfers...)
}
uniqHeadersByHash[header.Hash] = uniqHeader
} else {
uniqHeadersByHash[header.Hash] = header
}
}
uniqHeaders := []*DBHeader{}
for _, header := range uniqHeadersByHash {
uniqHeaders = append(uniqHeaders, header)
}
foundHeaders[address] = uniqHeaders
for _, header := range allHeaders {
if header.Number.Cmp(maxBlockNumber) == 1 {
@ -1000,8 +1018,8 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error)
}
}
log.Debug("saving headers", "len", len(allHeaders), "address")
err = c.db.ProcessBlocks(address, newFromByAddress[address], c.toByAddress[address], allHeaders)
log.Debug("saving headers", "len", len(uniqHeaders), "address")
err = c.db.ProcessBlocks(address, newFromByAddress[address], c.toByAddress[address], uniqHeaders)
if err != nil {
return err
}

View File

@ -15,12 +15,12 @@ import (
// DBHeader fields from header that are stored in database.
type DBHeader struct {
Number *big.Int
Hash common.Hash
Timestamp uint64
Erc20Transfer *Transfer
Network uint64
Address common.Address
Number *big.Int
Hash common.Hash
Timestamp uint64
Erc20Transfers []*Transfer
Network uint64
Address common.Address
// Head is true if the block was a head at the time it was pulled from chain.
Head bool
// Loaded is true if trasfers from this block has been already fetched
@ -765,23 +765,25 @@ func insertBlocksWithTransactions(creator statementCreator, account common.Addre
if err != nil {
return err
}
if header.Erc20Transfer != nil {
res, err := updateTx.Exec(&JSONBlob{header.Erc20Transfer.Log}, network, account, header.Erc20Transfer.ID)
if err != nil {
return err
}
affected, err := res.RowsAffected()
if err != nil {
return err
}
if affected > 0 {
continue
}
if len(header.Erc20Transfers) > 0 {
for _, transfer := range header.Erc20Transfers {
res, err := updateTx.Exec(&JSONBlob{transfer.Log}, network, account, transfer.ID)
if err != nil {
return err
}
affected, err := res.RowsAffected()
if err != nil {
return err
}
if affected > 0 {
continue
}
_, err = insertTx.Exec(network, account, account, header.Erc20Transfer.ID, (*SQLBigInt)(header.Number), header.Hash, erc20Transfer, header.Erc20Transfer.Timestamp, &JSONBlob{header.Erc20Transfer.Log})
if err != nil {
log.Error("error saving erc20transfer", "err", err)
return err
_, err = insertTx.Exec(network, account, account, transfer.ID, (*SQLBigInt)(header.Number), header.Hash, erc20Transfer, transfer.Timestamp, &JSONBlob{transfer.Log})
if err != nil {
log.Error("error saving erc20transfer", "err", err)
return err
}
}
}
}

View File

@ -296,7 +296,7 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [
header := &DBHeader{
Number: big.NewInt(int64(l.BlockNumber)),
Hash: l.BlockHash,
Erc20Transfer: &Transfer{
Erc20Transfers: []*Transfer{{
Address: address,
BlockNumber: big.NewInt(int64(l.BlockNumber)),
BlockHash: l.BlockHash,
@ -304,9 +304,7 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [
From: address,
Loaded: false,
Type: erc20Transfer,
Log: &l,
},
}
Log: &l}}}
concurrent.Add(func(ctx context.Context) error {
concurrent.PushHeader(header)