status-im/status-react#9203 Faster tx fetching with less request

*** How it worked before this PR on multiaccount creation:
- On multiacc creation we scanned chain for eth and erc20 transfers. For
  each address of a new empty multiaccount this scan required
  1. two `eth_getBalance` requests to find out that there is no any
     balance change between zero and the last block, for eth transfers
  2. and `chain-size/100000` (currently ~100) `eth_getLogs` requests,
     for erc20 transfers
- For some reason we scanned an address of the chat account as well, and
  also accounts were not deduplicated. So even for an empty multiacc we
  scanned chain twice for each chat and main wallet addresses, in result
  app had to execute about 400 requests.
- As mentioned above, `eth_getBalance` requests were used to check if
  there were any eth transfers, and that caused empty history in case
  if user already used all available eth (so that both zero and latest
  blocks show 0 eth for an address). There might have been transactions
  but we wouldn't fetch/show them.
- There was no upper limit for the number of rpc requests during the
  scan, so it could require indefinite number of requests; the scanning
  algorithm was written so that we persisted the whole history of
  transactions or tried to scan form the beginning again in case of
  failure, giving up only after 10 minutes of failures. In result
  addresses with sufficient number of transactions would never be fully
  scanned and during these 10 minutes app could use gigabytes of
  internet data.
- Failures were caused by `eth_getBlockByNumber`/`eth_getBlockByHash`
  requests. These requests return significantly bigger responses than
  `eth_getBalance`/`eth_transactionsCount` and it is likely that
  execution of thousands of them in parallel caused failures for
  accounts with hundreds of transactions. Even for an account with 12k
  we could successfully determine blocks with transaction in a few
  minutes using `eth_getBalance` requests, but `eth_getBlock...`
  couldn't be processed for this acc.
- There was no caching for for `eth_getBalance` requests, and this
  caused in average 3-4 times more such requests than is needed.

*** How it works now on multiaccount creation:
- On multiacc creation we scan chain for last ~30 eth transactions and
  then check erc20 in the range where these eth transactions were found.
  For an empty address in multiacc this means:
  1. two `eth_getBalance` transactions to determine that there was no
     balance change between zero and the last block; two
     `eth_transactionsCount` requests to determine there are no outgoing
     transactions for this address; total 4 requests for eth transfers
  2. 20 `eth_getLogs` for erc20 transfers. This number can be lowered,
     but that's not a big deal
- Deduplication of addresses is added and also we don't scan chat
  account, so a new multiacc requires ~25 (we also request latest block
  number and probably execute a few other calls) request to determine
  that multiacc is empty (comparing to ~400 before)
- In case if address contains transactions we:
  1. determine the range which contains 20-25 outgoing eth/erc20
     transactions. This usually requires up to 10 `eth_transactionCount`
     requests
  2. then we scan chain for eth transfers using `eth_getBalance` and
     `eth_transactionCount` (for double checking zero balances)
  3. we make sure that we do not scan db for more than 30 blocks with
     transfers. That's important for accounts with mostly incoming
     transactions, because the range found on the first step might
     contain any number of incoming transfers, but only 20-25 outgoing
     transactions
  4. when we found ~30 blocks in a given range, we update initial
     range `from` block using the oldest found block
  5. and now we scan db for erc20transfers using `eth_getLogs`
     `oldest-found-eth-block`-`latest-block`, we make not more than 20 calls
  6. when all blocks which contain incoming/outgoing transfers for a
     given address are found, we save these blocks to db and mark that
     transfers from these blocks are still to be fetched
  7. Then we select latest ~30 (the number can be adjusted) blocks from
     these which were found and fetch transfers, this requires 3-4
     requests per transfer.
  8. we persist scanned range so that we know were to start next time
  9. we dispatch an event which tells client that transactions are found
  10. client fetches latest 20 transfers
- when user presses "fetch more" button we check if app's db contains next
  20 transfers, if not we scan chain again and return transfers after

small fixes
This commit is contained in:
Roman Volosovskyi 2019-12-18 13:01:46 +02:00
parent a4f88d0017
commit a92a95cf83
No known key found for this signature in database
GPG Key ID: 0238A4B5ECEE70DE
24 changed files with 1711 additions and 589 deletions

View File

@ -273,7 +273,7 @@ func (b *GethStatusBackend) startNodeWithAccount(acc multiaccounts.Account, pass
if err != nil {
return err
}
watchAddrs, err := accountsDB.GetAddresses()
watchAddrs, err := accountsDB.GetWalletAddresses()
if err != nil {
return err
}
@ -905,16 +905,23 @@ func (b *GethStatusBackend) startWallet() error {
return err
}
allAddresses := make([]common.Address, len(watchAddresses)+1)
allAddresses[0] = common.Address(mainAccountAddress)
for i, addr := range watchAddresses {
allAddresses[1+i] = common.Address(addr)
uniqAddressesMap := map[common.Address]struct{}{}
allAddresses := []common.Address{}
mainAddress := common.Address(mainAccountAddress)
uniqAddressesMap[mainAddress] = struct{}{}
allAddresses = append(allAddresses, mainAddress)
for _, addr := range watchAddresses {
address := common.Address(addr)
if _, ok := uniqAddressesMap[address]; !ok {
uniqAddressesMap[address] = struct{}{}
allAddresses = append(allAddresses, address)
}
}
return wallet.StartReactor(
b.statusNode.RPCClient().Ethclient(),
allAddresses,
new(big.Int).SetUint64(b.statusNode.Config().NetworkID),
)
new(big.Int).SetUint64(b.statusNode.Config().NetworkID))
}
// InjectChatAccount selects the current chat account using chatKeyHex and injects the key into whisper.

View File

@ -1,7 +1,7 @@
// Code generated by go-bindata. DO NOT EDIT.
// sources:
// 0001_app.down.sql (387B)
// 0001_app.up.sql (3.088kB)
// 0001_app.down.sql (356B)
// 0001_app.up.sql (2.967kB)
// 0002_tokens.down.sql (19B)
// 0002_tokens.up.sql (248B)
// 0003_settings.down.sql (118B)
@ -75,7 +75,7 @@ func (fi bindataFileInfo) Sys() interface{} {
return nil
}
var __0001_appDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x74\x8e\xcd\x0e\x82\x40\x0c\x84\xef\x3c\x05\xef\xc1\x49\x03\x07\x13\xa3\xc6\x78\xf0\xd6\xac\x4b\x85\x46\xd8\xae\x6d\xf1\xe7\xed\x4d\x4c\xfc\x59\x85\xeb\x37\x93\x6f\xa6\xdc\xae\x37\xf9\x6e\x36\x5f\x56\xb9\xa2\x19\x85\x46\x8b\xec\x0b\x3a\xef\x79\x08\x96\xc2\x83\xf0\x55\x51\xc6\x21\xb4\xa4\xc6\x72\x4f\xc2\xda\xc5\x98\xd6\x23\x4a\x4f\xaa\xc4\x21\xe5\x26\x2e\xe8\xf1\x4f\xde\xb1\x3f\x8d\x3f\x03\x63\x18\x89\x7b\x47\x9d\xa2\x5c\x7e\x4d\x1f\x0e\x82\xe7\x01\xd5\xa0\x71\xef\x6f\x8b\x55\x59\xed\xa7\x3a\xe0\x5b\x67\x40\x35\x50\x7d\x9b\x72\x1a\x47\xf2\x93\x8b\x4f\xc1\x4b\x29\x2e\x34\xa8\x45\xf6\x08\x00\x00\xff\xff\xef\x20\x3b\x16\x83\x01\x00\x00")
var __0001_appDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x74\xcd\xcb\xaa\xc2\x40\x0c\xc6\xf1\x7d\x9f\xa2\xef\xd1\xd5\x39\xb4\x0b\x41\x54\xc4\x85\xbb\x21\x4e\x63\x1b\x6c\x27\x63\x92\x7a\x79\x7b\x41\xf0\x32\xea\x6c\x7f\xf9\xf8\xa7\x5e\x2f\x57\xe5\xe6\xef\x7f\xde\x94\x8a\x66\x14\x3a\xad\x8a\x37\x04\xef\x79\x0a\x96\xe2\x4e\xf8\xac\x28\xbf\xd1\xf5\xa4\xc6\x72\x4d\x8e\x2d\xc4\x98\xce\x23\xca\x48\xaa\xc4\x21\x75\x13\x08\xba\xff\x8a\x0f\xec\x0f\x29\x8d\x40\x83\xa2\x9c\x3e\xa7\x2f\x77\x82\xc7\x09\xd5\x5c\x07\xcf\xe7\xb3\x45\xdd\x6c\x73\x1b\xe7\x7b\x30\x47\xad\xa3\xf6\x92\x6b\x1a\x47\xf2\xd9\x8f\xf7\xc0\x23\x29\x10\x3a\xd4\xaa\xb8\x05\x00\x00\xff\xff\xf6\xca\x86\xce\x64\x01\x00\x00")
func _0001_appDownSqlBytes() ([]byte, error) {
return bindataRead(
@ -90,12 +90,12 @@ func _0001_appDownSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "0001_app.down.sql", size: 387, mode: os.FileMode(0644), modTime: time.Unix(1578669164, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xbc, 0x9c, 0xd2, 0xe1, 0x1d, 0x8, 0x34, 0x6a, 0xc8, 0x37, 0x13, 0xb3, 0x9f, 0x26, 0x23, 0x33, 0xd4, 0x25, 0x8, 0xed, 0x53, 0xe6, 0xd, 0x46, 0xc9, 0xf4, 0x24, 0xf8, 0x1, 0x1f, 0xf5, 0xc8}}
info := bindataFileInfo{name: "0001_app.down.sql", size: 356, mode: os.FileMode(0644), modTime: time.Unix(1579172955, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xb5, 0x25, 0xa0, 0xf8, 0x7d, 0x2d, 0xd, 0xcf, 0x18, 0xe4, 0x73, 0xc3, 0x95, 0xf5, 0x24, 0x20, 0xa9, 0xe6, 0x9e, 0x1d, 0x93, 0xe5, 0xc5, 0xad, 0x93, 0x8f, 0x5e, 0x40, 0xb5, 0x30, 0xaa, 0x25}}
return a, nil
}
var __0001_appUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xa4\x56\xc1\x92\xaa\x38\x14\xdd\xf3\x15\x59\xda\x55\x6c\x66\xfd\x56\xa8\xd1\xa6\xc6\x07\x33\x88\xd3\xfd\x56\xa9\x34\x44\x4c\x09\x24\x2f\x09\x6d\xfb\xf7\x53\x21\x09\xa0\x82\xb6\x33\x3b\x92\x7b\x73\x3c\xe7\xdc\xeb\x4d\x16\x09\x0c\x52\x08\xd2\x60\xbe\x81\x20\x5c\x81\x28\x4e\x01\x7c\x0f\xb7\xe9\x16\x48\xa2\x14\xad\x0b\x09\x66\x9e\x3a\x73\x02\xfe\x09\x92\xc5\x6b\x90\x80\xbf\x92\xf0\x67\x90\xfc\x02\x7f\xc2\x5f\xbe\xf7\x89\xcb\x86\x80\xf9\x26\x9e\x7b\x2f\xe0\x2d\x4c\x5f\xe3\x5d\x0a\x92\xf8\x2d\x5c\xfe\xf0\xbc\x3b\xe0\x38\xcb\x58\x53\x2b\x0d\x8e\xf3\x5c\x10\x29\xc7\xf1\x4f\xb8\x2c\x89\x02\xf3\x38\xde\xc0\x20\xf2\xbd\xec\x80\x07\xab\x96\x57\x0a\xdf\x53\xdf\x93\x8a\x09\x5c\xb8\x15\x6f\x3e\x8e\xe4\xdc\xf2\xf2\x3d\x8e\xd5\xc1\xee\xd7\xb8\x72\x29\x19\x2b\x99\x70\xdf\x82\x60\x45\x72\x84\x15\x58\x06\x29\x4c\xc3\x9f\xb0\x25\x1b\xed\x36\x1b\xdf\x6b\x78\x3e\x19\x9d\x56\xbd\x8b\xc2\xbf\x77\x10\x84\xd1\x12\xbe\x83\xa6\xa6\xbf\x1b\x82\x8c\x1a\xe4\x14\xc7\xd1\xc0\x07\x13\x7b\x01\x6f\xaf\x30\x81\xdd\xf2\xc7\x3d\x38\x6d\xc6\x38\x98\x8e\x74\x50\xed\xa2\x03\x32\x08\xbd\x62\x64\x4f\x5d\x01\x74\xf1\x1e\xa6\xdf\xba\x5f\xdb\x0f\xc1\x4e\x92\x08\x5d\x5b\x9a\xb7\x0e\x5f\xd6\xb4\x2b\xc2\xc0\x63\x45\x2b\x22\x15\xae\x38\xd8\x6d\xd7\xe1\x3a\x82\x4b\x30\x0f\xd7\x61\x94\xfa\x5e\x8e\x39\x77\x25\x07\x4b\xb8\x0a\x76\x9b\x14\xec\x71\x29\x89\xef\x1d\xa8\xae\xfb\x39\xac\x73\xf2\x05\x76\xd1\xd6\x9c\x0c\xa3\xf4\xb9\x6e\x74\x8c\x91\xc5\x03\x33\xcf\x6e\x21\xa7\xa0\xa7\xea\x72\x4c\xeb\xac\xe2\x04\x86\xeb\x48\x2b\x9b\xf5\x67\x5e\x40\x02\x57\x30\x81\xd1\x02\xf6\xe8\x33\xbd\x1f\x6b\x0d\x1b\x98\x42\xb0\x08\xb6\x8b\x60\x09\xbd\x07\x6e\x6a\xf9\xda\xca\xde\xb5\x81\x99\xcf\xc9\xe4\x44\x54\x54\x4a\xca\x6a\x0d\xa8\x81\xd1\x58\x2d\xfa\xb4\xeb\xc8\x50\x6c\x77\xfc\x42\x6b\xcb\x76\x66\xb6\xc7\xa5\xde\x23\xa8\x04\xae\xe5\xde\xb4\x4e\x4d\xd4\x89\x89\xa3\x2e\x40\x57\x58\xd3\x12\xc3\x5a\x60\x79\xe8\x06\x47\xbf\x7d\x3d\x52\xfa\xc8\x47\x79\x44\x13\x87\xd4\x97\x9d\x17\x92\xd4\x39\x11\x23\x19\x82\x64\x84\x72\x65\xd3\x4a\x56\xd8\xaf\x8b\xf1\x38\xee\x56\xaf\xc6\x77\x14\x2e\x7b\xa4\x64\xd9\x51\x0e\xd3\x4c\xca\x8d\x87\xbe\xb7\x88\xa3\x6d\x9a\x04\xda\x08\x3b\x07\x9c\x6d\x88\x13\xe1\xe6\x41\xfb\x6d\xe1\xdc\xf0\x98\x69\x4c\xdf\x26\xf8\xfd\x6f\xbd\x3c\xea\x41\xc3\xee\x7f\x16\xa5\x6e\xaa\x0f\x22\x6e\xd3\x07\x7f\xfd\x69\x48\x82\xf3\x76\x06\x74\x03\x60\x15\x6c\xb6\xa3\x66\xb4\x5c\x47\xd5\x5f\x9b\x3b\x79\xd8\x30\x7d\x84\x61\xb2\x1e\x7a\xe7\x66\x2a\x52\x0c\x3d\xe7\xe3\xfd\x2e\x9e\xb2\x53\x9e\xeb\x0c\xb4\x83\xf3\x4e\xff\x59\xee\xf7\x3b\xd0\x25\x7d\xab\x07\x2b\xcc\x39\xad\x0b\xb4\x67\xc2\xdd\x28\x9d\xe2\x51\x27\x5d\x1b\xf6\x74\x9e\xe9\xc8\x0a\xd3\x52\x12\xf1\x69\x66\x05\x00\x00\xd0\x7c\xfc\x05\xa1\x63\xed\x94\xbb\xb5\x51\x87\xa6\x4d\xd6\x51\x8e\xa5\x3c\x31\xd1\x41\x9b\xdd\x7d\x49\x88\xba\x39\xf1\xdc\x2c\xee\x05\x20\x41\x7e\x37\x44\x2a\x54\x60\xee\xc4\x14\x98\xa3\xbd\x60\xd5\xc5\x9d\x06\xd7\xf0\x9a\x9f\xce\x53\xec\x51\xd6\xe8\x2d\xac\x03\xed\x03\xe2\xfa\x86\x9b\xd6\x61\x9e\x0e\x13\xcc\x91\x05\x43\x34\xff\xd2\x2d\x33\x29\xd0\xe6\x7d\xbb\xc0\x48\x31\x4e\x33\xe7\x4c\xbb\x98\xae\xb4\x05\x97\x97\x05\x2b\xb1\x54\x8e\x45\xe7\x91\x9b\x24\x7f\x98\x9c\x9c\xca\x8c\x7d\x12\x71\xbe\x79\x6b\xd8\x51\xd3\x36\x12\x29\x98\xa2\xfa\x19\x34\x9e\xf5\x9f\x7b\xa0\xe5\xed\x7c\x12\xb8\x2e\x88\x13\xec\x6a\x34\x29\xb9\x64\x27\xd2\xcb\x33\x6d\x63\x35\x9a\x84\x03\x2d\x0e\xc3\x0c\xc5\x5c\xfc\x96\xee\xbf\x01\x00\x00\xff\xff\x5a\xea\xe5\xa6\x10\x0c\x00\x00")
var __0001_appUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x56\x4f\x73\xa2\x30\x14\xbf\xf3\x29\x72\xd4\x19\x2e\x7b\xee\x09\x35\x5a\x66\x29\xec\x22\x6e\xdb\x53\x26\x42\xc4\x8c\x40\xd2\x24\xd4\xfa\xed\x77\x02\x04\x50\x41\xeb\xce\xde\x4c\xde\xcb\xe3\xf7\xe7\xe5\xc5\x79\x08\x9d\x08\x82\xc8\x99\x79\x10\xb8\x4b\xe0\x07\x11\x80\x6f\xee\x3a\x5a\x03\x49\x94\xa2\x45\x2a\xc1\xc4\x52\x27\x4e\xc0\x1f\x27\x9c\x3f\x3b\x21\xf8\x15\xba\x2f\x4e\xf8\x0e\x7e\xc2\x77\xdb\xfa\xc4\x59\x49\xc0\xcc\x0b\x66\xd6\x14\xbc\xba\xd1\x73\xb0\x89\x40\x18\xbc\xba\x8b\x27\xcb\xba\x51\x1c\xc7\x31\x2b\x0b\xa5\x8b\xe3\x24\x11\x44\xca\xe1\xfa\x47\x9c\x65\x44\x81\x59\x10\x78\xd0\xf1\x6d\x2b\xde\xe3\xde\xaa\xc2\x15\xc1\xb7\xc8\xb6\xa4\x62\x02\xa7\x66\xc5\xcb\xed\x81\x9c\x2a\x5c\xb6\xc5\xb1\xda\x37\xfb\x05\xce\x4d\x4a\xcc\x32\x26\xcc\x6f\x41\xb0\x22\x09\xc2\x0a\x2c\x9c\x08\x46\xee\x0b\xac\xc0\xfa\x1b\xcf\xb3\xad\x92\x27\xa3\xd1\x71\xd6\x1b\xdf\xfd\xbd\x81\xc0\xf5\x17\xf0\x0d\x94\x05\xfd\x28\x09\xaa\xd9\x20\xc3\x38\xf0\x7b\x3a\xd4\xb1\x29\x78\x7d\x86\x21\x6c\x97\x4f\xb7\xca\x69\x31\x86\x8b\xe9\x48\x5b\xaa\x5a\xb4\x85\xea\x0a\x1d\x63\xd4\x9c\xba\x28\xd0\xc6\xbb\x32\xdd\xd6\x6d\x6f\xb7\x82\x1d\x25\x11\xda\x5b\x9a\x54\x0a\x9f\x7b\xda\x9a\xd0\xd3\x58\xd1\x9c\x48\x85\x73\x0e\x36\xeb\x95\xbb\xf2\xe1\x02\xcc\xdc\x95\xeb\x47\xb6\x95\x60\xce\x8d\xe5\x60\x01\x97\xce\xc6\x8b\xc0\x0e\x67\x92\xd8\xd6\x9e\x6a\xdf\x4f\x6e\x91\x90\x2f\xb0\xf1\xd7\xf5\x49\xd7\x8f\x1e\xeb\x46\x83\x18\x35\xf5\xc0\xc4\x6a\xb6\x90\x61\xd0\x41\x35\x39\x75\xeb\x2c\x83\x10\xba\x2b\x5f\x33\x9b\x74\x67\xa6\x20\x84\x4b\x18\x42\x7f\x0e\xbb\xea\x13\xbd\x1f\x68\x0e\x1e\x8c\x20\x98\x3b\xeb\xb9\xb3\x80\xd6\x1d\x35\x35\x7d\x2d\x65\xa7\x5a\x4f\xcc\xc7\x68\x72\x22\x72\x2a\x25\x65\x85\x2e\xa8\x0b\xa3\x21\x2f\xba\xb4\xcb\x48\x9f\x6c\x7b\xfc\x8c\x6b\x85\x76\x52\x6f\x0f\x53\xbd\x05\x50\x09\x5c\xc8\x5d\xdd\x3a\x05\x51\x47\x26\x0e\xda\x80\xd6\xd8\xba\x25\xfa\x5e\x60\xb9\x6f\x07\x47\xb7\x7d\x39\x52\xba\xc8\x36\x3b\xa0\x91\x43\xea\xab\x99\x17\x92\x14\x09\x11\x26\xc3\xb6\x04\x89\x09\xe5\xaa\x89\x66\x2c\x6d\x7e\x9d\x4d\xc5\xf3\x4f\x14\x65\xbe\x25\xe2\x1a\x6f\xaf\xcd\x47\x39\x65\x0c\x27\x24\xa9\x3a\xbe\x6d\xf7\x1f\xe7\xda\x77\xda\xd8\x0d\x55\xdb\x10\x3b\xef\xbc\x8c\xc5\x07\x79\x3b\xfd\xca\x25\xdb\x9a\x07\xfe\x3a\x0a\x1d\x0d\xab\x99\x34\xc6\x18\xc4\x89\x30\x13\xa7\xfa\xdd\x94\x36\xe3\x69\xa2\x6b\xb6\x1f\xe9\xbe\x3b\xbd\xd7\xe5\x35\xd2\xef\xda\x7e\xdb\xdf\x31\xf1\x5b\xef\xbf\x25\xf9\xd2\xf1\xd6\x83\x5a\xe4\x98\x73\x5a\xa4\x68\xc7\x84\x99\x9d\x48\x31\x54\x31\x18\xd4\xe4\x52\xf3\xc7\x75\x41\x02\x17\x29\xf9\x4f\xf2\xec\x04\xcb\x87\xc5\x51\xec\x72\xff\x1e\xbc\x1c\xd3\x4c\x12\xf1\x59\x5f\x59\x00\x00\xa0\xc9\xf0\x43\xae\x63\xd5\xb0\xb9\x06\xa5\x43\xe3\x90\x75\x94\x63\x29\x8f\x4c\x24\xdd\x9d\xd4\xbb\xbb\x8c\x10\x75\x75\xe2\xb1\x91\xd8\x11\x40\x82\x7c\x94\x44\x2a\x94\x62\x6e\xc8\xa4\x98\xd7\x72\xf5\x9f\x16\xb8\x82\x97\xf8\x74\x9e\x62\xf7\xb2\x06\x1f\x43\x1d\xa8\xde\xf1\xcb\x87\x66\x9c\x47\xfd\x82\x8f\x20\x47\x4d\x31\x44\x93\x2f\x7d\xb7\x47\x09\x36\x79\xdf\x36\x18\x29\xc6\x69\x6c\x94\xa9\x16\xe3\x4e\x37\xc5\xe5\xb9\x61\x19\x96\xca\xa0\x68\x35\xea\x8d\x38\x9d\x93\x50\x19\xb3\x4f\x22\x4e\x57\x4f\x7e\x73\x21\xab\x46\x22\x29\x53\x54\xff\x1b\x19\xce\xfa\xe7\x1e\xa8\x70\x1b\x9d\xda\x4b\xd7\xf7\x68\x94\x72\xc6\x8e\xa4\xa3\x57\xb7\x4d\xc3\xb1\x4e\xd8\xd3\x74\xdf\xcf\x50\xcc\xc4\xaf\xe1\xfe\x0d\x00\x00\xff\xff\xe8\x42\x77\x9b\x97\x0b\x00\x00")
func _0001_appUpSqlBytes() ([]byte, error) {
return bindataRead(
@ -110,8 +110,8 @@ func _0001_appUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "0001_app.up.sql", size: 3088, mode: os.FileMode(0644), modTime: time.Unix(1578669164, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x93, 0xb8, 0x68, 0x17, 0x49, 0x51, 0xc0, 0xe8, 0xbc, 0x36, 0xa4, 0x29, 0xc9, 0x93, 0x6c, 0x3e, 0xdf, 0x3d, 0x23, 0x22, 0xab, 0x18, 0x49, 0xbd, 0x6, 0xf, 0xc5, 0xec, 0xf8, 0xcf, 0x1b, 0x6a}}
info := bindataFileInfo{name: "0001_app.up.sql", size: 2967, mode: os.FileMode(0644), modTime: time.Unix(1579172940, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xf7, 0x3a, 0xa7, 0xf2, 0x8f, 0xfa, 0x82, 0x7c, 0xc5, 0x49, 0xac, 0xac, 0xf, 0xc, 0x77, 0xe2, 0xba, 0xe8, 0x4d, 0xe, 0x6f, 0x5d, 0x2c, 0x2c, 0x18, 0x80, 0xc2, 0x1d, 0xe, 0x25, 0xe, 0x18}}
return a, nil
}
@ -130,7 +130,7 @@ func _0002_tokensDownSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "0002_tokens.down.sql", size: 19, mode: os.FileMode(0644), modTime: time.Unix(1578669164, 0)}
info := bindataFileInfo{name: "0002_tokens.down.sql", size: 19, mode: os.FileMode(0644), modTime: time.Unix(1576159007, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xd1, 0x31, 0x2, 0xcc, 0x2f, 0x38, 0x90, 0xf7, 0x58, 0x37, 0x47, 0xf4, 0x18, 0xf7, 0x72, 0x74, 0x67, 0x14, 0x7e, 0xf3, 0xb1, 0xd6, 0x5f, 0xb0, 0xd5, 0xe7, 0x91, 0xf4, 0x26, 0x77, 0x8e, 0x68}}
return a, nil
}
@ -150,7 +150,7 @@ func _0002_tokensUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "0002_tokens.up.sql", size: 248, mode: os.FileMode(0644), modTime: time.Unix(1578669164, 0)}
info := bindataFileInfo{name: "0002_tokens.up.sql", size: 248, mode: os.FileMode(0644), modTime: time.Unix(1576159007, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xcc, 0xd6, 0xde, 0xd3, 0x7b, 0xee, 0x92, 0x11, 0x38, 0xa4, 0xeb, 0x84, 0xca, 0xcb, 0x37, 0x75, 0x5, 0x77, 0x7f, 0x14, 0x39, 0xee, 0xa1, 0x8b, 0xd4, 0x5c, 0x6e, 0x55, 0x6, 0x50, 0x16, 0xd4}}
return a, nil
}
@ -170,7 +170,7 @@ func _0003_settingsDownSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "0003_settings.down.sql", size: 118, mode: os.FileMode(0644), modTime: time.Unix(1578669164, 0)}
info := bindataFileInfo{name: "0003_settings.down.sql", size: 118, mode: os.FileMode(0644), modTime: time.Unix(1578048597, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe5, 0xa6, 0xf5, 0xc0, 0x60, 0x64, 0x77, 0xe2, 0xe7, 0x3c, 0x9b, 0xb1, 0x52, 0xa9, 0x95, 0x16, 0xf8, 0x60, 0x2f, 0xa5, 0xeb, 0x46, 0xb9, 0xb9, 0x8f, 0x4c, 0xf4, 0xfd, 0xbb, 0xe7, 0xe5, 0xe5}}
return a, nil
}
@ -190,7 +190,7 @@ func _0003_settingsUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "0003_settings.up.sql", size: 1311, mode: os.FileMode(0644), modTime: time.Unix(1578669164, 0)}
info := bindataFileInfo{name: "0003_settings.up.sql", size: 1311, mode: os.FileMode(0644), modTime: time.Unix(1578048597, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xea, 0x35, 0x0, 0xeb, 0xe2, 0x33, 0x68, 0xb9, 0xf4, 0xf6, 0x8e, 0x9e, 0x10, 0xe9, 0x58, 0x68, 0x28, 0xb, 0xcd, 0xec, 0x74, 0x71, 0xa7, 0x9a, 0x5a, 0x77, 0x59, 0xb1, 0x13, 0x1c, 0xa1, 0x5b}}
return a, nil
}
@ -210,7 +210,7 @@ func docGo() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0644), modTime: time.Unix(1578669164, 0)}
info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0644), modTime: time.Unix(1573216280, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xde, 0x7c, 0x28, 0xcd, 0x47, 0xf2, 0xfa, 0x7c, 0x51, 0x2d, 0xd8, 0x38, 0xb, 0xb0, 0x34, 0x9d, 0x4c, 0x62, 0xa, 0x9e, 0x28, 0xc3, 0x31, 0x23, 0xd9, 0xbb, 0x89, 0x9f, 0xa0, 0x89, 0x1f, 0xe8}}
return a, nil
}

View File

@ -6,7 +6,6 @@ DROP TABLE dapps;
DROP TABLE permissions;
DROP TABLE transfers;
DROP TABLE blocks;
DROP TABLE accounts_to_blocks;
DROP TABLE mailservers;
DROP TABLE mailserver_request_gaps;
DROP INDEX mailserver_request_gaps_chat_id_idx;

View File

@ -52,31 +52,31 @@ hash VARCHAR NOT NULL,
address VARCHAR NOT NULL,
blk_hash VARCHAR NOT NULL,
tx BLOB,
sender VARCHAR NOT NULL,
sender VARCHAR,
receipt BLOB,
log BLOB,
type VARCHAR NOT NULL,
FOREIGN KEY(network_id,blk_hash) REFERENCES blocks(network_id,hash) ON DELETE CASCADE,
blk_number BIGINT NOT NULL,
timestamp UNSIGNED BIGINT NOT NULL,
loaded BOOL DEFAULT 1,
FOREIGN KEY(network_id,address,blk_hash) REFERENCES blocks(network_id,address,blk_hash) ON DELETE CASCADE,
CONSTRAINT unique_transfer_per_address_per_network UNIQUE (hash,address,network_id)
);
CREATE TABLE IF NOT EXISTS blocks (
network_id UNSIGNED BIGINT NOT NULL,
hash VARCHAR NOT NULL,
number BIGINT NOT NULL,
timestamp UNSIGNED BIGINT NOT NULL,
head BOOL DEFAULT FALSE,
CONSTRAINT unique_block_per_network UNIQUE (network_id,hash)
CONSTRAINT unique_block_number_per_network UNIQUE (network_id,number)
);
CREATE TABLE IF NOT EXISTS accounts_to_blocks (
network_id UNSIGNED BIGINT NOT NULL,
address VARCHAR NOT NULL,
blk_number BIGINT NOT NULL,
sync INT,
FOREIGN KEY(network_id,blk_number) REFERENCES blocks(network_id,number) ON DELETE CASCADE,
CONSTRAINT unique_mapping_for_account_to_block_per_network UNIQUE (address,blk_number,network_id)
blk_hash BIGINT NOT NULL,
loaded BOOL DEFAULT FALSE,
CONSTRAINT unique_mapping_for_account_to_block_per_network UNIQUE (address,blk_hash,network_id)
);
CREATE TABLE IF NOT EXISTS blocks_ranges (
network_id UNSIGNED BIGINT NOT NULL,
address VARCHAR NOT NULL,
blk_from BIGINT NOT NULL,
blk_to BIGINT NOT NULL
);
CREATE TABLE IF NOT EXISTS mailservers (

View File

@ -400,6 +400,23 @@ func (db *Database) GetWalletAddress() (rst types.Address, err error) {
return
}
func (db *Database) GetWalletAddresses() (rst []types.Address, err error) {
rows, err := db.db.Query("SELECT address FROM accounts WHERE chat = 0 ORDER BY created_at")
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
addr := types.Address{}
err = rows.Scan(&addr)
if err != nil {
return nil, err
}
rst = append(rst, addr)
}
return rst, nil
}
func (db *Database) GetChatAddress() (rst types.Address, err error) {
err = db.db.QueryRow("SELECT address FROM accounts WHERE chat = 1").Scan(&rst)
return

View File

@ -5,6 +5,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/multiaccounts/accounts"
)
@ -20,6 +21,7 @@ type API struct {
}
func (api *API) SaveAccounts(ctx context.Context, accounts []accounts.Account) error {
log.Info("[AccountsAPI::SaveAccounts]")
err := api.db.SaveAccounts(accounts)
if err != nil {
return err

View File

@ -43,20 +43,112 @@ func (api *API) GetTransfers(ctx context.Context, start, end *hexutil.Big) ([]Tr
return castToTransferViews(rst), nil
}
// GetTransfersByAddress returns transfers for a single address between two blocks.
func (api *API) GetTransfersByAddress(ctx context.Context, address common.Address, start, end *hexutil.Big) ([]TransferView, error) {
log.Debug("call to get transfers for an address", "address", address, "start", start, "end", end)
if start == nil {
return nil, errors.New("start of the query must be provided. use 0 if you want to load all transfers")
}
type StatsView struct {
BlocksStats map[int64]int64 `json:"blocksStats"`
TransfersCount int64 `json:"transfersCount"`
}
// GetTransfersFromBlock
func (api *API) GetTransfersFromBlock(ctx context.Context, address common.Address, block *hexutil.Big) ([]TransferView, error) {
log.Debug("[WalletAPI:: GetTransfersFromBlock] get transfers from block", "address", address, "block", block)
if api.s.db == nil {
return nil, ErrServiceNotInitialized
}
rst, err := api.s.db.GetTransfersByAddress(address, (*big.Int)(start), (*big.Int)(end))
blocksByAddress := make(map[common.Address][]*big.Int)
blocksByAddress[address] = []*big.Int{block.ToInt()}
txCommand := &loadTransfersCommand{
accounts: []common.Address{address},
db: api.s.db,
chain: api.s.reactor.chain,
client: api.s.client,
blocksByAddress: blocksByAddress,
}
err := txCommand.Command()(ctx)
if err != nil {
return nil, err
}
log.Debug("result from database for address", "address", address, "start", start, "end", end, "len", len(rst))
rst, err := api.s.db.GetTransfersInRange(address, block.ToInt(), block.ToInt())
if err != nil {
return nil, err
}
return castToTransferViews(rst), nil
}
// GetTransfersByAddress returns transfers for a single address
func (api *API) GetTransfersByAddress(ctx context.Context, address common.Address, beforeBlock, limit *hexutil.Big) ([]TransferView, error) {
log.Info("call to get transfers for an address", "address", address, "block", beforeBlock, "limit", limit)
if api.s.db == nil {
return nil, ErrServiceNotInitialized
}
rst, err := api.s.db.GetTransfersByAddress(address, beforeBlock.ToInt(), limit.ToInt().Int64())
if err != nil {
return nil, err
}
transfersCount := big.NewInt(int64(len(rst)))
if limit.ToInt().Cmp(transfersCount) == 1 {
block, err := api.s.db.GetFirstKnownBlock(address)
if err != nil {
return nil, err
}
if block == nil {
return castToTransferViews(rst), nil
}
from, err := findFirstRange(ctx, address, block, api.s.client)
if err != nil {
return nil, err
}
fromByAddress := map[common.Address]*big.Int{address: from}
toByAddress := map[common.Address]*big.Int{address: block}
balanceCache := newBalanceCache()
blocksCommand := &findAndCheckBlockRangeCommand{
accounts: []common.Address{address},
db: api.s.db,
chain: api.s.reactor.chain,
client: api.s.client,
balanceCache: balanceCache,
feed: api.s.feed,
fromByAddress: fromByAddress,
toByAddress: toByAddress,
}
if err = blocksCommand.Command()(ctx); err != nil {
return nil, err
}
blocks, err := api.s.db.GetBlocksByAddress(address, numberOfBlocksCheckedPerIteration)
if err != nil {
return nil, err
}
log.Info("checking blocks again", "blocks", len(blocks))
if len(blocks) > 0 {
txCommand := &loadTransfersCommand{
accounts: []common.Address{address},
db: api.s.db,
chain: api.s.reactor.chain,
client: api.s.client,
}
err = txCommand.Command()(ctx)
if err != nil {
return nil, err
}
rst, err = api.s.db.GetTransfersByAddress(address, beforeBlock.ToInt(), limit.ToInt().Int64())
if err != nil {
return nil, err
}
}
}
return castToTransferViews(rst), nil
}

View File

@ -0,0 +1,94 @@
package wallet
import (
"context"
"math/big"
"sync"
"github.com/ethereum/go-ethereum/common"
)
type balanceCache struct {
// cache maps an address to a map of a block number and the balance of this particular address
cache map[common.Address]map[*big.Int]*big.Int
requestCounter map[common.Address]uint
cacheHitsCounter map[common.Address]uint
rw sync.RWMutex
}
type BalanceCache interface {
BalanceAt(ctx context.Context, client BalanceReader, account common.Address, blockNumber *big.Int) (*big.Int, error)
}
func (b *balanceCache) readCachedBalance(account common.Address, blockNumber *big.Int) *big.Int {
b.rw.RLock()
defer b.rw.RUnlock()
return b.cache[account][blockNumber]
}
func (b *balanceCache) addBalanceToCache(account common.Address, blockNumber *big.Int, balance *big.Int) {
b.rw.Lock()
defer b.rw.Unlock()
_, exists := b.cache[account]
if !exists {
b.cache[account] = make(map[*big.Int]*big.Int)
}
b.cache[account][blockNumber] = balance
}
func (b *balanceCache) incRequestsNumber(account common.Address) {
b.rw.Lock()
defer b.rw.Unlock()
cnt, ok := b.requestCounter[account]
if !ok {
b.requestCounter[account] = 1
}
b.requestCounter[account] = cnt + 1
}
func (b *balanceCache) incCacheHitNumber(account common.Address) {
b.rw.Lock()
defer b.rw.Unlock()
cnt, ok := b.cacheHitsCounter[account]
if !ok {
b.cacheHitsCounter[account] = 1
}
b.cacheHitsCounter[account] = cnt + 1
}
func (b *balanceCache) getStats(account common.Address) (uint, uint) {
b.rw.RLock()
defer b.rw.RUnlock()
return b.requestCounter[account], b.cacheHitsCounter[account]
}
func (b *balanceCache) BalanceAt(ctx context.Context, client BalanceReader, account common.Address, blockNumber *big.Int) (*big.Int, error) {
b.incRequestsNumber(account)
cachedBalance := b.readCachedBalance(account, blockNumber)
if cachedBalance != nil {
b.incCacheHitNumber(account)
return cachedBalance, nil
}
balance, err := client.BalanceAt(ctx, account, blockNumber)
if err != nil {
return nil, err
}
b.addBalanceToCache(account, blockNumber, balance)
return balance, nil
}
func newBalanceCache() *balanceCache {
return &balanceCache{
cache: make(map[common.Address]map[*big.Int]*big.Int),
requestCounter: make(map[common.Address]uint),
cacheHitsCounter: make(map[common.Address]uint),
}
}

View File

@ -13,14 +13,18 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type ethHistoricalCommand struct {
db *Database
eth TransferDownloader
address common.Address
client reactorClient
feed *event.Feed
var numberOfBlocksCheckedPerIteration = 40
from, to *big.Int
type ethHistoricalCommand struct {
db *Database
eth TransferDownloader
address common.Address
client reactorClient
balanceCache *balanceCache
feed *event.Feed
foundHeaders []*DBHeader
from, to, resultingFrom *big.Int
}
func (c *ethHistoricalCommand) Command() Command {
@ -31,47 +35,26 @@ func (c *ethHistoricalCommand) Command() Command {
}
func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
if c.from == nil {
from, err := c.db.GetLatestSynced(c.address, ethSync)
if err != nil {
return err
}
if from == nil {
c.from = zero
} else {
c.from = from.Number
}
log.Debug("initialized downloader for eth historical transfers", "address", c.address, "starting at", c.from, "up to", c.to)
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
concurrent := NewConcurrentDownloader(ctx)
start := time.Now()
downloadEthConcurrently(concurrent, c.client, c.eth, c.address, c.from, c.to)
select {
case <-concurrent.WaitAsync():
case <-ctx.Done():
log.Error("eth downloader is stuck")
return errors.New("eth downloader is stuck")
}
if concurrent.Error() != nil {
log.Error("failed to dowload transfers using concurrent downloader", "error", err)
return concurrent.Error()
}
transfers := concurrent.Get()
log.Info("eth historical downloader finished successfully", "total transfers", len(transfers), "time", time.Since(start))
err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headersFromTransfers(transfers), nil, ethSync)
totalRequests, cacheHits := c.balanceCache.getStats(c.address)
log.Info("balance cache before checking range", "total", totalRequests, "cached", totalRequests-cacheHits)
from, headers, err := findBlocksWithEthTransfers(ctx, c.client, c.balanceCache, c.eth, c.address, c.from, c.to)
if err != nil {
log.Error("failed to save downloaded erc20 transfers", "error", err)
return err
}
if len(transfers) > 0 {
// we download all or nothing
c.feed.Send(Event{
Type: EventNewHistory,
BlockNumber: c.from,
Accounts: []common.Address{c.address},
})
c.foundHeaders = headers
c.resultingFrom = from
log.Info("eth historical downloader finished successfully", "address", c.address, "from", from, "to", c.to, "total blocks", len(headers), "time", time.Since(start))
totalRequests, cacheHits = c.balanceCache.getStats(c.address)
log.Info("balance cache after checking range", "total", totalRequests, "cached", totalRequests-cacheHits)
//err = c.db.ProcessBlocks(c.address, from, c.to, headers, ethTransfer)
if err != nil {
log.Error("failed to save found blocks with transfers", "error", err)
return err
}
log.Debug("eth transfers were persisted. command is closed")
return nil
@ -84,8 +67,10 @@ type erc20HistoricalCommand struct {
client reactorClient
feed *event.Feed
iterator *IterativeDownloader
to *DBHeader
iterator *IterativeDownloader
to *big.Int
from *big.Int
foundHeaders []*DBHeader
}
func (c *erc20HistoricalCommand) Command() Command {
@ -96,40 +81,32 @@ func (c *erc20HistoricalCommand) Command() Command {
}
func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {
start := time.Now()
if c.iterator == nil {
c.iterator, err = SetupIterativeDownloader(
c.db, c.client, c.address, erc20Sync,
c.erc20, erc20BatchSize, c.to)
c.db, c.client, c.address,
c.erc20, erc20BatchSize, c.to, c.from)
if err != nil {
log.Error("failed to setup historical downloader for erc20")
return err
}
}
for !c.iterator.Finished() {
start := time.Now()
transfers, err := c.iterator.Next(ctx)
headers, _, _, err := c.iterator.Next(ctx)
if err != nil {
log.Error("failed to get next batch", "error", err)
return err
}
headers := headersFromTransfers(transfers)
headers = append(headers, c.iterator.Header())
err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headers, nil, erc20Sync)
c.foundHeaders = append(c.foundHeaders, headers...)
/*err = c.db.ProcessBlocks(c.address, from, to, headers, erc20Transfer)
if err != nil {
c.iterator.Revert()
log.Error("failed to save downloaded erc20 transfers", "error", err)
log.Error("failed to save downloaded erc20 blocks with transfers", "error", err)
return err
}
if len(transfers) > 0 {
log.Debug("erc20 downloader imported transfers", "len", len(transfers), "time", time.Since(start))
c.feed.Send(Event{
Type: EventNewHistory,
BlockNumber: c.iterator.Header().Number,
Accounts: []common.Address{c.address},
})
}
}*/
}
log.Info("wallet historical downloader for erc20 transfers finished")
log.Info("wallet historical downloader for erc20 transfers finished", "in", time.Since(start))
return nil
}
@ -142,7 +119,7 @@ type newBlocksTransfersCommand struct {
client reactorClient
feed *event.Feed
from, to *DBHeader
initialFrom, from, to *DBHeader
}
func (c *newBlocksTransfersCommand) Command() Command {
@ -173,6 +150,53 @@ func (c *newBlocksTransfersCommand) Verify(parent context.Context) (err error) {
return nil
}
func (c *newBlocksTransfersCommand) getAllTransfers(parent context.Context, from, to uint64) ([]Transfer, map[common.Address][]*DBHeader, error) {
newHeadersByAddress := map[common.Address][]*DBHeader{}
all := []Transfer{}
for n := from; n <= to; n++ {
ctx, cancel := context.WithTimeout(parent, 10*time.Second)
header, err := c.client.HeaderByNumber(ctx, big.NewInt(int64(n)))
cancel()
if err != nil {
return nil, nil, err
}
dbHeader := toDBHeader(header)
log.Info("reactor get transfers", "block", dbHeader.Hash, "number", dbHeader.Number)
transfers, err := c.getTransfers(parent, dbHeader)
if err != nil {
log.Error("failed to get transfers", "header", dbHeader.Hash, "error", err)
return nil, nil, err
}
if len(transfers) > 0 {
for _, transfer := range transfers {
headers, ok := newHeadersByAddress[transfer.Address]
if !ok {
headers = []*DBHeader{}
}
newHeadersByAddress[transfer.Address] = append(headers, dbHeader)
}
}
all = append(all, transfers...)
}
return all, newHeadersByAddress, nil
}
func (c *newBlocksTransfersCommand) saveHeaders(parent context.Context, newHeadersByAddress map[common.Address][]*DBHeader) (err error) {
for _, address := range c.accounts {
headers, ok := newHeadersByAddress[address]
if ok {
err = c.db.SaveBlocks(address, headers)
if err != nil {
log.Error("failed to persist blocks", "error", err)
return err
}
}
}
return nil
}
func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) {
if c.from == nil {
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
@ -183,7 +207,6 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) {
return err
}
c.from = toDBHeader(from)
log.Debug("initialized downloader for new blocks transfers", "starting at", c.from.Number)
}
num := new(big.Int).Add(c.from.Number, one)
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
@ -193,43 +216,48 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) {
log.Warn("failed to get latest block", "number", num, "error", err)
return err
}
log.Debug("reactor received new block", "header", latest.Hash())
log.Info("reactor received new block", "header", num)
ctx, cancel = context.WithTimeout(parent, 10*time.Second)
added, removed, err := c.onNewBlock(ctx, c.from, latest)
latestHeader, removed, latestValidSavedBlock, reorgSpotted, err := c.onNewBlock(ctx, c.from, latest)
cancel()
if err != nil {
log.Error("failed to process new header", "header", latest, "error", err)
return err
}
if len(added) == 0 && len(removed) == 0 {
log.Debug("new block already in the database", "block", latest.Number)
if latestHeader == nil && len(removed) == 0 {
log.Info("new block already in the database", "block", latest.Number)
return nil
}
// for each added block get tranfers from downloaders
all := []Transfer{}
for i := range added {
log.Debug("reactor get transfers", "block", added[i].Hash, "number", added[i].Number)
transfers, err := c.getTransfers(parent, added[i])
if err != nil {
log.Error("failed to get transfers", "header", added[i].Hash, "error", err)
continue
latestHeader.Loaded = true
fromN := latest.Number.Uint64()
if reorgSpotted {
if latestValidSavedBlock != nil {
fromN = latestValidSavedBlock.Number.Uint64()
}
if c.initialFrom != nil {
fromN = c.initialFrom.Number.Uint64()
}
log.Debug("reactor adding transfers", "block", added[i].Hash, "number", added[i].Number, "len", len(transfers))
all = append(all, transfers...)
}
err = c.db.ProcessTranfers(all, c.accounts, added, removed, erc20Sync|ethSync)
toN := latestHeader.Number.Uint64()
all, newHeadersByAddress, err := c.getAllTransfers(parent, fromN, toN)
if err != nil {
return err
}
err = c.saveHeaders(parent, newHeadersByAddress)
if err != nil {
return err
}
err = c.db.ProcessTranfers(all, removed)
if err != nil {
log.Error("failed to persist transfers", "error", err)
return err
}
c.from = toDBHeader(latest)
if len(added) == 1 && len(removed) == 0 {
c.feed.Send(Event{
Type: EventNewBlock,
BlockNumber: added[0].Number,
Accounts: uniqueAccountsFromTransfers(all),
})
}
if len(removed) != 0 {
lth := len(removed)
c.feed.Send(Event{
@ -238,40 +266,66 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) {
Accounts: uniqueAccountsFromTransfers(all),
})
}
log.Info("before sending new block event", "latest", latestHeader != nil, "removed", len(removed), "len", len(uniqueAccountsFromTransfers(all)))
if latestHeader != nil && len(removed) == 0 {
c.feed.Send(Event{
Type: EventNewBlock,
BlockNumber: latestHeader.Number,
Accounts: uniqueAccountsFromTransfers(all),
})
}
return nil
}
func (c *newBlocksTransfersCommand) onNewBlock(ctx context.Context, from *DBHeader, latest *types.Header) (added, removed []*DBHeader, err error) {
if from == nil {
// first node in the cache
return []*DBHeader{toHead(latest)}, nil, nil
}
func (c *newBlocksTransfersCommand) onNewBlock(ctx context.Context, from *DBHeader, latest *types.Header) (lastestHeader *DBHeader, removed []*DBHeader, lastSavedValidHeader *DBHeader, reorgSpotted bool, err error) {
if from.Hash == latest.ParentHash {
// parent matching from node in the cache. on the same chain.
return []*DBHeader{toHead(latest)}, nil, nil
return toHead(latest), nil, nil, false, nil
}
exists, err := c.db.HeaderExists(latest.Hash())
lastSavedBlock, err := c.db.GetLastSavedBlock()
if err != nil {
return nil, nil, err
return nil, nil, nil, false, err
}
if exists {
return nil, nil, nil
if lastSavedBlock == nil {
return toHead(latest), nil, nil, true, nil
}
header, err := c.client.HeaderByNumber(ctx, lastSavedBlock.Number)
if err != nil {
return nil, nil, nil, false, err
}
if header.Hash() == lastSavedBlock.Hash {
return toHead(latest), nil, lastSavedBlock, true, nil
}
log.Debug("wallet reactor spotted reorg", "last header in db", from.Hash, "new parent", latest.ParentHash)
for from != nil && from.Hash != latest.ParentHash {
removed = append(removed, from)
added = append(added, toHead(latest))
latest, err = c.client.HeaderByHash(ctx, latest.ParentHash)
for lastSavedBlock != nil {
removed = append(removed, lastSavedBlock)
lastSavedBlock, err = c.db.GetLastSavedBlockBefore(lastSavedBlock.Number)
if err != nil {
return nil, nil, err
return nil, nil, nil, false, err
}
from, err = c.db.GetHeaderByNumber(new(big.Int).Sub(latest.Number, one))
if lastSavedBlock == nil {
continue
}
header, err := c.client.HeaderByNumber(ctx, lastSavedBlock.Number)
if err != nil {
return nil, nil, err
return nil, nil, nil, false, err
}
// the last saved block is still valid
if header.Hash() == lastSavedBlock.Hash {
return toHead(latest), nil, lastSavedBlock, true, nil
}
}
added = append(added, toHead(latest))
return added, removed, nil
return toHead(latest), removed, lastSavedBlock, true, nil
}
func (c *newBlocksTransfersCommand) getTransfers(parent context.Context, header *DBHeader) ([]Transfer, error) {
@ -307,42 +361,137 @@ type controlCommand struct {
// run fast indexing for every accont up to canonical chain head minus safety depth.
// every account will run it from last synced header.
func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error {
func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCache *balanceCache, fromByAddress map[common.Address]*big.Int, toByAddress map[common.Address]*big.Int) (map[common.Address]*big.Int, map[common.Address][]*DBHeader, error) {
start := time.Now()
group := NewGroup(ctx)
for _, address := range c.accounts {
erc20 := &erc20HistoricalCommand{
db: c.db,
erc20: NewERC20TransfersDownloader(c.client, []common.Address{address}, types.NewEIP155Signer(c.chain)),
client: c.client,
feed: c.feed,
address: address,
to: to,
}
group.Add(erc20.Command())
commands := make([]*ethHistoricalCommand, len(c.accounts))
for i, address := range c.accounts {
eth := &ethHistoricalCommand{
db: c.db,
client: c.client,
address: address,
db: c.db,
client: c.client,
balanceCache: bCache,
address: address,
eth: &ETHTransferDownloader{
client: c.client,
accounts: []common.Address{address},
signer: types.NewEIP155Signer(c.chain),
db: c.db,
},
feed: c.feed,
to: to.Number,
from: fromByAddress[address],
to: toByAddress[address],
}
commands[i] = eth
group.Add(eth.Command())
}
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
case <-group.WaitAsync():
resultingFromByAddress := map[common.Address]*big.Int{}
headers := map[common.Address][]*DBHeader{}
for _, command := range commands {
resultingFromByAddress[command.address] = command.resultingFrom
headers[command.address] = command.foundHeaders
}
log.Info("fast indexer finished", "in", time.Since(start))
return resultingFromByAddress, headers, nil
}
}
// run fast indexing for every accont up to canonical chain head minus safety depth.
// every account will run it from last synced header.
func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, fromByAddress map[common.Address]*big.Int, toByAddress map[common.Address]*big.Int) (map[common.Address][]*DBHeader, error) {
start := time.Now()
group := NewGroup(ctx)
commands := make([]*erc20HistoricalCommand, len(c.accounts))
for i, address := range c.accounts {
erc20 := &erc20HistoricalCommand{
db: c.db,
erc20: NewERC20TransfersDownloader(c.client, []common.Address{address}, types.NewEIP155Signer(c.chain)),
client: c.client,
feed: c.feed,
address: address,
from: fromByAddress[address],
to: toByAddress[address],
foundHeaders: []*DBHeader{},
}
commands[i] = erc20
group.Add(erc20.Command())
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-group.WaitAsync():
headres := map[common.Address][]*DBHeader{}
for _, command := range commands {
headres[command.address] = command.foundHeaders
}
log.Info("fast indexer Erc20 finished", "in", time.Since(start))
return headres, nil
}
}
func getTransfersByBlocks(ctx context.Context, db *Database, downloader *ETHTransferDownloader, address common.Address, blocks []*big.Int) ([]Transfer, error) {
allTransfers := []Transfer{}
for _, block := range blocks {
transfers, err := downloader.GetTransfersByNumber(ctx, block)
if err != nil {
return nil, err
}
log.Debug("loadTransfers", "block", block, "new transfers", len(transfers))
if len(transfers) > 0 {
allTransfers = append(allTransfers, transfers...)
}
}
return allTransfers, nil
}
func loadTransfers(ctx context.Context, accounts []common.Address, db *Database, client *ethclient.Client, chain *big.Int, limit int, blocksByAddress map[common.Address][]*big.Int) error {
start := time.Now()
group := NewGroup(ctx)
for _, address := range accounts {
blocks, ok := blocksByAddress[address]
if !ok {
blocks, _ = db.GetBlocksByAddress(address, numberOfBlocksCheckedPerIteration)
}
for _, block := range blocks {
erc20 := &transfersCommand{
db: db,
client: client,
address: address,
eth: &ETHTransferDownloader{
client: client,
accounts: []common.Address{address},
signer: types.NewEIP155Signer(chain),
db: db,
},
block: block,
}
group.Add(erc20.Command())
}
}
select {
case <-ctx.Done():
return ctx.Err()
case <-group.WaitAsync():
log.Debug("fast indexer finished", "in", time.Since(start))
log.Info("loadTransfers finished", "in", time.Since(start))
return nil
}
}
func (c *controlCommand) LoadTransfers(ctx context.Context, downloader *ETHTransferDownloader, limit int) error {
return loadTransfers(ctx, c.accounts, c.db, c.client, c.chain, limit, make(map[common.Address][]*big.Int))
}
/*
// verifyLastSynced verifies that last header that was added to the database is still in the canonical chain.
// it is done by downloading configured number of parents for the last header in the db.
func (c *controlCommand) verifyLastSynced(parent context.Context, last *DBHeader, head *types.Header) error {
@ -357,7 +506,7 @@ func (c *controlCommand) verifyLastSynced(parent context.Context, last *DBHeader
if err != nil {
return err
}
log.Debug("spawn reorg verifier", "from", last.Number, "to", header.Number)
log.Info("spawn reorg verifier", "from", last.Number, "to", header.Number)
// TODO(dshulyak) make a standalone command that
// doesn't manage transfers and has an upper limit
cmd := &newBlocksTransfersCommand{
@ -373,28 +522,100 @@ func (c *controlCommand) verifyLastSynced(parent context.Context, last *DBHeader
}
return cmd.Command()(parent)
}
*/
func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client *ethclient.Client) (*big.Int, error) {
from := big.NewInt(0)
to := initialTo
goal := uint64(20)
firstNonce, err := client.NonceAt(c, account, to)
log.Info("find range with 20 <= len(tx) <= 25", "account", account, "firstNonce", firstNonce, "from", from, "to", to)
if err != nil {
return nil, err
}
if firstNonce <= goal {
return zero, nil
}
nonceDiff := firstNonce
iterations := 0
for iterations < 50 {
iterations = iterations + 1
if nonceDiff > goal {
// from = (from + to) / 2
from = from.Add(from, to)
from = from.Div(from, big.NewInt(2))
} else {
// from = from - (from + to) / 2
// to = from
diff := big.NewInt(0).Sub(to, from)
diff.Div(diff, big.NewInt(2))
to = big.NewInt(from.Int64())
from.Sub(from, diff)
}
fromNonce, err := client.NonceAt(c, account, from)
if err != nil {
return nil, err
}
nonceDiff = firstNonce - fromNonce
log.Info("next nonce", "from", from, "n", fromNonce, "diff", firstNonce-fromNonce)
if goal <= nonceDiff && nonceDiff <= (goal+5) {
log.Info("range found", "account", account, "from", from, "to", to)
return from, nil
}
}
log.Info("range found", "account", account, "from", from, "to", to)
return from, nil
}
func findFirstRanges(c context.Context, accounts []common.Address, initialTo *big.Int, client *ethclient.Client) (map[common.Address]*big.Int, error) {
res := map[common.Address]*big.Int{}
for _, address := range accounts {
from, err := findFirstRange(c, address, initialTo, client)
if err != nil {
return nil, err
}
res[address] = from
}
return res, nil
}
func (c *controlCommand) Run(parent context.Context) error {
log.Debug("start control command")
log.Info("start control command")
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
head, err := c.client.HeaderByNumber(ctx, nil)
cancel()
if err != nil {
return err
}
log.Debug("current head is", "block number", head.Number)
last, err := c.db.GetLastHead()
c.feed.Send(Event{
Type: EventFetchingRecentHistory,
Accounts: c.accounts,
})
log.Info("current head is", "block number", head.Number)
lastKnownEthBlocks, accountsWithoutHistory, err := c.db.GetLastKnownBlockByAddresses(c.accounts)
if err != nil {
log.Error("failed to load last head from database", "error", err)
return err
}
if last != nil {
err = c.verifyLastSynced(parent, last, head)
if err != nil {
log.Error("failed verification for last header in canonical chain", "error", err)
return err
}
fromMap, err := findFirstRanges(parent, accountsWithoutHistory, head.Number, c.client)
if err != nil {
return err
}
target := new(big.Int).Sub(head.Number, c.safetyDepth)
if target.Cmp(zero) <= 0 {
target = zero
@ -405,23 +626,75 @@ func (c *controlCommand) Run(parent context.Context) error {
if err != nil {
return err
}
log.Debug("run fast indexing for the transfers", "up to", head.Number)
err = c.fastIndex(parent, toDBHeader(head))
fromByAddress := map[common.Address]*big.Int{}
toByAddress := map[common.Address]*big.Int{}
for _, address := range c.accounts {
from, ok := lastKnownEthBlocks[address]
if !ok {
from = fromMap[address]
}
fromByAddress[address] = from
toByAddress[address] = head.Number
}
bCache := newBalanceCache()
cmnd := &findAndCheckBlockRangeCommand{
accounts: c.accounts,
db: c.db,
chain: c.chain,
client: c.client,
balanceCache: bCache,
feed: c.feed,
fromByAddress: fromByAddress,
toByAddress: toByAddress,
}
err = cmnd.Command()(parent)
if err != nil {
return err
}
log.Debug("watching new blocks", "start from", head.Number)
cmd := &newBlocksTransfersCommand{
db: c.db,
chain: c.chain,
downloader := &ETHTransferDownloader{
client: c.client,
accounts: c.accounts,
eth: c.eth,
erc20: c.erc20,
feed: c.feed,
from: toDBHeader(head),
signer: types.NewEIP155Signer(c.chain),
db: c.db,
}
return cmd.Command()(parent)
err = c.LoadTransfers(parent, downloader, 40)
if err != nil {
return err
}
c.feed.Send(Event{
Type: EventRecentHistoryReady,
Accounts: c.accounts,
BlockNumber: head.Number,
})
log.Info("watching new blocks", "start from", head.Number)
cmd := &newBlocksTransfersCommand{
db: c.db,
chain: c.chain,
client: c.client,
accounts: c.accounts,
eth: c.eth,
erc20: c.erc20,
feed: c.feed,
initialFrom: toDBHeader(head),
from: toDBHeader(head),
}
err = cmd.Command()(parent)
if err != nil {
log.Warn("error on running newBlocksTransfersCommand", "err", err)
return err
}
log.Info("end control command")
return err
}
func (c *controlCommand) Command() Command {
@ -431,23 +704,6 @@ func (c *controlCommand) Command() Command {
}.Run
}
func headersFromTransfers(transfers []Transfer) []*DBHeader {
byHash := map[common.Hash]struct{}{}
rst := []*DBHeader{}
for i := range transfers {
_, exists := byHash[transfers[i].BlockHash]
if exists {
continue
}
rst = append(rst, &DBHeader{
Hash: transfers[i].BlockHash,
Number: transfers[i].BlockNumber,
Timestamp: transfers[i].Timestamp,
})
}
return rst
}
func uniqueAccountsFromTransfers(transfers []Transfer) []common.Address {
accounts := []common.Address{}
unique := map[common.Address]struct{}{}
@ -461,3 +717,114 @@ func uniqueAccountsFromTransfers(transfers []Transfer) []common.Address {
}
return accounts
}
type transfersCommand struct {
db *Database
eth *ETHTransferDownloader
block *big.Int
address common.Address
client reactorClient
}
func (c *transfersCommand) Command() Command {
return FiniteCommand{
Interval: 5 * time.Second,
Runable: c.Run,
}.Run
}
func (c *transfersCommand) Run(ctx context.Context) (err error) {
allTransfers, err := getTransfersByBlocks(ctx, c.db, c.eth, c.address, []*big.Int{c.block})
if err != nil {
log.Info("getTransfersByBlocks error", "error", err)
return err
}
err = c.db.SaveTranfers(c.address, allTransfers, []*big.Int{c.block})
if err != nil {
log.Error("SaveTranfers error", "error", err)
return err
}
log.Debug("transfers loaded", "address", c.address, "len", len(allTransfers))
return nil
}
type loadTransfersCommand struct {
accounts []common.Address
db *Database
chain *big.Int
client *ethclient.Client
blocksByAddress map[common.Address][]*big.Int
}
func (c *loadTransfersCommand) Command() Command {
return FiniteCommand{
Interval: 5 * time.Second,
Runable: c.Run,
}.Run
}
func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, downloader *ETHTransferDownloader, limit int, blocksByAddress map[common.Address][]*big.Int) error {
return loadTransfers(ctx, c.accounts, c.db, c.client, c.chain, limit, blocksByAddress)
}
func (c *loadTransfersCommand) Run(parent context.Context) (err error) {
downloader := &ETHTransferDownloader{
client: c.client,
accounts: c.accounts,
signer: types.NewEIP155Signer(c.chain),
db: c.db,
}
err = c.LoadTransfers(parent, downloader, 40, c.blocksByAddress)
if err != nil {
return err
}
return
}
type findAndCheckBlockRangeCommand struct {
accounts []common.Address
db *Database
chain *big.Int
client *ethclient.Client
balanceCache *balanceCache
feed *event.Feed
fromByAddress map[common.Address]*big.Int
toByAddress map[common.Address]*big.Int
}
func (c *findAndCheckBlockRangeCommand) Command() Command {
return FiniteCommand{
Interval: 5 * time.Second,
Runable: c.Run,
}.Run
}
func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error) {
log.Debug("start findAndCHeckBlockRangeCommand")
newFromByAddress, ethHeadersByAddress, err := c.fastIndex(parent, c.balanceCache, c.fromByAddress, c.toByAddress)
if err != nil {
return err
}
erc20HeadersByAddress, err := c.fastIndexErc20(parent, newFromByAddress, c.toByAddress)
if err != nil {
return err
}
for _, address := range c.accounts {
ethHeaders := ethHeadersByAddress[address]
erc20Headers := erc20HeadersByAddress[address]
allHeaders := append(ethHeaders, erc20Headers...)
log.Debug("saving headers", "len", len(allHeaders), "address")
err = c.db.ProcessBlocks(address, newFromByAddress[address], c.toByAddress[address], allHeaders)
if err != nil {
return err
}
}
return
}

View File

@ -49,6 +49,7 @@ func (s *NewBlocksSuite) SetupTest() {
eth: &ETHTransferDownloader{
client: s.backend.Client,
signer: s.backend.Signer,
db: s.db,
accounts: []common.Address{s.address},
},
feed: s.feed,
@ -128,6 +129,7 @@ func (s *NewBlocksSuite) TestReorg() {
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
s.cmd.from = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(15))
s.cmd.initialFrom = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(15))
s.Require().EqualError(s.runCmdUntilError(ctx), "not found")
transfers, err := s.db.GetTransfers(big.NewInt(0), nil)
@ -151,7 +153,7 @@ func (s *NewBlocksSuite) TestReorg() {
s.Require().EqualError(s.runCmdUntilError(ctx), "not found")
close(events)
expected := []Event{{Type: EventReorg, BlockNumber: big.NewInt(16)}, {Type: EventNewBlock, BlockNumber: big.NewInt(25)}}
expected := []Event{{Type: EventReorg, BlockNumber: big.NewInt(21)}, {Type: EventNewBlock, BlockNumber: big.NewInt(25)}}
i := 0
for ev := range events {
s.Require().Equal(expected[i].Type, ev.Type)
@ -177,7 +179,8 @@ func (s *NewBlocksSuite) downloadHistorical() {
s.Require().NoError(err)
eth := &ethHistoricalCommand{
db: s.db,
db: s.db,
balanceCache: newBalanceCache(),
eth: &ETHTransferDownloader{
client: s.backend.Client,
signer: s.backend.Signer,
@ -186,12 +189,13 @@ func (s *NewBlocksSuite) downloadHistorical() {
feed: s.feed,
address: s.address,
client: s.backend.Client,
from: big.NewInt(0),
to: s.backend.Ethereum.BlockChain().CurrentBlock().Number(),
}
s.Require().NoError(eth.Run(context.Background()), "eth historical command failed to sync transfers")
transfers, err := s.db.GetTransfers(big.NewInt(0), nil)
//dbHeaders, err := s.db.GetBlocks()
s.Require().NoError(err)
s.Require().Len(transfers, 2)
s.Require().Len(eth.foundHeaders, 2)
}
func (s *NewBlocksSuite) reorgHistorical() {
@ -214,10 +218,6 @@ func (s *NewBlocksSuite) TestSafetyBufferFailure() {
s.downloadHistorical()
s.reorgHistorical()
transfers, err := s.db.GetTransfers(big.NewInt(0), nil)
s.Require().NoError(err)
s.Require().Len(transfers, 1)
}
func (s *NewBlocksSuite) TestSafetyBufferSuccess() {

View File

@ -3,7 +3,11 @@ package wallet
import (
"context"
"math/big"
"sort"
"sync"
"time"
"github.com/pkg/errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
@ -22,10 +26,14 @@ type ConcurrentDownloader struct {
}
type Result struct {
mu sync.Mutex
transfers []Transfer
mu sync.Mutex
transfers []Transfer
headers []*DBHeader
blockRanges [][]*big.Int
}
var errDownloaderStuck = errors.New("eth downloader is stuck")
func (r *Result) Push(transfers ...Transfer) {
r.mu.Lock()
defer r.mu.Unlock()
@ -40,42 +48,153 @@ func (r *Result) Get() []Transfer {
return rst
}
func (r *Result) PushHeader(block *DBHeader) {
r.mu.Lock()
defer r.mu.Unlock()
r.headers = append(r.headers, block)
}
func (r *Result) GetHeaders() []*DBHeader {
r.mu.Lock()
defer r.mu.Unlock()
rst := make([]*DBHeader, len(r.headers))
copy(rst, r.headers)
return rst
}
func (r *Result) PushRange(blockRange []*big.Int) {
r.mu.Lock()
defer r.mu.Unlock()
r.blockRanges = append(r.blockRanges, blockRange)
}
func (r *Result) GetRanges() [][]*big.Int {
r.mu.Lock()
defer r.mu.Unlock()
rst := make([][]*big.Int, len(r.blockRanges))
copy(rst, r.blockRanges)
r.blockRanges = [][]*big.Int{}
return rst
}
// TransferDownloader downloads transfers from single block using number.
type TransferDownloader interface {
GetTransfersByNumber(context.Context, *big.Int) ([]Transfer, error)
}
func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, downloader TransferDownloader, account common.Address, low, high *big.Int) {
c.Add(func(ctx context.Context) error {
if low.Cmp(high) >= 0 {
return nil
}
log.Debug("eth transfers comparing blocks", "low", low, "high", high)
lb, err := client.BalanceAt(ctx, account, low)
if err != nil {
return err
}
hb, err := client.BalanceAt(ctx, account, high)
if err != nil {
return err
}
if lb.Cmp(hb) == 0 {
log.Debug("balances are equal", "low", low, "high", high)
return nil
}
if new(big.Int).Sub(high, low).Cmp(one) == 0 {
transfers, err := downloader.GetTransfersByNumber(ctx, high)
func checkRanges(parent context.Context, client reactorClient, cache BalanceCache, downloader TransferDownloader, account common.Address, ranges [][]*big.Int) ([][]*big.Int, []*DBHeader, error) {
ctx, cancel := context.WithTimeout(parent, 30*time.Second)
defer cancel()
c := NewConcurrentDownloader(ctx)
for _, blocksRange := range ranges {
from := blocksRange[0]
to := blocksRange[1]
c.Add(func(ctx context.Context) error {
if from.Cmp(to) >= 0 {
return nil
}
log.Debug("eth transfers comparing blocks", "from", from, "to", to)
lb, err := cache.BalanceAt(ctx, client, account, from)
if err != nil {
return err
}
c.Push(transfers...)
hb, err := cache.BalanceAt(ctx, client, account, to)
if err != nil {
return err
}
if lb.Cmp(hb) == 0 {
log.Debug("balances are equal", "from", from, "to", to)
// In case if balances are equal but non zero we want to check if
// eth_getTransactionCount return different values, because there
// still might be transactions
if lb.Cmp(zero) != 0 {
return nil
}
ln, err := client.NonceAt(ctx, account, from)
if err != nil {
return err
}
hn, err := client.NonceAt(ctx, account, to)
if err != nil {
return err
}
if ln == hn {
log.Debug("transaction count is also equal", "from", from, "to", to)
return nil
}
}
if new(big.Int).Sub(to, from).Cmp(one) == 0 {
header, err := client.HeaderByNumber(ctx, to)
if err != nil {
return err
}
c.PushHeader(toDBHeader(header))
return nil
}
mid := new(big.Int).Add(from, to)
mid = mid.Div(mid, two)
_, err = cache.BalanceAt(ctx, client, account, mid)
if err != nil {
return err
}
log.Debug("balances are not equal", "from", from, "mid", mid, "to", to)
c.PushRange([]*big.Int{from, mid})
c.PushRange([]*big.Int{mid, to})
return nil
}
mid := new(big.Int).Add(low, high)
mid = mid.Div(mid, two)
log.Debug("balances are not equal. spawn two concurrent downloaders", "low", low, "mid", mid, "high", high)
downloadEthConcurrently(c, client, downloader, account, low, mid)
downloadEthConcurrently(c, client, downloader, account, mid, high)
return nil
})
})
}
select {
case <-c.WaitAsync():
case <-ctx.Done():
return nil, nil, errDownloaderStuck
}
if c.Error() != nil {
return nil, nil, errors.Wrap(c.Error(), "failed to dowload transfers using concurrent downloader")
}
return c.GetRanges(), c.GetHeaders(), nil
}
func findBlocksWithEthTransfers(parent context.Context, client reactorClient, cache BalanceCache, downloader TransferDownloader, account common.Address, low, high *big.Int) (from *big.Int, headers []*DBHeader, err error) {
ranges := [][]*big.Int{{low, high}}
minBlock := big.NewInt(low.Int64())
headers = []*DBHeader{}
var lvl = 1
for len(ranges) > 0 && lvl <= 30 {
log.Debug("check blocks ranges", "lvl", lvl, "ranges len", len(ranges))
lvl++
newRanges, newHeaders, err := checkRanges(parent, client, cache, downloader, account, ranges)
if err != nil {
return nil, nil, err
}
headers = append(headers, newHeaders...)
if len(newRanges) > 0 {
log.Debug("found new ranges", "account", account, "lvl", lvl, "new ranges len", len(newRanges))
}
if len(newRanges) > 60 {
sort.SliceStable(newRanges, func(i, j int) bool {
return newRanges[i][0].Cmp(newRanges[j][0]) == 1
})
newRanges = newRanges[:60]
minBlock = newRanges[len(newRanges)-1][0]
}
ranges = newRanges
}
return minBlock, headers, err
}

View File

@ -10,6 +10,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/common"
)
@ -57,6 +59,22 @@ func (f balancesFixture) BalanceAt(ctx context.Context, account common.Address,
return f[index], nil
}
func (f balancesFixture) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
return uint64(0), nil
}
func (f balancesFixture) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
return &types.Header{
Number: number,
}, nil
}
func (f balancesFixture) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
return &types.Header{
Number: big.NewInt(0),
}, nil
}
type batchesFixture [][]Transfer
func (f batchesFixture) GetTransfersByNumber(ctx context.Context, number *big.Int) (rst []Transfer, err error) {
@ -71,7 +89,7 @@ func TestConcurrentEthDownloader(t *testing.T) {
type options struct {
balances balancesFixture
batches batchesFixture
result []Transfer
result []DBHeader
last *big.Int
}
type testCase struct {
@ -92,7 +110,7 @@ func TestConcurrentEthDownloader(t *testing.T) {
last: big.NewInt(3),
balances: balancesFixture{big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(10)},
batches: batchesFixture{{}, {}, {}, {{BlockNumber: big.NewInt(3)}, {BlockNumber: big.NewInt(3)}}},
result: []Transfer{{BlockNumber: big.NewInt(3)}, {BlockNumber: big.NewInt(3)}},
result: []DBHeader{{Number: big.NewInt(3)}},
},
},
{
@ -101,7 +119,7 @@ func TestConcurrentEthDownloader(t *testing.T) {
last: big.NewInt(3),
balances: balancesFixture{big.NewInt(0), big.NewInt(3), big.NewInt(7), big.NewInt(10)},
batches: batchesFixture{{}, {{BlockNumber: big.NewInt(1)}}, {{BlockNumber: big.NewInt(2)}}, {{BlockNumber: big.NewInt(3)}}},
result: []Transfer{{BlockNumber: big.NewInt(1)}, {BlockNumber: big.NewInt(2)}, {BlockNumber: big.NewInt(3)}},
result: []DBHeader{{Number: big.NewInt(1)}, {Number: big.NewInt(2)}, {Number: big.NewInt(3)}},
},
},
} {
@ -109,19 +127,19 @@ func TestConcurrentEthDownloader(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
concurrent := NewConcurrentDownloader(ctx)
downloadEthConcurrently(
concurrent, tc.options.balances, tc.options.batches,
_, headers, _ := findBlocksWithEthTransfers(
ctx, tc.options.balances, newBalanceCache(), tc.options.batches,
common.Address{}, zero, tc.options.last)
concurrent.Wait()
require.NoError(t, concurrent.Error())
rst := concurrent.Get()
require.Len(t, rst, len(tc.options.result))
require.Len(t, headers, len(tc.options.result))
sort.Slice(rst, func(i, j int) bool {
return rst[i].BlockNumber.Cmp(rst[j].BlockNumber) < 0
})
for i := range rst {
/*for i := range rst {
require.Equal(t, tc.options.result[i].BlockNumber, rst[i].BlockNumber)
}
}*/
})
}
}

View File

@ -10,15 +10,21 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// DBHeader fields from header that are stored in database.
type DBHeader struct {
Number *big.Int
Hash common.Hash
Timestamp uint64
Number *big.Int
Hash common.Hash
Timestamp uint64
Erc20Transfer *Transfer
Network uint64
Address common.Address
// Head is true if the block was a head at the time it was pulled from chain.
Head bool
// Loaded is true if trasfers from this block has been already fetched
Loaded bool
}
func toDBHeader(header *types.Header) *DBHeader {
@ -26,6 +32,7 @@ func toDBHeader(header *types.Header) *DBHeader {
Hash: header.Hash(),
Number: header.Number,
Timestamp: header.Time,
Loaded: false,
}
}
@ -38,12 +45,6 @@ func toHead(header *types.Header) *DBHeader {
// SyncOption is used to specify that application processed transfers for that block.
type SyncOption uint
const (
// sync options
ethSync SyncOption = 1
erc20Sync SyncOption = 2
)
// SQLBigInt type for storing uint256 in the databse.
// FIXME(dshulyak) SQL big int is max 64 bits. Maybe store as bytes in big endian and hope
// that lexographical sorting will work.
@ -111,8 +112,61 @@ func (db Database) Close() error {
return db.db.Close()
}
func (db Database) ProcessBlocks(account common.Address, from *big.Int, to *big.Int, headers []*DBHeader) (err error) {
var (
tx *sql.Tx
)
tx, err = db.db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
_ = tx.Rollback()
}()
err = insertBlocksWithTransactions(tx, account, db.network, headers)
if err != nil {
return
}
err = insertRange(tx, account, db.network, from, to)
if err != nil {
return
}
return
}
func (db Database) SaveBlocks(account common.Address, headers []*DBHeader) (err error) {
var (
tx *sql.Tx
)
tx, err = db.db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
_ = tx.Rollback()
}()
err = insertBlocksWithTransactions(tx, account, db.network, headers)
if err != nil {
return
}
return
}
// ProcessTranfers atomically adds/removes blocks and adds new tranfers.
func (db Database) ProcessTranfers(transfers []Transfer, accounts []common.Address, added, removed []*DBHeader, option SyncOption) (err error) {
func (db Database) ProcessTranfers(transfers []Transfer, removed []*DBHeader) (err error) {
var (
tx *sql.Tx
)
@ -131,21 +185,46 @@ func (db Database) ProcessTranfers(transfers []Transfer, accounts []common.Addre
if err != nil {
return
}
err = insertHeaders(tx, db.network, added)
err = updateOrInsertTransfers(tx, db.network, transfers)
if err != nil {
return
}
err = insertTransfers(tx, db.network, transfers)
if err != nil {
return
}
err = updateAccounts(tx, db.network, accounts, added, option)
return
}
// GetTransfersByAddress loads transfers for a given address between two blocks.
func (db *Database) GetTransfersByAddress(address common.Address, start, end *big.Int) (rst []Transfer, err error) {
query := newTransfersQuery().FilterNetwork(db.network).FilterAddress(address).FilterStart(start).FilterEnd(end)
// SaveTranfers
func (db Database) SaveTranfers(address common.Address, transfers []Transfer, blocks []*big.Int) (err error) {
var (
tx *sql.Tx
)
tx, err = db.db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
_ = tx.Rollback()
}()
err = updateOrInsertTransfers(tx, db.network, transfers)
if err != nil {
return
}
err = markBlocksAsLoaded(tx, address, db.network, blocks)
if err != nil {
return
}
return
}
// GetTransfersInRange loads transfers for a given address between two blocks.
func (db *Database) GetTransfersInRange(address common.Address, start, end *big.Int) (rst []Transfer, err error) {
query := newTransfersQuery().FilterNetwork(db.network).FilterAddress(address).FilterStart(start).FilterEnd(end).FilterLoaded(1)
rows, err := db.db.Query(query.String(), query.Args()...)
if err != nil {
return
@ -154,9 +233,228 @@ func (db *Database) GetTransfersByAddress(address common.Address, start, end *bi
return query.Scan(rows)
}
// GetTransfersByAddress loads transfers for a given address between two blocks.
func (db *Database) GetTransfersByAddress(address common.Address, fromBlock *big.Int, limit int64) (rst []Transfer, err error) {
query := newTransfersQuery().
FilterNetwork(db.network).
FilterAddress(address).
FilterEnd(fromBlock).
FilterLoaded(1).
Limit(limit)
rows, err := db.db.Query(query.String(), query.Args()...)
if err != nil {
return
}
defer rows.Close()
return query.Scan(rows)
}
// GetTransfersByAddress loads transfers for a given address between two blocks.
func (db *Database) GetBlocksByAddress(address common.Address, limit int) (rst []*big.Int, err error) {
query := `SELECT blk_number FROM blocks
WHERE address = ? AND network_id = ? AND loaded = 0
ORDER BY blk_number DESC
LIMIT ?`
rows, err := db.db.Query(query, address, db.network, limit)
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
block := &big.Int{}
err = rows.Scan((*SQLBigInt)(block))
if err != nil {
return nil, err
}
rst = append(rst, block)
}
return rst, nil
}
func (db *Database) RemoveBlockWithTransfer(address common.Address, block *big.Int) error {
query := `DELETE FROM blocks
WHERE address = ?
AND blk_number = ?
AND network_id = ?`
_, err := db.db.Exec(query, address, (*SQLBigInt)(block), db.network)
if err != nil {
return err
}
return nil
}
func (db *Database) GetLastBlockByAddress(address common.Address, limit int) (rst *big.Int, err error) {
query := `SELECT * FROM
(SELECT blk_number FROM blocks WHERE address = ? AND network_id = ? ORDER BY blk_number DESC LIMIT ?)
ORDER BY blk_number LIMIT 1`
rows, err := db.db.Query(query, address, db.network, limit)
if err != nil {
return
}
defer rows.Close()
if rows.Next() {
block := &big.Int{}
err = rows.Scan((*SQLBigInt)(block))
if err != nil {
return nil, err
}
return block, nil
}
return nil, nil
}
func (db *Database) GetLastSavedBlock() (rst *DBHeader, err error) {
query := `SELECT blk_number, blk_hash
FROM blocks
WHERE network_id = ?
ORDER BY blk_number DESC LIMIT 1`
rows, err := db.db.Query(query, db.network)
if err != nil {
return
}
defer rows.Close()
if rows.Next() {
header := &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
err = rows.Scan((*SQLBigInt)(header.Number), &header.Hash)
if err != nil {
return nil, err
}
return header, nil
}
return nil, nil
}
func (db *Database) GetBlocks() (rst []*DBHeader, err error) {
query := `SELECT blk_number, blk_hash, address FROM blocks`
rows, err := db.db.Query(query, db.network)
if err != nil {
return
}
defer rows.Close()
rst = []*DBHeader{}
for rows.Next() {
header := &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
err = rows.Scan((*SQLBigInt)(header.Number), &header.Hash, &header.Address)
if err != nil {
return nil, err
}
rst = append(rst, header)
}
return rst, nil
}
func (db *Database) GetLastSavedBlockBefore(block *big.Int) (rst *DBHeader, err error) {
query := `SELECT blk_number, blk_hash
FROM blocks
WHERE network_id = ? AND blk_number < ?
ORDER BY blk_number DESC LIMIT 1`
rows, err := db.db.Query(query, db.network, (*SQLBigInt)(block))
if err != nil {
return
}
defer rows.Close()
if rows.Next() {
header := &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
err = rows.Scan((*SQLBigInt)(header.Number), &header.Hash)
if err != nil {
return nil, err
}
return header, nil
}
return nil, nil
}
func (db *Database) GetFirstKnownBlock(address common.Address) (rst *big.Int, err error) {
query := `SELECT blk_from FROM blocks_ranges
WHERE address = ?
AND network_id = ?
ORDER BY blk_from
LIMIT 1`
rows, err := db.db.Query(query, address, db.network)
if err != nil {
return
}
defer rows.Close()
if rows.Next() {
block := &big.Int{}
err = rows.Scan((*SQLBigInt)(block))
if err != nil {
return nil, err
}
return block, nil
}
return nil, nil
}
func (db *Database) GetLastKnownBlockByAddress(address common.Address) (rst *big.Int, err error) {
query := `SELECT blk_to FROM blocks_ranges
WHERE address = ?
AND network_id = ?
ORDER BY blk_to DESC
LIMIT 1`
rows, err := db.db.Query(query, address, db.network)
if err != nil {
return
}
defer rows.Close()
if rows.Next() {
block := &big.Int{}
err = rows.Scan((*SQLBigInt)(block))
if err != nil {
return nil, err
}
return block, nil
}
return nil, nil
}
func (db *Database) GetLastKnownBlockByAddresses(addresses []common.Address) (map[common.Address]*big.Int, []common.Address, error) {
res := map[common.Address]*big.Int{}
accountsWithoutHistory := []common.Address{}
for _, address := range addresses {
block, err := db.GetLastKnownBlockByAddress(address)
if err != nil {
log.Info("Can't get last block", "error", err)
return nil, nil, err
}
if block != nil {
res[address] = block
} else {
accountsWithoutHistory = append(accountsWithoutHistory, address)
}
}
return res, accountsWithoutHistory, nil
}
// GetTransfers load transfers transfer betweeen two blocks.
func (db *Database) GetTransfers(start, end *big.Int) (rst []Transfer, err error) {
query := newTransfersQuery().FilterNetwork(db.network).FilterStart(start).FilterEnd(end)
query := newTransfersQuery().FilterNetwork(db.network).FilterStart(start).FilterEnd(end).FilterLoaded(1)
rows, err := db.db.Query(query.String(), query.Args()...)
if err != nil {
return
@ -165,8 +463,37 @@ func (db *Database) GetTransfers(start, end *big.Int) (rst []Transfer, err error
return query.Scan(rows)
}
func (db *Database) GetPreloadedTransactions(address common.Address, blockHash common.Hash) (rst []Transfer, err error) {
query := newTransfersQuery().
FilterNetwork(db.network).
FilterAddress(address).
FilterBlockHash(blockHash).
FilterLoaded(0)
rows, err := db.db.Query(query.String(), query.Args()...)
if err != nil {
return
}
defer rows.Close()
return query.Scan(rows)
}
func (db *Database) GetTransactionsLog(address common.Address, transactionHash common.Hash) (*types.Log, error) {
l := &types.Log{}
err := db.db.QueryRow("SELECT log FROM transfers WHERE network_id = ? AND address = ? AND hash = ?",
db.network, address, transactionHash).
Scan(&JSONBlob{l})
if err == nil {
return l, nil
}
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
// SaveHeaders stores a list of headers atomically.
func (db *Database) SaveHeaders(headers []*types.Header) (err error) {
func (db *Database) SaveHeaders(headers []*types.Header, address common.Address) (err error) {
var (
tx *sql.Tx
insert *sql.Stmt
@ -175,7 +502,7 @@ func (db *Database) SaveHeaders(headers []*types.Header) (err error) {
if err != nil {
return
}
insert, err = tx.Prepare("INSERT INTO blocks(network_id, number, hash, timestamp) VALUES (?, ?, ?, ?)")
insert, err = tx.Prepare("INSERT INTO blocks(network_id, blk_number, blk_hash, address) VALUES (?, ?, ?, ?)")
if err != nil {
return
}
@ -188,7 +515,7 @@ func (db *Database) SaveHeaders(headers []*types.Header) (err error) {
}()
for _, h := range headers {
_, err = insert.Exec(db.network, (*SQLBigInt)(h.Number), h.Hash(), h.Time)
_, err = insert.Exec(db.network, (*SQLBigInt)(h.Number), h.Hash(), address)
if err != nil {
return
}
@ -196,74 +523,10 @@ func (db *Database) SaveHeaders(headers []*types.Header) (err error) {
return
}
func (db *Database) SaveSyncedHeader(address common.Address, header *types.Header, option SyncOption) (err error) {
var (
tx *sql.Tx
insert *sql.Stmt
)
tx, err = db.db.Begin()
if err != nil {
return
}
insert, err = tx.Prepare("INSERT INTO accounts_to_blocks(network_id, address, blk_number, sync) VALUES (?, ?,?,?)")
if err != nil {
return
}
defer func() {
if err == nil {
err = tx.Commit()
} else {
_ = tx.Rollback()
}
}()
_, err = insert.Exec(db.network, address, (*SQLBigInt)(header.Number), option)
if err != nil {
return
}
return err
}
// HeaderExists checks if header with hash exists in db.
func (db *Database) HeaderExists(hash common.Hash) (bool, error) {
var val sql.NullBool
err := db.db.QueryRow("SELECT EXISTS (SELECT hash FROM blocks WHERE hash = ? AND network_id = ?)", hash, db.network).Scan(&val)
if err != nil {
return false, err
}
return val.Bool, nil
}
// GetHeaderByNumber selects header using block number.
func (db *Database) GetHeaderByNumber(number *big.Int) (header *DBHeader, err error) {
header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
err = db.db.QueryRow("SELECT hash,number FROM blocks WHERE number = ? AND network_id = ?", (*SQLBigInt)(number), db.network).Scan(&header.Hash, (*SQLBigInt)(header.Number))
if err == nil {
return header, nil
}
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
func (db *Database) GetLastHead() (header *DBHeader, err error) {
header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
err = db.db.QueryRow("SELECT hash,number FROM blocks WHERE network_id = $1 AND head = 1 AND number = (SELECT MAX(number) FROM blocks WHERE network_id = $1)", db.network).Scan(&header.Hash, (*SQLBigInt)(header.Number))
if err == nil {
return header, nil
}
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
// GetLatestSynced downloads last synced block with a given option.
func (db *Database) GetLatestSynced(address common.Address, option SyncOption) (header *DBHeader, err error) {
header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
err = db.db.QueryRow(`
SELECT blocks.hash, blk_number FROM accounts_to_blocks JOIN blocks ON blk_number = blocks.number WHERE blocks.network_id = $1 AND address = $2 AND blk_number
= (SELECT MAX(blk_number) FROM accounts_to_blocks WHERE network_id = $1 AND address = $2 AND sync & $3 = $3)`, db.network, address, option).Scan(&header.Hash, (*SQLBigInt)(header.Number))
err = db.db.QueryRow("SELECT blk_hash, blk_number FROM blocks WHERE blk_number = ? AND network_id = ?", (*SQLBigInt)(number), db.network).Scan(&header.Hash, (*SQLBigInt)(header.Number))
if err == nil {
return header, nil
}
@ -326,26 +589,23 @@ type statementCreator interface {
}
func deleteHeaders(creator statementCreator, headers []*DBHeader) error {
delete, err := creator.Prepare("DELETE FROM blocks WHERE hash = ?")
delete, err := creator.Prepare("DELETE FROM blocks WHERE blk_hash = ?")
if err != nil {
return err
}
deleteTransfers, err := creator.Prepare("DELETE FROM transfers WHERE blk_hash = ?")
if err != nil {
return err
}
for _, h := range headers {
k := h.Hash
log.Debug("foo", "k", k)
_, err = delete.Exec(h.Hash)
if err != nil {
return err
}
}
return nil
}
func insertHeaders(creator statementCreator, network uint64, headers []*DBHeader) error {
insert, err := creator.Prepare("INSERT OR IGNORE INTO blocks(network_id, hash, number, timestamp, head) VALUES (?, ?, ?, ?, ?)")
if err != nil {
return err
}
for _, h := range headers {
_, err = insert.Exec(network, h.Hash, (*SQLBigInt)(h.Number), h.Timestamp, h.Head)
_, err = deleteTransfers.Exec(h.Hash)
if err != nil {
return err
}
@ -353,47 +613,117 @@ func insertHeaders(creator statementCreator, network uint64, headers []*DBHeader
return nil
}
func insertTransfers(creator statementCreator, network uint64, transfers []Transfer) error {
insert, err := creator.Prepare("INSERT OR IGNORE INTO transfers(network_id, hash, blk_hash, address, tx, sender, receipt, log, type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)")
func insertBlocksWithTransactions(creator statementCreator, account common.Address, network uint64, headers []*DBHeader) error {
insert, err := creator.Prepare("INSERT OR IGNORE INTO blocks(network_id, address, blk_number, blk_hash, loaded) VALUES (?, ?, ?, ?, ?)")
if err != nil {
return err
}
for _, t := range transfers {
_, err = insert.Exec(network, t.ID, t.BlockHash, t.Address, &JSONBlob{t.Transaction}, t.From, &JSONBlob{t.Receipt}, &JSONBlob{t.Log}, t.Type)
updateTx, err := creator.Prepare(`UPDATE transfers
SET log = ?
WHERE network_id = ? AND address = ? AND hash = ?`)
if err != nil {
return err
}
insertTx, err := creator.Prepare(`INSERT OR IGNORE
INTO transfers (network_id, address, sender, hash, blk_number, blk_hash, type, timestamp, log, loaded)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 0)`)
if err != nil {
return err
}
for _, header := range headers {
_, err = insert.Exec(network, account, (*SQLBigInt)(header.Number), header.Hash, header.Loaded)
if err != nil {
return err
}
}
return nil
}
func updateAccounts(creator statementCreator, network uint64, accounts []common.Address, headers []*DBHeader, option SyncOption) error {
update, err := creator.Prepare("UPDATE accounts_to_blocks SET sync=sync|? WHERE address=? AND blk_number=? AND network_id=?")
if err != nil {
return err
}
insert, err := creator.Prepare("INSERT OR IGNORE INTO accounts_to_blocks(network_id,address,blk_number,sync) VALUES(?,?,?,?)")
if err != nil {
return err
}
for _, acc := range accounts {
for _, h := range headers {
rst, err := update.Exec(option, acc, (*SQLBigInt)(h.Number), network)
if header.Erc20Transfer != nil {
res, err := updateTx.Exec(&JSONBlob{header.Erc20Transfer.Log}, network, account, header.Erc20Transfer.ID)
if err != nil {
return err
}
affected, err := rst.RowsAffected()
affected, err := res.RowsAffected()
if err != nil {
return err
}
if affected > 0 {
continue
}
_, err = insert.Exec(network, acc, (*SQLBigInt)(h.Number), option)
_, err = insertTx.Exec(network, account, account, header.Erc20Transfer.ID, (*SQLBigInt)(header.Number), header.Hash, erc20Transfer, header.Erc20Transfer.Timestamp, &JSONBlob{header.Erc20Transfer.Log})
if err != nil {
log.Error("error saving erc20transfer", "err", err)
return err
}
}
}
return nil
}
func insertRange(creator statementCreator, account common.Address, network uint64, from *big.Int, to *big.Int) (err error) {
insert, err := creator.Prepare("INSERT INTO blocks_ranges (network_id, address, blk_from, blk_to) VALUES (?, ?, ?, ?)")
if err != nil {
return err
}
_, err = insert.Exec(network, account, (*SQLBigInt)(from), (*SQLBigInt)(to))
if err != nil {
return err
}
return
}
func updateOrInsertTransfers(creator statementCreator, network uint64, transfers []Transfer) error {
update, err := creator.Prepare(`UPDATE transfers
SET tx = ?, sender = ?, receipt = ?, timestamp = ?, loaded = 1
WHERE address =? AND hash = ?`)
if err != nil {
return err
}
insert, err := creator.Prepare(`INSERT OR IGNORE INTO transfers
(network_id, hash, blk_hash, blk_number, timestamp, address, tx, sender, receipt, log, type, loaded)
VALUES
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1)`)
if err != nil {
return err
}
for _, t := range transfers {
res, err := update.Exec(&JSONBlob{t.Transaction}, t.From, &JSONBlob{t.Receipt}, t.Timestamp, t.Address, t.ID)
if err != nil {
return err
}
affected, err := res.RowsAffected()
if err != nil {
return err
}
if affected > 0 {
continue
}
_, err = insert.Exec(network, t.ID, t.BlockHash, (*SQLBigInt)(t.BlockNumber), t.Timestamp, t.Address, &JSONBlob{t.Transaction}, t.From, &JSONBlob{t.Receipt}, &JSONBlob{t.Log}, t.Type)
if err != nil {
log.Error("can't save transfer", "b-hash", t.BlockHash, "b-n", t.BlockNumber, "a", t.Address, "h", t.ID)
return err
}
}
return nil
}
//markBlocksAsLoaded(tx, address, db.network, blocks)
func markBlocksAsLoaded(creator statementCreator, address common.Address, network uint64, blocks []*big.Int) error {
update, err := creator.Prepare("UPDATE blocks SET loaded=? WHERE address=? AND blk_number=? AND network_id=?")
if err != nil {
return err
}
for _, block := range blocks {
_, err := update.Exec(true, address, (*SQLBigInt)(block), network)
if err != nil {
return err
}
}
return nil
}

View File

@ -33,7 +33,7 @@ func TestDBGetHeaderByNumber(t *testing.T) {
Difficulty: big.NewInt(1),
Time: 1,
}
require.NoError(t, db.SaveHeaders([]*types.Header{header}))
require.NoError(t, db.SaveHeaders([]*types.Header{header}, common.Address{1}))
rst, err := db.GetHeaderByNumber(header.Number)
require.NoError(t, err)
require.Equal(t, header.Hash(), rst.Hash)
@ -47,35 +47,45 @@ func TestDBGetHeaderByNumberNoRows(t *testing.T) {
require.Nil(t, rst)
}
func TestDBHeaderExists(t *testing.T) {
func TestDBProcessBlocks(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
header := &types.Header{
Number: big.NewInt(10),
Difficulty: big.NewInt(1),
Time: 1,
address := common.Address{1}
from := big.NewInt(0)
to := big.NewInt(10)
blocks := []*DBHeader{
&DBHeader{
Number: big.NewInt(1),
Hash: common.Hash{1},
},
&DBHeader{
Number: big.NewInt(2),
Hash: common.Hash{2},
}}
t.Log(blocks)
require.NoError(t, db.ProcessBlocks(common.Address{1}, from, to, blocks))
t.Log(db.GetLastBlockByAddress(common.Address{1}, 40))
transfers := []Transfer{
{
ID: common.Hash{1},
Type: ethTransfer,
BlockHash: common.Hash{2},
BlockNumber: big.NewInt(1),
Address: common.Address{1},
Timestamp: 123,
From: common.Address{1},
},
}
require.NoError(t, db.SaveHeaders([]*types.Header{header}))
rst, err := db.HeaderExists(header.Hash())
require.NoError(t, err)
require.True(t, rst)
}
func TestDBHeaderDoesntExist(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
rst, err := db.HeaderExists(common.Hash{1})
require.NoError(t, err)
require.False(t, rst)
require.NoError(t, db.SaveTranfers(address, transfers, []*big.Int{big.NewInt(1), big.NewInt(2)}))
}
func TestDBProcessTransfer(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
header := &DBHeader{
Number: big.NewInt(1),
Hash: common.Hash{1},
Number: big.NewInt(1),
Hash: common.Hash{1},
Address: common.Address{1},
}
tx := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil)
transfers := []Transfer{
@ -86,9 +96,11 @@ func TestDBProcessTransfer(t *testing.T) {
BlockNumber: header.Number,
Transaction: tx,
Receipt: types.NewReceipt(nil, false, 100),
Address: common.Address{1},
},
}
require.NoError(t, db.ProcessTranfers(transfers, nil, []*DBHeader{header}, nil, 0))
require.NoError(t, db.ProcessBlocks(common.Address{1}, big.NewInt(1), big.NewInt(1), []*DBHeader{header}))
require.NoError(t, db.ProcessTranfers(transfers, []*DBHeader{}))
}
func TestDBReorgTransfers(t *testing.T) {
@ -97,21 +109,25 @@ func TestDBReorgTransfers(t *testing.T) {
rcpt := types.NewReceipt(nil, false, 100)
rcpt.Logs = []*types.Log{}
original := &DBHeader{
Number: big.NewInt(1),
Hash: common.Hash{1},
Number: big.NewInt(1),
Hash: common.Hash{1},
Address: common.Address{1},
}
replaced := &DBHeader{
Number: big.NewInt(1),
Hash: common.Hash{2},
Number: big.NewInt(1),
Hash: common.Hash{2},
Address: common.Address{1},
}
originalTX := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil)
replacedTX := types.NewTransaction(2, common.Address{1}, nil, 10, big.NewInt(10), nil)
require.NoError(t, db.ProcessBlocks(original.Address, original.Number, original.Number, []*DBHeader{original}))
require.NoError(t, db.ProcessTranfers([]Transfer{
{ethTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, 100, originalTX, common.Address{1}, rcpt, nil},
}, nil, []*DBHeader{original}, nil, 0))
{ethTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, 100, originalTX, true, common.Address{1}, rcpt, nil},
}, []*DBHeader{}))
require.NoError(t, db.ProcessBlocks(replaced.Address, replaced.Number, replaced.Number, []*DBHeader{replaced}))
require.NoError(t, db.ProcessTranfers([]Transfer{
{ethTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, 100, replacedTX, common.Address{1}, rcpt, nil},
}, nil, []*DBHeader{replaced}, []*DBHeader{original}, 0))
{ethTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, 100, replacedTX, true, common.Address{1}, rcpt, nil},
}, []*DBHeader{original}))
all, err := db.GetTransfers(big.NewInt(0), nil)
require.NoError(t, err)
@ -126,8 +142,9 @@ func TestDBGetTransfersFromBlock(t *testing.T) {
transfers := []Transfer{}
for i := 1; i < 10; i++ {
header := &DBHeader{
Number: big.NewInt(int64(i)),
Hash: common.Hash{byte(i)},
Number: big.NewInt(int64(i)),
Hash: common.Hash{byte(i)},
Address: common.Address{1},
}
headers = append(headers, header)
tx := types.NewTransaction(uint64(i), common.Address{1}, nil, 10, big.NewInt(10), nil)
@ -140,10 +157,12 @@ func TestDBGetTransfersFromBlock(t *testing.T) {
BlockHash: header.Hash,
Transaction: tx,
Receipt: receipt,
Address: common.Address{1},
}
transfers = append(transfers, transfer)
}
require.NoError(t, db.ProcessTranfers(transfers, nil, headers, nil, 0))
require.NoError(t, db.ProcessBlocks(headers[0].Address, headers[0].Number, headers[len(headers)-1].Number, headers))
require.NoError(t, db.ProcessTranfers(transfers, []*DBHeader{}))
rst, err := db.GetTransfers(big.NewInt(7), nil)
require.NoError(t, err)
require.Len(t, rst, 3)
@ -154,88 +173,6 @@ func TestDBGetTransfersFromBlock(t *testing.T) {
}
func TestDBLatestSynced(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
address := common.Address{1}
h1 := &types.Header{
Number: big.NewInt(10),
Difficulty: big.NewInt(1),
Time: 1,
}
h2 := &types.Header{
Number: big.NewInt(9),
Difficulty: big.NewInt(1),
Time: 1,
}
require.NoError(t, db.SaveHeaders([]*types.Header{h1, h2}))
require.NoError(t, db.SaveSyncedHeader(address, h1, ethSync))
require.NoError(t, db.SaveSyncedHeader(address, h2, ethSync))
latest, err := db.GetLatestSynced(address, ethSync)
require.NoError(t, err)
require.NotNil(t, latest)
require.Equal(t, h1.Number, latest.Number)
require.Equal(t, h1.Hash(), latest.Hash)
}
func TestDBLatestSyncedDoesntExist(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
latest, err := db.GetLatestSynced(common.Address{1}, ethSync)
require.NoError(t, err)
require.Nil(t, latest)
}
func TestDBProcessTransfersUpdate(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
address := common.Address{1}
header := &DBHeader{
Number: big.NewInt(10),
Hash: common.Hash{1},
}
transfer := Transfer{
ID: common.Hash{1},
BlockNumber: header.Number,
BlockHash: header.Hash,
Transaction: types.NewTransaction(0, common.Address{}, nil, 0, nil, nil),
Address: address,
}
require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, ethSync))
require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, erc20Sync))
latest, err := db.GetLatestSynced(address, ethSync|erc20Sync)
require.NoError(t, err)
require.Equal(t, header.Hash, latest.Hash)
}
func TestDBLastHeadExist(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
headers := []*DBHeader{
{Number: big.NewInt(1), Hash: common.Hash{1}, Head: true},
{Number: big.NewInt(2), Hash: common.Hash{2}, Head: true},
{Number: big.NewInt(3), Hash: common.Hash{3}, Head: true},
}
require.NoError(t, db.ProcessTranfers(nil, nil, headers, nil, 0))
last, err := db.GetLastHead()
require.NoError(t, err)
require.Equal(t, headers[2].Hash, last.Hash)
}
func TestDBLastHeadDoesntExist(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()
last, err := db.GetLastHead()
require.NoError(t, err)
require.Nil(t, last)
}
func TestCustomTokens(t *testing.T) {
db, stop := setupTestDB(t)
defer stop()

View File

@ -40,6 +40,7 @@ type Transfer struct {
BlockHash common.Hash `json:"blockhash"`
Timestamp uint64 `json:"timestamp"`
Transaction *types.Transaction `json:"transaction"`
Loaded bool
// From is derived from tx signature in order to offload this computation from UI component.
From common.Address `json:"from"`
Receipt *types.Receipt `json:"receipt"`
@ -52,27 +53,16 @@ type ETHTransferDownloader struct {
client *ethclient.Client
accounts []common.Address
signer types.Signer
db *Database
}
var errLogsDownloaderStuck = errors.New("logs downloader stuck")
// GetTransfers checks if the balance was changed between two blocks.
// If so it downloads transaction that transfer ethereum from that block.
func (d *ETHTransferDownloader) GetTransfers(ctx context.Context, header *DBHeader) (rst []Transfer, err error) {
// TODO(dshulyak) consider caching balance and reset it on reorg
num := new(big.Int).Sub(header.Number, one)
changed := []common.Address{}
for _, address := range d.accounts {
balance, err := d.client.BalanceAt(ctx, address, num)
if err != nil {
return nil, err
}
current, err := d.client.BalanceAt(ctx, address, header.Number)
if err != nil {
return nil, err
}
if current.Cmp(balance) != 0 {
changed = append(changed, address)
}
}
changed := d.accounts
if len(changed) == 0 {
return nil, nil
}
@ -100,35 +90,53 @@ func (d *ETHTransferDownloader) GetTransfersByNumber(ctx context.Context, number
}
func (d *ETHTransferDownloader) getTransfersInBlock(ctx context.Context, blk *types.Block, accounts []common.Address) (rst []Transfer, err error) {
for _, tx := range blk.Transactions() {
for _, address := range accounts {
for _, address := range accounts {
preloadedTransfers, err := d.db.GetPreloadedTransactions(address, blk.Hash())
if err != nil {
return nil, err
}
for _, t := range preloadedTransfers {
transfer, err := d.transferFromLog(ctx, *t.Log, address, t.ID)
if err != nil {
log.Error("can't fetch erc20 transfer from log", "error", err)
return nil, err
}
rst = append(rst, transfer)
}
for _, tx := range blk.Transactions() {
from, err := types.Sender(d.signer, tx)
if err != nil {
return nil, err
}
if from == address || (tx.To() != nil && *tx.To() == address) {
receipt, err := d.client.TransactionReceipt(ctx, tx.Hash())
if err != nil {
return nil, err
}
if IsTokenTransfer(receipt.Logs) {
log.Debug("eth downloader found token transfer", "hash", tx.Hash())
continue
}
rst = append(rst, Transfer{
Type: ethTransfer,
ID: tx.Hash(),
Address: address,
BlockNumber: blk.Number(),
BlockHash: blk.Hash(),
Timestamp: blk.Time(),
Transaction: tx,
From: from,
Receipt: receipt})
transactionLog := getTokenLog(receipt.Logs)
if transactionLog == nil {
rst = append(rst, Transfer{
Type: ethTransfer,
ID: tx.Hash(),
Address: address,
BlockNumber: blk.Number(),
BlockHash: blk.Hash(),
Timestamp: blk.Time(),
Transaction: tx,
From: from,
Receipt: receipt,
Log: transactionLog})
}
}
}
}
log.Debug("getTransfersInBlock found", "block", blk.Number(), "len", len(rst))
// TODO(dshulyak) test that balance difference was covered by transactions
return rst, nil
}
@ -170,6 +178,43 @@ func (d *ERC20TransfersDownloader) outboundTopics(address common.Address) [][]co
return [][]common.Hash{{d.signature}, {d.paddedAddress(address)}, {}}
}
func (d *ETHTransferDownloader) transferFromLog(parent context.Context, ethlog types.Log, address common.Address, id common.Hash) (Transfer, error) {
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
tx, _, err := d.client.TransactionByHash(ctx, ethlog.TxHash)
cancel()
if err != nil {
return Transfer{}, err
}
from, err := types.Sender(d.signer, tx)
if err != nil {
return Transfer{}, err
}
ctx, cancel = context.WithTimeout(parent, 3*time.Second)
receipt, err := d.client.TransactionReceipt(ctx, ethlog.TxHash)
cancel()
if err != nil {
return Transfer{}, err
}
ctx, cancel = context.WithTimeout(parent, 3*time.Second)
blk, err := d.client.BlockByHash(ctx, ethlog.BlockHash)
cancel()
if err != nil {
return Transfer{}, err
}
return Transfer{
Address: address,
ID: id,
Type: erc20Transfer,
BlockNumber: new(big.Int).SetUint64(ethlog.BlockNumber),
BlockHash: ethlog.BlockHash,
Transaction: tx,
From: from,
Receipt: receipt,
Timestamp: blk.Time(),
Log: &ethlog,
}, nil
}
func (d *ERC20TransfersDownloader) transferFromLog(parent context.Context, ethlog types.Log, address common.Address) (Transfer, error) {
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
tx, _, err := d.client.TransactionByHash(ctx, ethlog.TxHash)
@ -229,11 +274,52 @@ func (d *ERC20TransfersDownloader) transfersFromLogs(parent context.Context, log
select {
case <-concurrent.WaitAsync():
case <-parent.Done():
return nil, errors.New("logs downloader stuck")
return nil, errLogsDownloaderStuck
}
return concurrent.Get(), concurrent.Error()
}
func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs []types.Log, address common.Address) ([]*DBHeader, error) {
concurrent := NewConcurrentDownloader(parent)
for i := range logs {
l := logs[i]
if l.Removed {
continue
}
index := [4]byte{}
binary.BigEndian.PutUint32(index[:], uint32(l.Index))
id := crypto.Keccak256Hash(l.TxHash.Bytes(), index[:])
header := &DBHeader{
Number: big.NewInt(int64(l.BlockNumber)),
Hash: l.BlockHash,
Erc20Transfer: &Transfer{
Address: address,
BlockNumber: big.NewInt(int64(l.BlockNumber)),
BlockHash: l.BlockHash,
ID: id,
From: address,
Loaded: false,
Type: erc20Transfer,
Log: &l,
},
}
concurrent.Add(func(ctx context.Context) error {
concurrent.PushHeader(header)
return nil
})
}
select {
case <-concurrent.WaitAsync():
case <-parent.Done():
return nil, errLogsDownloaderStuck
}
return concurrent.GetHeaders(), concurrent.Error()
}
// GetTransfers for erc20 uses eth_getLogs rpc with Transfer event signature and our address acount.
func (d *ERC20TransfersDownloader) GetTransfers(ctx context.Context, header *DBHeader) ([]Transfer, error) {
hash := header.Hash
@ -266,12 +352,12 @@ func (d *ERC20TransfersDownloader) GetTransfers(ctx context.Context, header *DBH
return transfers, nil
}
// GetTransfersInRange returns transfers between two blocks.
// GetHeadersInRange returns transfers between two blocks.
// time to get logs for 100000 blocks = 1.144686979s. with 249 events in the result set.
func (d *ERC20TransfersDownloader) GetTransfersInRange(parent context.Context, from, to *big.Int) ([]Transfer, error) {
func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, from, to *big.Int) ([]*DBHeader, error) {
start := time.Now()
log.Debug("get erc20 transfers in range", "from", from, "to", to)
transfers := []Transfer{}
headers := []*DBHeader{}
for _, address := range d.accounts {
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
outbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{
@ -297,14 +383,14 @@ func (d *ERC20TransfersDownloader) GetTransfersInRange(parent context.Context, f
if len(logs) == 0 {
continue
}
rst, err := d.transfersFromLogs(parent, logs, address)
rst, err := d.blocksFromLogs(parent, logs, address)
if err != nil {
return nil, err
}
transfers = append(transfers, rst...)
headers = append(headers, rst...)
}
log.Debug("found erc20 transfers between two blocks", "from", from, "to", to, "lth", len(transfers), "took", time.Since(start))
return transfers, nil
log.Debug("found erc20 transfers between two blocks", "from", from, "to", to, "headers", len(headers), "took", time.Since(start))
return headers, nil
}
func IsTokenTransfer(logs []*types.Log) bool {
@ -316,3 +402,13 @@ func IsTokenTransfer(logs []*types.Log) bool {
}
return false
}
func getTokenLog(logs []*types.Log) *types.Log {
signature := crypto.Keccak256Hash([]byte(erc20TransferEventSignature))
for _, l := range logs {
if len(l.Topics) > 0 && l.Topics[0] == signature {
return l
}
}
return nil
}

View File

@ -30,6 +30,7 @@ type ETHTransferSuite struct {
identity, secondary *ecdsa.PrivateKey
faucet *ecdsa.PrivateKey
signer types.Signer
dbStop func()
downloader *ETHTransferDownloader
}
@ -51,15 +52,22 @@ func (s *ETHTransferSuite) SetupTest() {
s.Require().NoError(err)
s.ethclient = ethclient.NewClient(client)
s.signer = types.NewEIP155Signer(big.NewInt(1337))
db, stop := setupTestDB(s.Suite.T())
s.dbStop = stop
s.downloader = &ETHTransferDownloader{
signer: s.signer,
client: s.ethclient,
db: db,
accounts: []common.Address{
crypto.PubkeyToAddress(s.identity.PublicKey),
crypto.PubkeyToAddress(s.secondary.PublicKey)},
}
}
func (s *ETHTransferSuite) TearDownTest() {
s.dbStop()
}
// signAndMineTx signs transaction with provided key and waits for it to be mined.
// uses configured faucet key if pkey is nil.
func (s *ETHTransferSuite) signAndMineTx(tx *types.Transaction, pkey *ecdsa.PrivateKey) {
@ -256,7 +264,7 @@ func (s *ERC20TransferSuite) TestInRange() {
_, err = bind.WaitMined(timeout, s.ethclient, tx)
s.Require().NoError(err)
}
transfers, err := s.downloader.GetTransfersInRange(context.TODO(), big.NewInt(1), nil)
transfers, err := s.downloader.GetHeadersInRange(context.TODO(), big.NewInt(1), nil)
s.Require().NoError(err)
s.Require().Len(transfers, 5)
}

View File

@ -16,6 +16,10 @@ const (
EventReorg EventType = "reorg"
// EventNewHistory emitted if transfer from older block was added.
EventNewHistory EventType = "history"
// EventFetchingRecentHistory emitted when fetching of lastest tx history is started
EventFetchingRecentHistory EventType = "recent-history-fetching"
// EventRecentHistoryFetched emitted when fetching of lastest tx history is started
EventRecentHistoryReady EventType = "recent-history-ready"
)
// Event is a type for wallet events.
@ -23,4 +27,5 @@ type Event struct {
Type EventType `json:"type"`
BlockNumber *big.Int `json:"blockNumber"`
Accounts []common.Address `json:"accounts"`
ERC20 bool `json:"erc20"`
}

View File

@ -11,17 +11,13 @@ import (
// SetupIterativeDownloader configures IterativeDownloader with last known synced block.
func SetupIterativeDownloader(
db *Database, client HeaderReader, address common.Address, option SyncOption,
downloader BatchDownloader, size *big.Int, to *DBHeader) (*IterativeDownloader, error) {
from, err := db.GetLatestSynced(address, option)
if err != nil {
log.Error("failed to get latest synced block", "error", err)
return nil, err
db *Database, client HeaderReader, address common.Address,
downloader BatchDownloader, size *big.Int, to *big.Int, from *big.Int) (*IterativeDownloader, error) {
adjustedSize := big.NewInt(0).Div(big.NewInt(0).Sub(to, from), big.NewInt(10))
if adjustedSize.Cmp(size) == 1 {
size = adjustedSize
}
if from == nil {
from = &DBHeader{Number: zero}
}
log.Debug("iterative downloader", "address", address, "from", from.Number, "to", to.Number)
log.Info("iterative downloader", "address", address, "from", from, "to", to, "size", size)
d := &IterativeDownloader{
client: client,
batchSize: size,
@ -34,7 +30,7 @@ func SetupIterativeDownloader(
// BatchDownloader interface for loading transfers in batches in speificed range of blocks.
type BatchDownloader interface {
GetTransfersInRange(ctx context.Context, from, to *big.Int) ([]Transfer, error)
GetHeadersInRange(ctx context.Context, from, to *big.Int) ([]*DBHeader, error)
}
// IterativeDownloader downloads batches of transfers in a specified size.
@ -45,42 +41,45 @@ type IterativeDownloader struct {
downloader BatchDownloader
from, to *DBHeader
previous *DBHeader
from, to *big.Int
previous *big.Int
}
// Finished true when earliest block with given sync option is zero.
func (d *IterativeDownloader) Finished() bool {
return d.from.Number.Cmp(d.to.Number) == 0
return d.from.Cmp(d.to) == 0
}
// Header return last synced header.
func (d *IterativeDownloader) Header() *DBHeader {
func (d *IterativeDownloader) Header() *big.Int {
return d.previous
}
// Next moves closer to the end on every new iteration.
func (d *IterativeDownloader) Next(parent context.Context) ([]Transfer, error) {
to := new(big.Int).Add(d.from.Number, d.batchSize)
func (d *IterativeDownloader) Next(parent context.Context) ([]*DBHeader, *big.Int, *big.Int, error) {
to := d.to
from := new(big.Int).Sub(to, d.batchSize)
// if start < 0; start = 0
if to.Cmp(d.to.Number) == 1 {
to = d.to.Number
if from.Cmp(d.from) == -1 {
from = d.from
}
transfers, err := d.downloader.GetTransfersInRange(parent, d.from.Number, to)
log.Info("load erc20 transfers in range", "from", from, "to", to)
headres, err := d.downloader.GetHeadersInRange(parent, from, to)
if err != nil {
log.Error("failed to get transfer in between two bloks", "from", d.from.Number, "to", to, "error", err)
return nil, err
log.Error("failed to get transfer in between two bloks", "from", from, "to", to, "error", err)
return nil, nil, nil, err
}
// use integers instead of DBHeader
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
header, err := d.client.HeaderByNumber(ctx, to)
header, err := d.client.HeaderByNumber(ctx, from)
cancel()
if err != nil {
log.Error("failed to get header by number", "from", d.from.Number, "to", to, "error", err)
return nil, err
log.Error("failed to get header by number", "from", d.from, "to", to, "error", err)
return nil, nil, nil, err
}
d.previous, d.from = d.from, toDBHeader(header)
return transfers, nil
d.previous, d.to = d.to, header.Number
return headres, d.from, to, nil
}
// Revert reverts last step progress. Should be used if application failed to process transfers.

View File

@ -14,11 +14,11 @@ import (
type transfersFixture []Transfer
func (f transfersFixture) GetTransfersInRange(ctx context.Context, from, to *big.Int) ([]Transfer, error) {
rst := []Transfer{}
func (f transfersFixture) GetHeadersInRange(ctx context.Context, from, to *big.Int) ([]*DBHeader, error) {
rst := []*DBHeader{}
for _, t := range f {
if t.BlockNumber.Cmp(from) >= 0 && t.BlockNumber.Cmp(to) <= 0 {
rst = append(rst, t)
rst = append(rst, &DBHeader{Number: t.BlockNumber})
}
}
return rst, nil
@ -26,25 +26,25 @@ func (f transfersFixture) GetTransfersInRange(ctx context.Context, from, to *big
func TestIterFinished(t *testing.T) {
iterator := IterativeDownloader{
from: &DBHeader{Number: big.NewInt(10)},
to: &DBHeader{Number: big.NewInt(10)},
from: big.NewInt(10),
to: big.NewInt(10),
}
require.True(t, iterator.Finished())
}
func TestIterNotFinished(t *testing.T) {
iterator := IterativeDownloader{
from: &DBHeader{Number: big.NewInt(2)},
to: &DBHeader{Number: big.NewInt(5)},
from: big.NewInt(2),
to: big.NewInt(5),
}
require.False(t, iterator.Finished())
}
func TestIterRevert(t *testing.T) {
iterator := IterativeDownloader{
from: &DBHeader{Number: big.NewInt(12)},
to: &DBHeader{Number: big.NewInt(12)},
previous: &DBHeader{Number: big.NewInt(9)},
from: big.NewInt(12),
to: big.NewInt(12),
previous: big.NewInt(9),
}
require.True(t, iterator.Finished())
iterator.Revert()
@ -66,13 +66,13 @@ func TestIterProgress(t *testing.T) {
client: chain,
downloader: transfers,
batchSize: big.NewInt(5),
from: &DBHeader{Number: big.NewInt(0)},
to: &DBHeader{Number: big.NewInt(9)},
from: big.NewInt(0),
to: big.NewInt(9),
}
batch, err := iter.Next(context.TODO())
batch, _, _, err := iter.Next(context.TODO())
require.NoError(t, err)
require.Len(t, batch, 6)
batch, err = iter.Next(context.TODO())
batch, _, _, err = iter.Next(context.TODO())
require.NoError(t, err)
require.Len(t, batch, 5)
require.True(t, iter.Finished())

View File

@ -30,8 +30,9 @@ func pollingPeriodByChain(chain *big.Int) time.Duration {
}
var (
reorgSafetyDepth = big.NewInt(15)
erc20BatchSize = big.NewInt(100000)
reorgSafetyDepth = big.NewInt(15)
erc20BatchSize = big.NewInt(100000)
errAlreadyRunning = errors.New("already running")
)
// HeaderReader interface for reading headers using block number or hash.
@ -43,6 +44,7 @@ type HeaderReader interface {
// BalanceReader interface for reading balance at a specifeid address.
type BalanceReader interface {
BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error)
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
}
type reactorClient interface {
@ -71,14 +73,7 @@ type Reactor struct {
group *Group
}
// Start runs reactor loop in background.
func (r *Reactor) Start(accounts []common.Address) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.group != nil {
return errors.New("already running")
}
r.group = NewGroup(context.Background())
func (r *Reactor) newControlCommand(accounts []common.Address) *controlCommand {
signer := types.NewEIP155Signer(r.chain)
ctl := &controlCommand{
db: r.db,
@ -89,11 +84,25 @@ func (r *Reactor) Start(accounts []common.Address) error {
client: r.client,
accounts: accounts,
signer: signer,
db: r.db,
},
erc20: NewERC20TransfersDownloader(r.client, accounts, signer),
feed: r.feed,
safetyDepth: reorgSafetyDepth,
}
return ctl
}
// Start runs reactor loop in background.
func (r *Reactor) Start(accounts []common.Address) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.group != nil {
return errAlreadyRunning
}
r.group = NewGroup(context.Background())
ctl := r.newControlCommand(accounts)
r.group.Add(ctl.Command())
return nil
}

View File

@ -80,14 +80,14 @@ func (s *ReactorChangesSuite) TestWatchNewAccounts() {
})
s.Require().NoError(s.reactor.Start([]common.Address{s.first}))
s.Require().NoError(utils.Eventually(func() error {
transfers, err := s.db.GetTransfersByAddress(s.first, big.NewInt(0), nil)
transfers, err := s.db.GetTransfersInRange(s.first, big.NewInt(0), nil)
if err != nil {
return err
}
if len(transfers) != 1 {
return fmt.Errorf("expect to get 1 transfer for first address %x, got %d", s.first, len(transfers))
}
transfers, err = s.db.GetTransfersByAddress(s.second, big.NewInt(0), nil)
transfers, err = s.db.GetTransfersInRange(s.second, big.NewInt(0), nil)
if err != nil {
return err
}
@ -98,7 +98,7 @@ func (s *ReactorChangesSuite) TestWatchNewAccounts() {
}, 5*time.Second, 500*time.Millisecond))
s.feed.Send([]accounts.Account{{Address: types.Address(s.first)}, {Address: types.Address(s.second)}})
s.Require().NoError(utils.Eventually(func() error {
transfers, err := s.db.GetTransfersByAddress(s.second, big.NewInt(0), nil)
transfers, err := s.db.GetTransfersInRange(s.second, big.NewInt(0), nil)
if err != nil {
return err
}

View File

@ -9,7 +9,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)
const baseTransfersQuery = "SELECT transfers.hash, type, blocks.hash, blocks.number, blocks.timestamp, address, tx, sender, receipt, log FROM transfers JOIN blocks ON blk_hash = blocks.hash"
const baseTransfersQuery = "SELECT hash, type, blk_hash, blk_number, timestamp, address, tx, sender, receipt, log FROM transfers"
func newTransfersQuery() *transfersQuery {
buf := bytes.NewBuffer(nil)
@ -35,7 +35,7 @@ func (q *transfersQuery) FilterStart(start *big.Int) *transfersQuery {
if start != nil {
q.andOrWhere()
q.added = true
q.buf.WriteString(" blocks.number >= ?")
q.buf.WriteString(" blk_number >= ?")
q.args = append(q.args, (*SQLBigInt)(start))
}
return q
@ -45,16 +45,25 @@ func (q *transfersQuery) FilterEnd(end *big.Int) *transfersQuery {
if end != nil {
q.andOrWhere()
q.added = true
q.buf.WriteString(" blocks.number <= ?")
q.buf.WriteString(" blk_number <= ?")
q.args = append(q.args, (*SQLBigInt)(end))
}
return q
}
func (q *transfersQuery) FilterLoaded(loaded int) *transfersQuery {
q.andOrWhere()
q.added = true
q.buf.WriteString(" loaded = ? ")
q.args = append(q.args, loaded)
return q
}
func (q *transfersQuery) FilterNetwork(network uint64) *transfersQuery {
q.andOrWhere()
q.added = true
q.buf.WriteString(" blocks.network_id = ?")
q.buf.WriteString(" network_id = ?")
q.args = append(q.args, network)
return q
}
@ -67,6 +76,21 @@ func (q *transfersQuery) FilterAddress(address common.Address) *transfersQuery {
return q
}
func (q *transfersQuery) FilterBlockHash(blockHash common.Hash) *transfersQuery {
q.andOrWhere()
q.added = true
q.buf.WriteString(" blk_hash = ?")
q.args = append(q.args, blockHash)
return q
}
func (q *transfersQuery) Limit(pageSize int64) *transfersQuery {
q.buf.WriteString(" ORDER BY blk_number DESC ")
q.buf.WriteString(" LIMIT ?")
q.args = append(q.args, pageSize)
return q
}
func (q *transfersQuery) String() string {
return q.buf.String()
}

View File

@ -1,7 +1,6 @@
package wallet
import (
"errors"
"sync"
"github.com/ethereum/go-ethereum/event"
@ -25,7 +24,7 @@ type SignalsTransmitter struct {
// Start runs loop in background.
func (tmr *SignalsTransmitter) Start() error {
if tmr.quit != nil {
return errors.New("already running")
return errAlreadyRunning
}
tmr.quit = make(chan struct{})
events := make(chan Event, 10)

View File

@ -28,7 +28,7 @@ type TransfersSuite struct {
}
func (s *TransfersSuite) getAllTranfers() (rst []wallet.TransferView, err error) {
return rst, s.Local.Call(&rst, "wallet_getTransfersByAddress", s.DevAccountAddress, (*hexutil.Big)(big.NewInt(0)))
return rst, s.Local.Call(&rst, "wallet_getTransfersByAddress", s.DevAccountAddress, (*hexutil.Big)(big.NewInt(1000)), (*hexutil.Big)(big.NewInt(40)))
}
func (s *TransfersSuite) sendTx(nonce uint64, to types.Address) {