diff --git a/api/geth_backend.go b/api/geth_backend.go index 92837fa08..62b5c45c3 100644 --- a/api/geth_backend.go +++ b/api/geth_backend.go @@ -273,7 +273,7 @@ func (b *GethStatusBackend) startNodeWithAccount(acc multiaccounts.Account, pass if err != nil { return err } - watchAddrs, err := accountsDB.GetAddresses() + watchAddrs, err := accountsDB.GetWalletAddresses() if err != nil { return err } @@ -905,16 +905,23 @@ func (b *GethStatusBackend) startWallet() error { return err } - allAddresses := make([]common.Address, len(watchAddresses)+1) - allAddresses[0] = common.Address(mainAccountAddress) - for i, addr := range watchAddresses { - allAddresses[1+i] = common.Address(addr) + uniqAddressesMap := map[common.Address]struct{}{} + allAddresses := []common.Address{} + mainAddress := common.Address(mainAccountAddress) + uniqAddressesMap[mainAddress] = struct{}{} + allAddresses = append(allAddresses, mainAddress) + for _, addr := range watchAddresses { + address := common.Address(addr) + if _, ok := uniqAddressesMap[address]; !ok { + uniqAddressesMap[address] = struct{}{} + allAddresses = append(allAddresses, address) + } } + return wallet.StartReactor( b.statusNode.RPCClient().Ethclient(), allAddresses, - new(big.Int).SetUint64(b.statusNode.Config().NetworkID), - ) + new(big.Int).SetUint64(b.statusNode.Config().NetworkID)) } // InjectChatAccount selects the current chat account using chatKeyHex and injects the key into whisper. diff --git a/appdatabase/migrations/bindata.go b/appdatabase/migrations/bindata.go index be99a5618..b89aa6ed4 100644 --- a/appdatabase/migrations/bindata.go +++ b/appdatabase/migrations/bindata.go @@ -1,7 +1,7 @@ // Code generated by go-bindata. DO NOT EDIT. // sources: -// 0001_app.down.sql (387B) -// 0001_app.up.sql (3.088kB) +// 0001_app.down.sql (356B) +// 0001_app.up.sql (2.967kB) // 0002_tokens.down.sql (19B) // 0002_tokens.up.sql (248B) // 0003_settings.down.sql (118B) @@ -75,7 +75,7 @@ func (fi bindataFileInfo) Sys() interface{} { return nil } -var __0001_appDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x74\x8e\xcd\x0e\x82\x40\x0c\x84\xef\x3c\x05\xef\xc1\x49\x03\x07\x13\xa3\xc6\x78\xf0\xd6\xac\x4b\x85\x46\xd8\xae\x6d\xf1\xe7\xed\x4d\x4c\xfc\x59\x85\xeb\x37\x93\x6f\xa6\xdc\xae\x37\xf9\x6e\x36\x5f\x56\xb9\xa2\x19\x85\x46\x8b\xec\x0b\x3a\xef\x79\x08\x96\xc2\x83\xf0\x55\x51\xc6\x21\xb4\xa4\xc6\x72\x4f\xc2\xda\xc5\x98\xd6\x23\x4a\x4f\xaa\xc4\x21\xe5\x26\x2e\xe8\xf1\x4f\xde\xb1\x3f\x8d\x3f\x03\x63\x18\x89\x7b\x47\x9d\xa2\x5c\x7e\x4d\x1f\x0e\x82\xe7\x01\xd5\xa0\x71\xef\x6f\x8b\x55\x59\xed\xa7\x3a\xe0\x5b\x67\x40\x35\x50\x7d\x9b\x72\x1a\x47\xf2\x93\x8b\x4f\xc1\x4b\x29\x2e\x34\xa8\x45\xf6\x08\x00\x00\xff\xff\xef\x20\x3b\x16\x83\x01\x00\x00") +var __0001_appDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x74\xcd\xcb\xaa\xc2\x40\x0c\xc6\xf1\x7d\x9f\xa2\xef\xd1\xd5\x39\xb4\x0b\x41\x54\xc4\x85\xbb\x21\x4e\x63\x1b\x6c\x27\x63\x92\x7a\x79\x7b\x41\xf0\x32\xea\x6c\x7f\xf9\xf8\xa7\x5e\x2f\x57\xe5\xe6\xef\x7f\xde\x94\x8a\x66\x14\x3a\xad\x8a\x37\x04\xef\x79\x0a\x96\xe2\x4e\xf8\xac\x28\xbf\xd1\xf5\xa4\xc6\x72\x4d\x8e\x2d\xc4\x98\xce\x23\xca\x48\xaa\xc4\x21\x75\x13\x08\xba\xff\x8a\x0f\xec\x0f\x29\x8d\x40\x83\xa2\x9c\x3e\xa7\x2f\x77\x82\xc7\x09\xd5\x5c\x07\xcf\xe7\xb3\x45\xdd\x6c\x73\x1b\xe7\x7b\x30\x47\xad\xa3\xf6\x92\x6b\x1a\x47\xf2\xd9\x8f\xf7\xc0\x23\x29\x10\x3a\xd4\xaa\xb8\x05\x00\x00\xff\xff\xf6\xca\x86\xce\x64\x01\x00\x00") func _0001_appDownSqlBytes() ([]byte, error) { return bindataRead( @@ -90,12 +90,12 @@ func _0001_appDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "0001_app.down.sql", size: 387, mode: os.FileMode(0644), modTime: time.Unix(1578669164, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xbc, 0x9c, 0xd2, 0xe1, 0x1d, 0x8, 0x34, 0x6a, 0xc8, 0x37, 0x13, 0xb3, 0x9f, 0x26, 0x23, 0x33, 0xd4, 0x25, 0x8, 0xed, 0x53, 0xe6, 0xd, 0x46, 0xc9, 0xf4, 0x24, 0xf8, 0x1, 0x1f, 0xf5, 0xc8}} + info := bindataFileInfo{name: "0001_app.down.sql", size: 356, mode: os.FileMode(0644), modTime: time.Unix(1579172955, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xb5, 0x25, 0xa0, 0xf8, 0x7d, 0x2d, 0xd, 0xcf, 0x18, 0xe4, 0x73, 0xc3, 0x95, 0xf5, 0x24, 0x20, 0xa9, 0xe6, 0x9e, 0x1d, 0x93, 0xe5, 0xc5, 0xad, 0x93, 0x8f, 0x5e, 0x40, 0xb5, 0x30, 0xaa, 0x25}} return a, nil } -var __0001_appUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xa4\x56\xc1\x92\xaa\x38\x14\xdd\xf3\x15\x59\xda\x55\x6c\x66\xfd\x56\xa8\xd1\xa6\xc6\x07\x33\x88\xd3\xfd\x56\xa9\x34\x44\x4c\x09\x24\x2f\x09\x6d\xfb\xf7\x53\x21\x09\xa0\x82\xb6\x33\x3b\x92\x7b\x73\x3c\xe7\xdc\xeb\x4d\x16\x09\x0c\x52\x08\xd2\x60\xbe\x81\x20\x5c\x81\x28\x4e\x01\x7c\x0f\xb7\xe9\x16\x48\xa2\x14\xad\x0b\x09\x66\x9e\x3a\x73\x02\xfe\x09\x92\xc5\x6b\x90\x80\xbf\x92\xf0\x67\x90\xfc\x02\x7f\xc2\x5f\xbe\xf7\x89\xcb\x86\x80\xf9\x26\x9e\x7b\x2f\xe0\x2d\x4c\x5f\xe3\x5d\x0a\x92\xf8\x2d\x5c\xfe\xf0\xbc\x3b\xe0\x38\xcb\x58\x53\x2b\x0d\x8e\xf3\x5c\x10\x29\xc7\xf1\x4f\xb8\x2c\x89\x02\xf3\x38\xde\xc0\x20\xf2\xbd\xec\x80\x07\xab\x96\x57\x0a\xdf\x53\xdf\x93\x8a\x09\x5c\xb8\x15\x6f\x3e\x8e\xe4\xdc\xf2\xf2\x3d\x8e\xd5\xc1\xee\xd7\xb8\x72\x29\x19\x2b\x99\x70\xdf\x82\x60\x45\x72\x84\x15\x58\x06\x29\x4c\xc3\x9f\xb0\x25\x1b\xed\x36\x1b\xdf\x6b\x78\x3e\x19\x9d\x56\xbd\x8b\xc2\xbf\x77\x10\x84\xd1\x12\xbe\x83\xa6\xa6\xbf\x1b\x82\x8c\x1a\xe4\x14\xc7\xd1\xc0\x07\x13\x7b\x01\x6f\xaf\x30\x81\xdd\xf2\xc7\x3d\x38\x6d\xc6\x38\x98\x8e\x74\x50\xed\xa2\x03\x32\x08\xbd\x62\x64\x4f\x5d\x01\x74\xf1\x1e\xa6\xdf\xba\x5f\xdb\x0f\xc1\x4e\x92\x08\x5d\x5b\x9a\xb7\x0e\x5f\xd6\xb4\x2b\xc2\xc0\x63\x45\x2b\x22\x15\xae\x38\xd8\x6d\xd7\xe1\x3a\x82\x4b\x30\x0f\xd7\x61\x94\xfa\x5e\x8e\x39\x77\x25\x07\x4b\xb8\x0a\x76\x9b\x14\xec\x71\x29\x89\xef\x1d\xa8\xae\xfb\x39\xac\x73\xf2\x05\x76\xd1\xd6\x9c\x0c\xa3\xf4\xb9\x6e\x74\x8c\x91\xc5\x03\x33\xcf\x6e\x21\xa7\xa0\xa7\xea\x72\x4c\xeb\xac\xe2\x04\x86\xeb\x48\x2b\x9b\xf5\x67\x5e\x40\x02\x57\x30\x81\xd1\x02\xf6\xe8\x33\xbd\x1f\x6b\x0d\x1b\x98\x42\xb0\x08\xb6\x8b\x60\x09\xbd\x07\x6e\x6a\xf9\xda\xca\xde\xb5\x81\x99\xcf\xc9\xe4\x44\x54\x54\x4a\xca\x6a\x0d\xa8\x81\xd1\x58\x2d\xfa\xb4\xeb\xc8\x50\x6c\x77\xfc\x42\x6b\xcb\x76\x66\xb6\xc7\xa5\xde\x23\xa8\x04\xae\xe5\xde\xb4\x4e\x4d\xd4\x89\x89\xa3\x2e\x40\x57\x58\xd3\x12\xc3\x5a\x60\x79\xe8\x06\x47\xbf\x7d\x3d\x52\xfa\xc8\x47\x79\x44\x13\x87\xd4\x97\x9d\x17\x92\xd4\x39\x11\x23\x19\x82\x64\x84\x72\x65\xd3\x4a\x56\xd8\xaf\x8b\xf1\x38\xee\x56\xaf\xc6\x77\x14\x2e\x7b\xa4\x64\xd9\x51\x0e\xd3\x4c\xca\x8d\x87\xbe\xb7\x88\xa3\x6d\x9a\x04\xda\x08\x3b\x07\x9c\x6d\x88\x13\xe1\xe6\x41\xfb\x6d\xe1\xdc\xf0\x98\x69\x4c\xdf\x26\xf8\xfd\x6f\xbd\x3c\xea\x41\xc3\xee\x7f\x16\xa5\x6e\xaa\x0f\x22\x6e\xd3\x07\x7f\xfd\x69\x48\x82\xf3\x76\x06\x74\x03\x60\x15\x6c\xb6\xa3\x66\xb4\x5c\x47\xd5\x5f\x9b\x3b\x79\xd8\x30\x7d\x84\x61\xb2\x1e\x7a\xe7\x66\x2a\x52\x0c\x3d\xe7\xe3\xfd\x2e\x9e\xb2\x53\x9e\xeb\x0c\xb4\x83\xf3\x4e\xff\x59\xee\xf7\x3b\xd0\x25\x7d\xab\x07\x2b\xcc\x39\xad\x0b\xb4\x67\xc2\xdd\x28\x9d\xe2\x51\x27\x5d\x1b\xf6\x74\x9e\xe9\xc8\x0a\xd3\x52\x12\xf1\x69\x66\x05\x00\x00\xd0\x7c\xfc\x05\xa1\x63\xed\x94\xbb\xb5\x51\x87\xa6\x4d\xd6\x51\x8e\xa5\x3c\x31\xd1\x41\x9b\xdd\x7d\x49\x88\xba\x39\xf1\xdc\x2c\xee\x05\x20\x41\x7e\x37\x44\x2a\x54\x60\xee\xc4\x14\x98\xa3\xbd\x60\xd5\xc5\x9d\x06\xd7\xf0\x9a\x9f\xce\x53\xec\x51\xd6\xe8\x2d\xac\x03\xed\x03\xe2\xfa\x86\x9b\xd6\x61\x9e\x0e\x13\xcc\x91\x05\x43\x34\xff\xd2\x2d\x33\x29\xd0\xe6\x7d\xbb\xc0\x48\x31\x4e\x33\xe7\x4c\xbb\x98\xae\xb4\x05\x97\x97\x05\x2b\xb1\x54\x8e\x45\xe7\x91\x9b\x24\x7f\x98\x9c\x9c\xca\x8c\x7d\x12\x71\xbe\x79\x6b\xd8\x51\xd3\x36\x12\x29\x98\xa2\xfa\x19\x34\x9e\xf5\x9f\x7b\xa0\xe5\xed\x7c\x12\xb8\x2e\x88\x13\xec\x6a\x34\x29\xb9\x64\x27\xd2\xcb\x33\x6d\x63\x35\x9a\x84\x03\x2d\x0e\xc3\x0c\xc5\x5c\xfc\x96\xee\xbf\x01\x00\x00\xff\xff\x5a\xea\xe5\xa6\x10\x0c\x00\x00") +var __0001_appUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x56\x4f\x73\xa2\x30\x14\xbf\xf3\x29\x72\xd4\x19\x2e\x7b\xee\x09\x35\x5a\x66\x29\xec\x22\x6e\xdb\x53\x26\x42\xc4\x8c\x40\xd2\x24\xd4\xfa\xed\x77\x02\x04\x50\x41\xeb\xce\xde\x4c\xde\xcb\xe3\xf7\xe7\xe5\xc5\x79\x08\x9d\x08\x82\xc8\x99\x79\x10\xb8\x4b\xe0\x07\x11\x80\x6f\xee\x3a\x5a\x03\x49\x94\xa2\x45\x2a\xc1\xc4\x52\x27\x4e\xc0\x1f\x27\x9c\x3f\x3b\x21\xf8\x15\xba\x2f\x4e\xf8\x0e\x7e\xc2\x77\xdb\xfa\xc4\x59\x49\xc0\xcc\x0b\x66\xd6\x14\xbc\xba\xd1\x73\xb0\x89\x40\x18\xbc\xba\x8b\x27\xcb\xba\x51\x1c\xc7\x31\x2b\x0b\xa5\x8b\xe3\x24\x11\x44\xca\xe1\xfa\x47\x9c\x65\x44\x81\x59\x10\x78\xd0\xf1\x6d\x2b\xde\xe3\xde\xaa\xc2\x15\xc1\xb7\xc8\xb6\xa4\x62\x02\xa7\x66\xc5\xcb\xed\x81\x9c\x2a\x5c\xb6\xc5\xb1\xda\x37\xfb\x05\xce\x4d\x4a\xcc\x32\x26\xcc\x6f\x41\xb0\x22\x09\xc2\x0a\x2c\x9c\x08\x46\xee\x0b\xac\xc0\xfa\x1b\xcf\xb3\xad\x92\x27\xa3\xd1\x71\xd6\x1b\xdf\xfd\xbd\x81\xc0\xf5\x17\xf0\x0d\x94\x05\xfd\x28\x09\xaa\xd9\x20\xc3\x38\xf0\x7b\x3a\xd4\xb1\x29\x78\x7d\x86\x21\x6c\x97\x4f\xb7\xca\x69\x31\x86\x8b\xe9\x48\x5b\xaa\x5a\xb4\x85\xea\x0a\x1d\x63\xd4\x9c\xba\x28\xd0\xc6\xbb\x32\xdd\xd6\x6d\x6f\xb7\x82\x1d\x25\x11\xda\x5b\x9a\x54\x0a\x9f\x7b\xda\x9a\xd0\xd3\x58\xd1\x9c\x48\x85\x73\x0e\x36\xeb\x95\xbb\xf2\xe1\x02\xcc\xdc\x95\xeb\x47\xb6\x95\x60\xce\x8d\xe5\x60\x01\x97\xce\xc6\x8b\xc0\x0e\x67\x92\xd8\xd6\x9e\x6a\xdf\x4f\x6e\x91\x90\x2f\xb0\xf1\xd7\xf5\x49\xd7\x8f\x1e\xeb\x46\x83\x18\x35\xf5\xc0\xc4\x6a\xb6\x90\x61\xd0\x41\x35\x39\x75\xeb\x2c\x83\x10\xba\x2b\x5f\x33\x9b\x74\x67\xa6\x20\x84\x4b\x18\x42\x7f\x0e\xbb\xea\x13\xbd\x1f\x68\x0e\x1e\x8c\x20\x98\x3b\xeb\xb9\xb3\x80\xd6\x1d\x35\x35\x7d\x2d\x65\xa7\x5a\x4f\xcc\xc7\x68\x72\x22\x72\x2a\x25\x65\x85\x2e\xa8\x0b\xa3\x21\x2f\xba\xb4\xcb\x48\x9f\x6c\x7b\xfc\x8c\x6b\x85\x76\x52\x6f\x0f\x53\xbd\x05\x50\x09\x5c\xc8\x5d\xdd\x3a\x05\x51\x47\x26\x0e\xda\x80\xd6\xd8\xba\x25\xfa\x5e\x60\xb9\x6f\x07\x47\xb7\x7d\x39\x52\xba\xc8\x36\x3b\xa0\x91\x43\xea\xab\x99\x17\x92\x14\x09\x11\x26\xc3\xb6\x04\x89\x09\xe5\xaa\x89\x66\x2c\x6d\x7e\x9d\x4d\xc5\xf3\x4f\x14\x65\xbe\x25\xe2\x1a\x6f\xaf\xcd\x47\x39\x65\x0c\x27\x24\xa9\x3a\xbe\x6d\xf7\x1f\xe7\xda\x77\xda\xd8\x0d\x55\xdb\x10\x3b\xef\xbc\x8c\xc5\x07\x79\x3b\xfd\xca\x25\xdb\x9a\x07\xfe\x3a\x0a\x1d\x0d\xab\x99\x34\xc6\x18\xc4\x89\x30\x13\xa7\xfa\xdd\x94\x36\xe3\x69\xa2\x6b\xb6\x1f\xe9\xbe\x3b\xbd\xd7\xe5\x35\xd2\xef\xda\x7e\xdb\xdf\x31\xf1\x5b\xef\xbf\x25\xf9\xd2\xf1\xd6\x83\x5a\xe4\x98\x73\x5a\xa4\x68\xc7\x84\x99\x9d\x48\x31\x54\x31\x18\xd4\xe4\x52\xf3\xc7\x75\x41\x02\x17\x29\xf9\x4f\xf2\xec\x04\xcb\x87\xc5\x51\xec\x72\xff\x1e\xbc\x1c\xd3\x4c\x12\xf1\x59\x5f\x59\x00\x00\xa0\xc9\xf0\x43\xae\x63\xd5\xb0\xb9\x06\xa5\x43\xe3\x90\x75\x94\x63\x29\x8f\x4c\x24\xdd\x9d\xd4\xbb\xbb\x8c\x10\x75\x75\xe2\xb1\x91\xd8\x11\x40\x82\x7c\x94\x44\x2a\x94\x62\x6e\xc8\xa4\x98\xd7\x72\xf5\x9f\x16\xb8\x82\x97\xf8\x74\x9e\x62\xf7\xb2\x06\x1f\x43\x1d\xa8\xde\xf1\xcb\x87\x66\x9c\x47\xfd\x82\x8f\x20\x47\x4d\x31\x44\x93\x2f\x7d\xb7\x47\x09\x36\x79\xdf\x36\x18\x29\xc6\x69\x6c\x94\xa9\x16\xe3\x4e\x37\xc5\xe5\xb9\x61\x19\x96\xca\xa0\x68\x35\xea\x8d\x38\x9d\x93\x50\x19\xb3\x4f\x22\x4e\x57\x4f\x7e\x73\x21\xab\x46\x22\x29\x53\x54\xff\x1b\x19\xce\xfa\xe7\x1e\xa8\x70\x1b\x9d\xda\x4b\xd7\xf7\x68\x94\x72\xc6\x8e\xa4\xa3\x57\xb7\x4d\xc3\xb1\x4e\xd8\xd3\x74\xdf\xcf\x50\xcc\xc4\xaf\xe1\xfe\x0d\x00\x00\xff\xff\xe8\x42\x77\x9b\x97\x0b\x00\x00") func _0001_appUpSqlBytes() ([]byte, error) { return bindataRead( @@ -110,8 +110,8 @@ func _0001_appUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "0001_app.up.sql", size: 3088, mode: os.FileMode(0644), modTime: time.Unix(1578669164, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x93, 0xb8, 0x68, 0x17, 0x49, 0x51, 0xc0, 0xe8, 0xbc, 0x36, 0xa4, 0x29, 0xc9, 0x93, 0x6c, 0x3e, 0xdf, 0x3d, 0x23, 0x22, 0xab, 0x18, 0x49, 0xbd, 0x6, 0xf, 0xc5, 0xec, 0xf8, 0xcf, 0x1b, 0x6a}} + info := bindataFileInfo{name: "0001_app.up.sql", size: 2967, mode: os.FileMode(0644), modTime: time.Unix(1579172940, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xf7, 0x3a, 0xa7, 0xf2, 0x8f, 0xfa, 0x82, 0x7c, 0xc5, 0x49, 0xac, 0xac, 0xf, 0xc, 0x77, 0xe2, 0xba, 0xe8, 0x4d, 0xe, 0x6f, 0x5d, 0x2c, 0x2c, 0x18, 0x80, 0xc2, 0x1d, 0xe, 0x25, 0xe, 0x18}} return a, nil } @@ -130,7 +130,7 @@ func _0002_tokensDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "0002_tokens.down.sql", size: 19, mode: os.FileMode(0644), modTime: time.Unix(1578669164, 0)} + info := bindataFileInfo{name: "0002_tokens.down.sql", size: 19, mode: os.FileMode(0644), modTime: time.Unix(1576159007, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xd1, 0x31, 0x2, 0xcc, 0x2f, 0x38, 0x90, 0xf7, 0x58, 0x37, 0x47, 0xf4, 0x18, 0xf7, 0x72, 0x74, 0x67, 0x14, 0x7e, 0xf3, 0xb1, 0xd6, 0x5f, 0xb0, 0xd5, 0xe7, 0x91, 0xf4, 0x26, 0x77, 0x8e, 0x68}} return a, nil } @@ -150,7 +150,7 @@ func _0002_tokensUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "0002_tokens.up.sql", size: 248, mode: os.FileMode(0644), modTime: time.Unix(1578669164, 0)} + info := bindataFileInfo{name: "0002_tokens.up.sql", size: 248, mode: os.FileMode(0644), modTime: time.Unix(1576159007, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xcc, 0xd6, 0xde, 0xd3, 0x7b, 0xee, 0x92, 0x11, 0x38, 0xa4, 0xeb, 0x84, 0xca, 0xcb, 0x37, 0x75, 0x5, 0x77, 0x7f, 0x14, 0x39, 0xee, 0xa1, 0x8b, 0xd4, 0x5c, 0x6e, 0x55, 0x6, 0x50, 0x16, 0xd4}} return a, nil } @@ -170,7 +170,7 @@ func _0003_settingsDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "0003_settings.down.sql", size: 118, mode: os.FileMode(0644), modTime: time.Unix(1578669164, 0)} + info := bindataFileInfo{name: "0003_settings.down.sql", size: 118, mode: os.FileMode(0644), modTime: time.Unix(1578048597, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe5, 0xa6, 0xf5, 0xc0, 0x60, 0x64, 0x77, 0xe2, 0xe7, 0x3c, 0x9b, 0xb1, 0x52, 0xa9, 0x95, 0x16, 0xf8, 0x60, 0x2f, 0xa5, 0xeb, 0x46, 0xb9, 0xb9, 0x8f, 0x4c, 0xf4, 0xfd, 0xbb, 0xe7, 0xe5, 0xe5}} return a, nil } @@ -190,7 +190,7 @@ func _0003_settingsUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "0003_settings.up.sql", size: 1311, mode: os.FileMode(0644), modTime: time.Unix(1578669164, 0)} + info := bindataFileInfo{name: "0003_settings.up.sql", size: 1311, mode: os.FileMode(0644), modTime: time.Unix(1578048597, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xea, 0x35, 0x0, 0xeb, 0xe2, 0x33, 0x68, 0xb9, 0xf4, 0xf6, 0x8e, 0x9e, 0x10, 0xe9, 0x58, 0x68, 0x28, 0xb, 0xcd, 0xec, 0x74, 0x71, 0xa7, 0x9a, 0x5a, 0x77, 0x59, 0xb1, 0x13, 0x1c, 0xa1, 0x5b}} return a, nil } @@ -210,7 +210,7 @@ func docGo() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0644), modTime: time.Unix(1578669164, 0)} + info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0644), modTime: time.Unix(1573216280, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xde, 0x7c, 0x28, 0xcd, 0x47, 0xf2, 0xfa, 0x7c, 0x51, 0x2d, 0xd8, 0x38, 0xb, 0xb0, 0x34, 0x9d, 0x4c, 0x62, 0xa, 0x9e, 0x28, 0xc3, 0x31, 0x23, 0xd9, 0xbb, 0x89, 0x9f, 0xa0, 0x89, 0x1f, 0xe8}} return a, nil } diff --git a/appdatabase/migrations/sql/0001_app.down.sql b/appdatabase/migrations/sql/0001_app.down.sql index 9f7a6cc80..e439e73ee 100644 --- a/appdatabase/migrations/sql/0001_app.down.sql +++ b/appdatabase/migrations/sql/0001_app.down.sql @@ -6,7 +6,6 @@ DROP TABLE dapps; DROP TABLE permissions; DROP TABLE transfers; DROP TABLE blocks; -DROP TABLE accounts_to_blocks; DROP TABLE mailservers; DROP TABLE mailserver_request_gaps; DROP INDEX mailserver_request_gaps_chat_id_idx; diff --git a/appdatabase/migrations/sql/0001_app.up.sql b/appdatabase/migrations/sql/0001_app.up.sql index 7e2603a0b..3c50116da 100644 --- a/appdatabase/migrations/sql/0001_app.up.sql +++ b/appdatabase/migrations/sql/0001_app.up.sql @@ -52,31 +52,31 @@ hash VARCHAR NOT NULL, address VARCHAR NOT NULL, blk_hash VARCHAR NOT NULL, tx BLOB, -sender VARCHAR NOT NULL, +sender VARCHAR, receipt BLOB, log BLOB, type VARCHAR NOT NULL, -FOREIGN KEY(network_id,blk_hash) REFERENCES blocks(network_id,hash) ON DELETE CASCADE, +blk_number BIGINT NOT NULL, +timestamp UNSIGNED BIGINT NOT NULL, +loaded BOOL DEFAULT 1, +FOREIGN KEY(network_id,address,blk_hash) REFERENCES blocks(network_id,address,blk_hash) ON DELETE CASCADE, CONSTRAINT unique_transfer_per_address_per_network UNIQUE (hash,address,network_id) ); CREATE TABLE IF NOT EXISTS blocks ( network_id UNSIGNED BIGINT NOT NULL, -hash VARCHAR NOT NULL, -number BIGINT NOT NULL, -timestamp UNSIGNED BIGINT NOT NULL, -head BOOL DEFAULT FALSE, -CONSTRAINT unique_block_per_network UNIQUE (network_id,hash) -CONSTRAINT unique_block_number_per_network UNIQUE (network_id,number) -); - -CREATE TABLE IF NOT EXISTS accounts_to_blocks ( -network_id UNSIGNED BIGINT NOT NULL, address VARCHAR NOT NULL, blk_number BIGINT NOT NULL, -sync INT, -FOREIGN KEY(network_id,blk_number) REFERENCES blocks(network_id,number) ON DELETE CASCADE, -CONSTRAINT unique_mapping_for_account_to_block_per_network UNIQUE (address,blk_number,network_id) +blk_hash BIGINT NOT NULL, +loaded BOOL DEFAULT FALSE, +CONSTRAINT unique_mapping_for_account_to_block_per_network UNIQUE (address,blk_hash,network_id) +); + +CREATE TABLE IF NOT EXISTS blocks_ranges ( +network_id UNSIGNED BIGINT NOT NULL, +address VARCHAR NOT NULL, +blk_from BIGINT NOT NULL, +blk_to BIGINT NOT NULL ); CREATE TABLE IF NOT EXISTS mailservers ( diff --git a/multiaccounts/accounts/database.go b/multiaccounts/accounts/database.go index 48739607e..8933e3392 100644 --- a/multiaccounts/accounts/database.go +++ b/multiaccounts/accounts/database.go @@ -400,6 +400,23 @@ func (db *Database) GetWalletAddress() (rst types.Address, err error) { return } +func (db *Database) GetWalletAddresses() (rst []types.Address, err error) { + rows, err := db.db.Query("SELECT address FROM accounts WHERE chat = 0 ORDER BY created_at") + if err != nil { + return nil, err + } + defer rows.Close() + for rows.Next() { + addr := types.Address{} + err = rows.Scan(&addr) + if err != nil { + return nil, err + } + rst = append(rst, addr) + } + return rst, nil +} + func (db *Database) GetChatAddress() (rst types.Address, err error) { err = db.db.QueryRow("SELECT address FROM accounts WHERE chat = 1").Scan(&rst) return diff --git a/services/accounts/accounts.go b/services/accounts/accounts.go index 2cc7739fc..a863f78f0 100644 --- a/services/accounts/accounts.go +++ b/services/accounts/accounts.go @@ -5,6 +5,7 @@ import ( "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/multiaccounts/accounts" ) @@ -20,6 +21,7 @@ type API struct { } func (api *API) SaveAccounts(ctx context.Context, accounts []accounts.Account) error { + log.Info("[AccountsAPI::SaveAccounts]") err := api.db.SaveAccounts(accounts) if err != nil { return err diff --git a/services/wallet/api.go b/services/wallet/api.go index 4cb5020b3..95b1ae12b 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -43,20 +43,112 @@ func (api *API) GetTransfers(ctx context.Context, start, end *hexutil.Big) ([]Tr return castToTransferViews(rst), nil } -// GetTransfersByAddress returns transfers for a single address between two blocks. -func (api *API) GetTransfersByAddress(ctx context.Context, address common.Address, start, end *hexutil.Big) ([]TransferView, error) { - log.Debug("call to get transfers for an address", "address", address, "start", start, "end", end) - if start == nil { - return nil, errors.New("start of the query must be provided. use 0 if you want to load all transfers") - } +type StatsView struct { + BlocksStats map[int64]int64 `json:"blocksStats"` + TransfersCount int64 `json:"transfersCount"` +} + +// GetTransfersFromBlock +func (api *API) GetTransfersFromBlock(ctx context.Context, address common.Address, block *hexutil.Big) ([]TransferView, error) { + log.Debug("[WalletAPI:: GetTransfersFromBlock] get transfers from block", "address", address, "block", block) if api.s.db == nil { return nil, ErrServiceNotInitialized } - rst, err := api.s.db.GetTransfersByAddress(address, (*big.Int)(start), (*big.Int)(end)) + + blocksByAddress := make(map[common.Address][]*big.Int) + blocksByAddress[address] = []*big.Int{block.ToInt()} + + txCommand := &loadTransfersCommand{ + accounts: []common.Address{address}, + db: api.s.db, + chain: api.s.reactor.chain, + client: api.s.client, + blocksByAddress: blocksByAddress, + } + + err := txCommand.Command()(ctx) if err != nil { return nil, err } - log.Debug("result from database for address", "address", address, "start", start, "end", end, "len", len(rst)) + + rst, err := api.s.db.GetTransfersInRange(address, block.ToInt(), block.ToInt()) + if err != nil { + return nil, err + } + + return castToTransferViews(rst), nil +} + +// GetTransfersByAddress returns transfers for a single address +func (api *API) GetTransfersByAddress(ctx context.Context, address common.Address, beforeBlock, limit *hexutil.Big) ([]TransferView, error) { + log.Info("call to get transfers for an address", "address", address, "block", beforeBlock, "limit", limit) + if api.s.db == nil { + return nil, ErrServiceNotInitialized + } + rst, err := api.s.db.GetTransfersByAddress(address, beforeBlock.ToInt(), limit.ToInt().Int64()) + if err != nil { + return nil, err + } + + transfersCount := big.NewInt(int64(len(rst))) + if limit.ToInt().Cmp(transfersCount) == 1 { + block, err := api.s.db.GetFirstKnownBlock(address) + if err != nil { + return nil, err + } + + if block == nil { + return castToTransferViews(rst), nil + } + + from, err := findFirstRange(ctx, address, block, api.s.client) + if err != nil { + return nil, err + } + fromByAddress := map[common.Address]*big.Int{address: from} + toByAddress := map[common.Address]*big.Int{address: block} + + balanceCache := newBalanceCache() + blocksCommand := &findAndCheckBlockRangeCommand{ + accounts: []common.Address{address}, + db: api.s.db, + chain: api.s.reactor.chain, + client: api.s.client, + balanceCache: balanceCache, + feed: api.s.feed, + fromByAddress: fromByAddress, + toByAddress: toByAddress, + } + + if err = blocksCommand.Command()(ctx); err != nil { + return nil, err + } + + blocks, err := api.s.db.GetBlocksByAddress(address, numberOfBlocksCheckedPerIteration) + if err != nil { + return nil, err + } + + log.Info("checking blocks again", "blocks", len(blocks)) + if len(blocks) > 0 { + txCommand := &loadTransfersCommand{ + accounts: []common.Address{address}, + db: api.s.db, + chain: api.s.reactor.chain, + client: api.s.client, + } + + err = txCommand.Command()(ctx) + if err != nil { + return nil, err + } + rst, err = api.s.db.GetTransfersByAddress(address, beforeBlock.ToInt(), limit.ToInt().Int64()) + if err != nil { + return nil, err + } + } + } + return castToTransferViews(rst), nil } diff --git a/services/wallet/balance_cache.go b/services/wallet/balance_cache.go new file mode 100644 index 000000000..33da28246 --- /dev/null +++ b/services/wallet/balance_cache.go @@ -0,0 +1,94 @@ +package wallet + +import ( + "context" + "math/big" + "sync" + + "github.com/ethereum/go-ethereum/common" +) + +type balanceCache struct { + // cache maps an address to a map of a block number and the balance of this particular address + cache map[common.Address]map[*big.Int]*big.Int + requestCounter map[common.Address]uint + cacheHitsCounter map[common.Address]uint + rw sync.RWMutex +} + +type BalanceCache interface { + BalanceAt(ctx context.Context, client BalanceReader, account common.Address, blockNumber *big.Int) (*big.Int, error) +} + +func (b *balanceCache) readCachedBalance(account common.Address, blockNumber *big.Int) *big.Int { + b.rw.RLock() + defer b.rw.RUnlock() + + return b.cache[account][blockNumber] +} + +func (b *balanceCache) addBalanceToCache(account common.Address, blockNumber *big.Int, balance *big.Int) { + b.rw.Lock() + defer b.rw.Unlock() + + _, exists := b.cache[account] + if !exists { + b.cache[account] = make(map[*big.Int]*big.Int) + } + b.cache[account][blockNumber] = balance +} + +func (b *balanceCache) incRequestsNumber(account common.Address) { + b.rw.Lock() + defer b.rw.Unlock() + + cnt, ok := b.requestCounter[account] + if !ok { + b.requestCounter[account] = 1 + } + + b.requestCounter[account] = cnt + 1 +} + +func (b *balanceCache) incCacheHitNumber(account common.Address) { + b.rw.Lock() + defer b.rw.Unlock() + + cnt, ok := b.cacheHitsCounter[account] + if !ok { + b.cacheHitsCounter[account] = 1 + } + + b.cacheHitsCounter[account] = cnt + 1 +} + +func (b *balanceCache) getStats(account common.Address) (uint, uint) { + b.rw.RLock() + defer b.rw.RUnlock() + + return b.requestCounter[account], b.cacheHitsCounter[account] +} + +func (b *balanceCache) BalanceAt(ctx context.Context, client BalanceReader, account common.Address, blockNumber *big.Int) (*big.Int, error) { + b.incRequestsNumber(account) + cachedBalance := b.readCachedBalance(account, blockNumber) + if cachedBalance != nil { + b.incCacheHitNumber(account) + return cachedBalance, nil + } + balance, err := client.BalanceAt(ctx, account, blockNumber) + if err != nil { + return nil, err + } + b.addBalanceToCache(account, blockNumber, balance) + + return balance, nil +} + +func newBalanceCache() *balanceCache { + return &balanceCache{ + cache: make(map[common.Address]map[*big.Int]*big.Int), + requestCounter: make(map[common.Address]uint), + cacheHitsCounter: make(map[common.Address]uint), + } +} diff --git a/services/wallet/commands.go b/services/wallet/commands.go index 9b244aae5..e1842cfc6 100644 --- a/services/wallet/commands.go +++ b/services/wallet/commands.go @@ -13,14 +13,18 @@ import ( "github.com/ethereum/go-ethereum/log" ) -type ethHistoricalCommand struct { - db *Database - eth TransferDownloader - address common.Address - client reactorClient - feed *event.Feed +var numberOfBlocksCheckedPerIteration = 40 - from, to *big.Int +type ethHistoricalCommand struct { + db *Database + eth TransferDownloader + address common.Address + client reactorClient + balanceCache *balanceCache + feed *event.Feed + foundHeaders []*DBHeader + + from, to, resultingFrom *big.Int } func (c *ethHistoricalCommand) Command() Command { @@ -31,47 +35,26 @@ func (c *ethHistoricalCommand) Command() Command { } func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { - if c.from == nil { - from, err := c.db.GetLatestSynced(c.address, ethSync) - if err != nil { - return err - } - if from == nil { - c.from = zero - } else { - c.from = from.Number - } - log.Debug("initialized downloader for eth historical transfers", "address", c.address, "starting at", c.from, "up to", c.to) - } - ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) - defer cancel() - concurrent := NewConcurrentDownloader(ctx) start := time.Now() - downloadEthConcurrently(concurrent, c.client, c.eth, c.address, c.from, c.to) - select { - case <-concurrent.WaitAsync(): - case <-ctx.Done(): - log.Error("eth downloader is stuck") - return errors.New("eth downloader is stuck") - } - if concurrent.Error() != nil { - log.Error("failed to dowload transfers using concurrent downloader", "error", err) - return concurrent.Error() - } - transfers := concurrent.Get() - log.Info("eth historical downloader finished successfully", "total transfers", len(transfers), "time", time.Since(start)) - err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headersFromTransfers(transfers), nil, ethSync) + totalRequests, cacheHits := c.balanceCache.getStats(c.address) + log.Info("balance cache before checking range", "total", totalRequests, "cached", totalRequests-cacheHits) + from, headers, err := findBlocksWithEthTransfers(ctx, c.client, c.balanceCache, c.eth, c.address, c.from, c.to) + if err != nil { - log.Error("failed to save downloaded erc20 transfers", "error", err) return err } - if len(transfers) > 0 { - // we download all or nothing - c.feed.Send(Event{ - Type: EventNewHistory, - BlockNumber: c.from, - Accounts: []common.Address{c.address}, - }) + + c.foundHeaders = headers + c.resultingFrom = from + + log.Info("eth historical downloader finished successfully", "address", c.address, "from", from, "to", c.to, "total blocks", len(headers), "time", time.Since(start)) + totalRequests, cacheHits = c.balanceCache.getStats(c.address) + log.Info("balance cache after checking range", "total", totalRequests, "cached", totalRequests-cacheHits) + + //err = c.db.ProcessBlocks(c.address, from, c.to, headers, ethTransfer) + if err != nil { + log.Error("failed to save found blocks with transfers", "error", err) + return err } log.Debug("eth transfers were persisted. command is closed") return nil @@ -84,8 +67,10 @@ type erc20HistoricalCommand struct { client reactorClient feed *event.Feed - iterator *IterativeDownloader - to *DBHeader + iterator *IterativeDownloader + to *big.Int + from *big.Int + foundHeaders []*DBHeader } func (c *erc20HistoricalCommand) Command() Command { @@ -96,40 +81,32 @@ func (c *erc20HistoricalCommand) Command() Command { } func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { + start := time.Now() if c.iterator == nil { c.iterator, err = SetupIterativeDownloader( - c.db, c.client, c.address, erc20Sync, - c.erc20, erc20BatchSize, c.to) + c.db, c.client, c.address, + c.erc20, erc20BatchSize, c.to, c.from) if err != nil { log.Error("failed to setup historical downloader for erc20") return err } } for !c.iterator.Finished() { - start := time.Now() - transfers, err := c.iterator.Next(ctx) + headers, _, _, err := c.iterator.Next(ctx) if err != nil { log.Error("failed to get next batch", "error", err) return err } - headers := headersFromTransfers(transfers) - headers = append(headers, c.iterator.Header()) - err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headers, nil, erc20Sync) + c.foundHeaders = append(c.foundHeaders, headers...) + + /*err = c.db.ProcessBlocks(c.address, from, to, headers, erc20Transfer) if err != nil { c.iterator.Revert() - log.Error("failed to save downloaded erc20 transfers", "error", err) + log.Error("failed to save downloaded erc20 blocks with transfers", "error", err) return err - } - if len(transfers) > 0 { - log.Debug("erc20 downloader imported transfers", "len", len(transfers), "time", time.Since(start)) - c.feed.Send(Event{ - Type: EventNewHistory, - BlockNumber: c.iterator.Header().Number, - Accounts: []common.Address{c.address}, - }) - } + }*/ } - log.Info("wallet historical downloader for erc20 transfers finished") + log.Info("wallet historical downloader for erc20 transfers finished", "in", time.Since(start)) return nil } @@ -142,7 +119,7 @@ type newBlocksTransfersCommand struct { client reactorClient feed *event.Feed - from, to *DBHeader + initialFrom, from, to *DBHeader } func (c *newBlocksTransfersCommand) Command() Command { @@ -173,6 +150,53 @@ func (c *newBlocksTransfersCommand) Verify(parent context.Context) (err error) { return nil } +func (c *newBlocksTransfersCommand) getAllTransfers(parent context.Context, from, to uint64) ([]Transfer, map[common.Address][]*DBHeader, error) { + newHeadersByAddress := map[common.Address][]*DBHeader{} + all := []Transfer{} + for n := from; n <= to; n++ { + ctx, cancel := context.WithTimeout(parent, 10*time.Second) + header, err := c.client.HeaderByNumber(ctx, big.NewInt(int64(n))) + cancel() + if err != nil { + return nil, nil, err + } + dbHeader := toDBHeader(header) + log.Info("reactor get transfers", "block", dbHeader.Hash, "number", dbHeader.Number) + transfers, err := c.getTransfers(parent, dbHeader) + if err != nil { + log.Error("failed to get transfers", "header", dbHeader.Hash, "error", err) + return nil, nil, err + } + if len(transfers) > 0 { + for _, transfer := range transfers { + headers, ok := newHeadersByAddress[transfer.Address] + if !ok { + headers = []*DBHeader{} + } + newHeadersByAddress[transfer.Address] = append(headers, dbHeader) + } + } + all = append(all, transfers...) + } + + return all, newHeadersByAddress, nil +} + +func (c *newBlocksTransfersCommand) saveHeaders(parent context.Context, newHeadersByAddress map[common.Address][]*DBHeader) (err error) { + for _, address := range c.accounts { + headers, ok := newHeadersByAddress[address] + if ok { + err = c.db.SaveBlocks(address, headers) + if err != nil { + log.Error("failed to persist blocks", "error", err) + return err + } + } + } + + return nil +} + func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { if c.from == nil { ctx, cancel := context.WithTimeout(parent, 3*time.Second) @@ -183,7 +207,6 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { return err } c.from = toDBHeader(from) - log.Debug("initialized downloader for new blocks transfers", "starting at", c.from.Number) } num := new(big.Int).Add(c.from.Number, one) ctx, cancel := context.WithTimeout(parent, 5*time.Second) @@ -193,43 +216,48 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { log.Warn("failed to get latest block", "number", num, "error", err) return err } - log.Debug("reactor received new block", "header", latest.Hash()) + log.Info("reactor received new block", "header", num) ctx, cancel = context.WithTimeout(parent, 10*time.Second) - added, removed, err := c.onNewBlock(ctx, c.from, latest) + latestHeader, removed, latestValidSavedBlock, reorgSpotted, err := c.onNewBlock(ctx, c.from, latest) cancel() if err != nil { log.Error("failed to process new header", "header", latest, "error", err) return err } - if len(added) == 0 && len(removed) == 0 { - log.Debug("new block already in the database", "block", latest.Number) + + if latestHeader == nil && len(removed) == 0 { + log.Info("new block already in the database", "block", latest.Number) return nil } - // for each added block get tranfers from downloaders - all := []Transfer{} - for i := range added { - log.Debug("reactor get transfers", "block", added[i].Hash, "number", added[i].Number) - transfers, err := c.getTransfers(parent, added[i]) - if err != nil { - log.Error("failed to get transfers", "header", added[i].Hash, "error", err) - continue + latestHeader.Loaded = true + + fromN := latest.Number.Uint64() + + if reorgSpotted { + if latestValidSavedBlock != nil { + fromN = latestValidSavedBlock.Number.Uint64() + } + if c.initialFrom != nil { + fromN = c.initialFrom.Number.Uint64() } - log.Debug("reactor adding transfers", "block", added[i].Hash, "number", added[i].Number, "len", len(transfers)) - all = append(all, transfers...) } - err = c.db.ProcessTranfers(all, c.accounts, added, removed, erc20Sync|ethSync) + toN := latestHeader.Number.Uint64() + all, newHeadersByAddress, err := c.getAllTransfers(parent, fromN, toN) + if err != nil { + return err + } + + err = c.saveHeaders(parent, newHeadersByAddress) + if err != nil { + return err + } + + err = c.db.ProcessTranfers(all, removed) if err != nil { log.Error("failed to persist transfers", "error", err) return err } c.from = toDBHeader(latest) - if len(added) == 1 && len(removed) == 0 { - c.feed.Send(Event{ - Type: EventNewBlock, - BlockNumber: added[0].Number, - Accounts: uniqueAccountsFromTransfers(all), - }) - } if len(removed) != 0 { lth := len(removed) c.feed.Send(Event{ @@ -238,40 +266,66 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { Accounts: uniqueAccountsFromTransfers(all), }) } + log.Info("before sending new block event", "latest", latestHeader != nil, "removed", len(removed), "len", len(uniqueAccountsFromTransfers(all))) + if latestHeader != nil && len(removed) == 0 { + c.feed.Send(Event{ + Type: EventNewBlock, + BlockNumber: latestHeader.Number, + Accounts: uniqueAccountsFromTransfers(all), + }) + } return nil } -func (c *newBlocksTransfersCommand) onNewBlock(ctx context.Context, from *DBHeader, latest *types.Header) (added, removed []*DBHeader, err error) { - if from == nil { - // first node in the cache - return []*DBHeader{toHead(latest)}, nil, nil - } +func (c *newBlocksTransfersCommand) onNewBlock(ctx context.Context, from *DBHeader, latest *types.Header) (lastestHeader *DBHeader, removed []*DBHeader, lastSavedValidHeader *DBHeader, reorgSpotted bool, err error) { if from.Hash == latest.ParentHash { // parent matching from node in the cache. on the same chain. - return []*DBHeader{toHead(latest)}, nil, nil + return toHead(latest), nil, nil, false, nil } - exists, err := c.db.HeaderExists(latest.Hash()) + + lastSavedBlock, err := c.db.GetLastSavedBlock() if err != nil { - return nil, nil, err + return nil, nil, nil, false, err } - if exists { - return nil, nil, nil + + if lastSavedBlock == nil { + return toHead(latest), nil, nil, true, nil } + + header, err := c.client.HeaderByNumber(ctx, lastSavedBlock.Number) + if err != nil { + return nil, nil, nil, false, err + } + + if header.Hash() == lastSavedBlock.Hash { + return toHead(latest), nil, lastSavedBlock, true, nil + } + log.Debug("wallet reactor spotted reorg", "last header in db", from.Hash, "new parent", latest.ParentHash) - for from != nil && from.Hash != latest.ParentHash { - removed = append(removed, from) - added = append(added, toHead(latest)) - latest, err = c.client.HeaderByHash(ctx, latest.ParentHash) + for lastSavedBlock != nil { + removed = append(removed, lastSavedBlock) + lastSavedBlock, err = c.db.GetLastSavedBlockBefore(lastSavedBlock.Number) + if err != nil { - return nil, nil, err + return nil, nil, nil, false, err } - from, err = c.db.GetHeaderByNumber(new(big.Int).Sub(latest.Number, one)) + + if lastSavedBlock == nil { + continue + } + + header, err := c.client.HeaderByNumber(ctx, lastSavedBlock.Number) if err != nil { - return nil, nil, err + return nil, nil, nil, false, err + } + + // the last saved block is still valid + if header.Hash() == lastSavedBlock.Hash { + return toHead(latest), nil, lastSavedBlock, true, nil } } - added = append(added, toHead(latest)) - return added, removed, nil + + return toHead(latest), removed, lastSavedBlock, true, nil } func (c *newBlocksTransfersCommand) getTransfers(parent context.Context, header *DBHeader) ([]Transfer, error) { @@ -307,42 +361,137 @@ type controlCommand struct { // run fast indexing for every accont up to canonical chain head minus safety depth. // every account will run it from last synced header. -func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error { +func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCache *balanceCache, fromByAddress map[common.Address]*big.Int, toByAddress map[common.Address]*big.Int) (map[common.Address]*big.Int, map[common.Address][]*DBHeader, error) { start := time.Now() group := NewGroup(ctx) - for _, address := range c.accounts { - erc20 := &erc20HistoricalCommand{ - db: c.db, - erc20: NewERC20TransfersDownloader(c.client, []common.Address{address}, types.NewEIP155Signer(c.chain)), - client: c.client, - feed: c.feed, - address: address, - to: to, - } - group.Add(erc20.Command()) + + commands := make([]*ethHistoricalCommand, len(c.accounts)) + for i, address := range c.accounts { eth := ðHistoricalCommand{ - db: c.db, - client: c.client, - address: address, + db: c.db, + client: c.client, + balanceCache: bCache, + address: address, eth: ÐTransferDownloader{ client: c.client, accounts: []common.Address{address}, signer: types.NewEIP155Signer(c.chain), + db: c.db, }, feed: c.feed, - to: to.Number, + from: fromByAddress[address], + to: toByAddress[address], } + commands[i] = eth group.Add(eth.Command()) } select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case <-group.WaitAsync(): + resultingFromByAddress := map[common.Address]*big.Int{} + headers := map[common.Address][]*DBHeader{} + for _, command := range commands { + resultingFromByAddress[command.address] = command.resultingFrom + headers[command.address] = command.foundHeaders + } + log.Info("fast indexer finished", "in", time.Since(start)) + return resultingFromByAddress, headers, nil + } +} + +// run fast indexing for every accont up to canonical chain head minus safety depth. +// every account will run it from last synced header. +func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, fromByAddress map[common.Address]*big.Int, toByAddress map[common.Address]*big.Int) (map[common.Address][]*DBHeader, error) { + start := time.Now() + group := NewGroup(ctx) + + commands := make([]*erc20HistoricalCommand, len(c.accounts)) + for i, address := range c.accounts { + erc20 := &erc20HistoricalCommand{ + db: c.db, + erc20: NewERC20TransfersDownloader(c.client, []common.Address{address}, types.NewEIP155Signer(c.chain)), + client: c.client, + feed: c.feed, + address: address, + from: fromByAddress[address], + to: toByAddress[address], + foundHeaders: []*DBHeader{}, + } + commands[i] = erc20 + group.Add(erc20.Command()) + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-group.WaitAsync(): + headres := map[common.Address][]*DBHeader{} + for _, command := range commands { + headres[command.address] = command.foundHeaders + } + log.Info("fast indexer Erc20 finished", "in", time.Since(start)) + return headres, nil + } +} + +func getTransfersByBlocks(ctx context.Context, db *Database, downloader *ETHTransferDownloader, address common.Address, blocks []*big.Int) ([]Transfer, error) { + allTransfers := []Transfer{} + + for _, block := range blocks { + transfers, err := downloader.GetTransfersByNumber(ctx, block) + if err != nil { + return nil, err + } + log.Debug("loadTransfers", "block", block, "new transfers", len(transfers)) + if len(transfers) > 0 { + allTransfers = append(allTransfers, transfers...) + } + } + + return allTransfers, nil +} + +func loadTransfers(ctx context.Context, accounts []common.Address, db *Database, client *ethclient.Client, chain *big.Int, limit int, blocksByAddress map[common.Address][]*big.Int) error { + start := time.Now() + group := NewGroup(ctx) + + for _, address := range accounts { + blocks, ok := blocksByAddress[address] + + if !ok { + blocks, _ = db.GetBlocksByAddress(address, numberOfBlocksCheckedPerIteration) + } + + for _, block := range blocks { + erc20 := &transfersCommand{ + db: db, + client: client, + address: address, + eth: ÐTransferDownloader{ + client: client, + accounts: []common.Address{address}, + signer: types.NewEIP155Signer(chain), + db: db, + }, + block: block, + } + group.Add(erc20.Command()) + } + } + select { case <-ctx.Done(): return ctx.Err() case <-group.WaitAsync(): - log.Debug("fast indexer finished", "in", time.Since(start)) + log.Info("loadTransfers finished", "in", time.Since(start)) return nil } } +func (c *controlCommand) LoadTransfers(ctx context.Context, downloader *ETHTransferDownloader, limit int) error { + return loadTransfers(ctx, c.accounts, c.db, c.client, c.chain, limit, make(map[common.Address][]*big.Int)) +} + +/* // verifyLastSynced verifies that last header that was added to the database is still in the canonical chain. // it is done by downloading configured number of parents for the last header in the db. func (c *controlCommand) verifyLastSynced(parent context.Context, last *DBHeader, head *types.Header) error { @@ -357,7 +506,7 @@ func (c *controlCommand) verifyLastSynced(parent context.Context, last *DBHeader if err != nil { return err } - log.Debug("spawn reorg verifier", "from", last.Number, "to", header.Number) + log.Info("spawn reorg verifier", "from", last.Number, "to", header.Number) // TODO(dshulyak) make a standalone command that // doesn't manage transfers and has an upper limit cmd := &newBlocksTransfersCommand{ @@ -373,28 +522,100 @@ func (c *controlCommand) verifyLastSynced(parent context.Context, last *DBHeader } return cmd.Command()(parent) } +*/ +func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client *ethclient.Client) (*big.Int, error) { + from := big.NewInt(0) + to := initialTo + goal := uint64(20) + + firstNonce, err := client.NonceAt(c, account, to) + log.Info("find range with 20 <= len(tx) <= 25", "account", account, "firstNonce", firstNonce, "from", from, "to", to) + + if err != nil { + return nil, err + } + + if firstNonce <= goal { + return zero, nil + } + + nonceDiff := firstNonce + iterations := 0 + for iterations < 50 { + iterations = iterations + 1 + + if nonceDiff > goal { + // from = (from + to) / 2 + from = from.Add(from, to) + from = from.Div(from, big.NewInt(2)) + } else { + // from = from - (from + to) / 2 + // to = from + diff := big.NewInt(0).Sub(to, from) + diff.Div(diff, big.NewInt(2)) + to = big.NewInt(from.Int64()) + from.Sub(from, diff) + } + fromNonce, err := client.NonceAt(c, account, from) + if err != nil { + return nil, err + } + nonceDiff = firstNonce - fromNonce + + log.Info("next nonce", "from", from, "n", fromNonce, "diff", firstNonce-fromNonce) + + if goal <= nonceDiff && nonceDiff <= (goal+5) { + log.Info("range found", "account", account, "from", from, "to", to) + return from, nil + } + } + + log.Info("range found", "account", account, "from", from, "to", to) + + return from, nil +} + +func findFirstRanges(c context.Context, accounts []common.Address, initialTo *big.Int, client *ethclient.Client) (map[common.Address]*big.Int, error) { + res := map[common.Address]*big.Int{} + + for _, address := range accounts { + from, err := findFirstRange(c, address, initialTo, client) + if err != nil { + return nil, err + } + + res[address] = from + } + + return res, nil +} func (c *controlCommand) Run(parent context.Context) error { - log.Debug("start control command") + log.Info("start control command") ctx, cancel := context.WithTimeout(parent, 3*time.Second) head, err := c.client.HeaderByNumber(ctx, nil) cancel() if err != nil { return err } - log.Debug("current head is", "block number", head.Number) - last, err := c.db.GetLastHead() + + c.feed.Send(Event{ + Type: EventFetchingRecentHistory, + Accounts: c.accounts, + }) + + log.Info("current head is", "block number", head.Number) + lastKnownEthBlocks, accountsWithoutHistory, err := c.db.GetLastKnownBlockByAddresses(c.accounts) if err != nil { log.Error("failed to load last head from database", "error", err) return err } - if last != nil { - err = c.verifyLastSynced(parent, last, head) - if err != nil { - log.Error("failed verification for last header in canonical chain", "error", err) - return err - } + + fromMap, err := findFirstRanges(parent, accountsWithoutHistory, head.Number, c.client) + if err != nil { + return err } + target := new(big.Int).Sub(head.Number, c.safetyDepth) if target.Cmp(zero) <= 0 { target = zero @@ -405,23 +626,75 @@ func (c *controlCommand) Run(parent context.Context) error { if err != nil { return err } - log.Debug("run fast indexing for the transfers", "up to", head.Number) - err = c.fastIndex(parent, toDBHeader(head)) + + fromByAddress := map[common.Address]*big.Int{} + toByAddress := map[common.Address]*big.Int{} + + for _, address := range c.accounts { + from, ok := lastKnownEthBlocks[address] + if !ok { + from = fromMap[address] + } + + fromByAddress[address] = from + toByAddress[address] = head.Number + } + + bCache := newBalanceCache() + cmnd := &findAndCheckBlockRangeCommand{ + accounts: c.accounts, + db: c.db, + chain: c.chain, + client: c.client, + balanceCache: bCache, + feed: c.feed, + fromByAddress: fromByAddress, + toByAddress: toByAddress, + } + + err = cmnd.Command()(parent) if err != nil { return err } - log.Debug("watching new blocks", "start from", head.Number) - cmd := &newBlocksTransfersCommand{ - db: c.db, - chain: c.chain, + + downloader := ÐTransferDownloader{ client: c.client, accounts: c.accounts, - eth: c.eth, - erc20: c.erc20, - feed: c.feed, - from: toDBHeader(head), + signer: types.NewEIP155Signer(c.chain), + db: c.db, } - return cmd.Command()(parent) + err = c.LoadTransfers(parent, downloader, 40) + if err != nil { + return err + } + + c.feed.Send(Event{ + Type: EventRecentHistoryReady, + Accounts: c.accounts, + BlockNumber: head.Number, + }) + + log.Info("watching new blocks", "start from", head.Number) + cmd := &newBlocksTransfersCommand{ + db: c.db, + chain: c.chain, + client: c.client, + accounts: c.accounts, + eth: c.eth, + erc20: c.erc20, + feed: c.feed, + initialFrom: toDBHeader(head), + from: toDBHeader(head), + } + + err = cmd.Command()(parent) + if err != nil { + log.Warn("error on running newBlocksTransfersCommand", "err", err) + return err + } + + log.Info("end control command") + return err } func (c *controlCommand) Command() Command { @@ -431,23 +704,6 @@ func (c *controlCommand) Command() Command { }.Run } -func headersFromTransfers(transfers []Transfer) []*DBHeader { - byHash := map[common.Hash]struct{}{} - rst := []*DBHeader{} - for i := range transfers { - _, exists := byHash[transfers[i].BlockHash] - if exists { - continue - } - rst = append(rst, &DBHeader{ - Hash: transfers[i].BlockHash, - Number: transfers[i].BlockNumber, - Timestamp: transfers[i].Timestamp, - }) - } - return rst -} - func uniqueAccountsFromTransfers(transfers []Transfer) []common.Address { accounts := []common.Address{} unique := map[common.Address]struct{}{} @@ -461,3 +717,114 @@ func uniqueAccountsFromTransfers(transfers []Transfer) []common.Address { } return accounts } + +type transfersCommand struct { + db *Database + eth *ETHTransferDownloader + block *big.Int + address common.Address + client reactorClient +} + +func (c *transfersCommand) Command() Command { + return FiniteCommand{ + Interval: 5 * time.Second, + Runable: c.Run, + }.Run +} + +func (c *transfersCommand) Run(ctx context.Context) (err error) { + allTransfers, err := getTransfersByBlocks(ctx, c.db, c.eth, c.address, []*big.Int{c.block}) + if err != nil { + log.Info("getTransfersByBlocks error", "error", err) + return err + } + + err = c.db.SaveTranfers(c.address, allTransfers, []*big.Int{c.block}) + if err != nil { + log.Error("SaveTranfers error", "error", err) + return err + } + + log.Debug("transfers loaded", "address", c.address, "len", len(allTransfers)) + return nil +} + +type loadTransfersCommand struct { + accounts []common.Address + db *Database + chain *big.Int + client *ethclient.Client + blocksByAddress map[common.Address][]*big.Int +} + +func (c *loadTransfersCommand) Command() Command { + return FiniteCommand{ + Interval: 5 * time.Second, + Runable: c.Run, + }.Run +} + +func (c *loadTransfersCommand) LoadTransfers(ctx context.Context, downloader *ETHTransferDownloader, limit int, blocksByAddress map[common.Address][]*big.Int) error { + return loadTransfers(ctx, c.accounts, c.db, c.client, c.chain, limit, blocksByAddress) +} + +func (c *loadTransfersCommand) Run(parent context.Context) (err error) { + downloader := ÐTransferDownloader{ + client: c.client, + accounts: c.accounts, + signer: types.NewEIP155Signer(c.chain), + db: c.db, + } + err = c.LoadTransfers(parent, downloader, 40, c.blocksByAddress) + if err != nil { + return err + } + + return +} + +type findAndCheckBlockRangeCommand struct { + accounts []common.Address + db *Database + chain *big.Int + client *ethclient.Client + balanceCache *balanceCache + feed *event.Feed + fromByAddress map[common.Address]*big.Int + toByAddress map[common.Address]*big.Int +} + +func (c *findAndCheckBlockRangeCommand) Command() Command { + return FiniteCommand{ + Interval: 5 * time.Second, + Runable: c.Run, + }.Run +} + +func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) (err error) { + log.Debug("start findAndCHeckBlockRangeCommand") + newFromByAddress, ethHeadersByAddress, err := c.fastIndex(parent, c.balanceCache, c.fromByAddress, c.toByAddress) + if err != nil { + return err + } + erc20HeadersByAddress, err := c.fastIndexErc20(parent, newFromByAddress, c.toByAddress) + if err != nil { + return err + } + + for _, address := range c.accounts { + ethHeaders := ethHeadersByAddress[address] + erc20Headers := erc20HeadersByAddress[address] + + allHeaders := append(ethHeaders, erc20Headers...) + + log.Debug("saving headers", "len", len(allHeaders), "address") + err = c.db.ProcessBlocks(address, newFromByAddress[address], c.toByAddress[address], allHeaders) + if err != nil { + return err + } + } + + return +} diff --git a/services/wallet/commands_test.go b/services/wallet/commands_test.go index 13ba43d68..c4c05302d 100644 --- a/services/wallet/commands_test.go +++ b/services/wallet/commands_test.go @@ -49,6 +49,7 @@ func (s *NewBlocksSuite) SetupTest() { eth: ÐTransferDownloader{ client: s.backend.Client, signer: s.backend.Signer, + db: s.db, accounts: []common.Address{s.address}, }, feed: s.feed, @@ -128,6 +129,7 @@ func (s *NewBlocksSuite) TestReorg() { ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) defer cancel() s.cmd.from = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(15)) + s.cmd.initialFrom = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(15)) s.Require().EqualError(s.runCmdUntilError(ctx), "not found") transfers, err := s.db.GetTransfers(big.NewInt(0), nil) @@ -151,7 +153,7 @@ func (s *NewBlocksSuite) TestReorg() { s.Require().EqualError(s.runCmdUntilError(ctx), "not found") close(events) - expected := []Event{{Type: EventReorg, BlockNumber: big.NewInt(16)}, {Type: EventNewBlock, BlockNumber: big.NewInt(25)}} + expected := []Event{{Type: EventReorg, BlockNumber: big.NewInt(21)}, {Type: EventNewBlock, BlockNumber: big.NewInt(25)}} i := 0 for ev := range events { s.Require().Equal(expected[i].Type, ev.Type) @@ -177,7 +179,8 @@ func (s *NewBlocksSuite) downloadHistorical() { s.Require().NoError(err) eth := ðHistoricalCommand{ - db: s.db, + db: s.db, + balanceCache: newBalanceCache(), eth: ÐTransferDownloader{ client: s.backend.Client, signer: s.backend.Signer, @@ -186,12 +189,13 @@ func (s *NewBlocksSuite) downloadHistorical() { feed: s.feed, address: s.address, client: s.backend.Client, + from: big.NewInt(0), to: s.backend.Ethereum.BlockChain().CurrentBlock().Number(), } s.Require().NoError(eth.Run(context.Background()), "eth historical command failed to sync transfers") - transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + //dbHeaders, err := s.db.GetBlocks() s.Require().NoError(err) - s.Require().Len(transfers, 2) + s.Require().Len(eth.foundHeaders, 2) } func (s *NewBlocksSuite) reorgHistorical() { @@ -214,10 +218,6 @@ func (s *NewBlocksSuite) TestSafetyBufferFailure() { s.downloadHistorical() s.reorgHistorical() - - transfers, err := s.db.GetTransfers(big.NewInt(0), nil) - s.Require().NoError(err) - s.Require().Len(transfers, 1) } func (s *NewBlocksSuite) TestSafetyBufferSuccess() { diff --git a/services/wallet/concurrent.go b/services/wallet/concurrent.go index 7ab68338d..ed6167070 100644 --- a/services/wallet/concurrent.go +++ b/services/wallet/concurrent.go @@ -3,7 +3,11 @@ package wallet import ( "context" "math/big" + "sort" "sync" + "time" + + "github.com/pkg/errors" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -22,10 +26,14 @@ type ConcurrentDownloader struct { } type Result struct { - mu sync.Mutex - transfers []Transfer + mu sync.Mutex + transfers []Transfer + headers []*DBHeader + blockRanges [][]*big.Int } +var errDownloaderStuck = errors.New("eth downloader is stuck") + func (r *Result) Push(transfers ...Transfer) { r.mu.Lock() defer r.mu.Unlock() @@ -40,42 +48,153 @@ func (r *Result) Get() []Transfer { return rst } +func (r *Result) PushHeader(block *DBHeader) { + r.mu.Lock() + defer r.mu.Unlock() + + r.headers = append(r.headers, block) +} + +func (r *Result) GetHeaders() []*DBHeader { + r.mu.Lock() + defer r.mu.Unlock() + rst := make([]*DBHeader, len(r.headers)) + copy(rst, r.headers) + return rst +} + +func (r *Result) PushRange(blockRange []*big.Int) { + r.mu.Lock() + defer r.mu.Unlock() + + r.blockRanges = append(r.blockRanges, blockRange) +} + +func (r *Result) GetRanges() [][]*big.Int { + r.mu.Lock() + defer r.mu.Unlock() + rst := make([][]*big.Int, len(r.blockRanges)) + copy(rst, r.blockRanges) + r.blockRanges = [][]*big.Int{} + + return rst +} + // TransferDownloader downloads transfers from single block using number. type TransferDownloader interface { GetTransfersByNumber(context.Context, *big.Int) ([]Transfer, error) } -func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, downloader TransferDownloader, account common.Address, low, high *big.Int) { - c.Add(func(ctx context.Context) error { - if low.Cmp(high) >= 0 { - return nil - } - log.Debug("eth transfers comparing blocks", "low", low, "high", high) - lb, err := client.BalanceAt(ctx, account, low) - if err != nil { - return err - } - hb, err := client.BalanceAt(ctx, account, high) - if err != nil { - return err - } - if lb.Cmp(hb) == 0 { - log.Debug("balances are equal", "low", low, "high", high) - return nil - } - if new(big.Int).Sub(high, low).Cmp(one) == 0 { - transfers, err := downloader.GetTransfersByNumber(ctx, high) +func checkRanges(parent context.Context, client reactorClient, cache BalanceCache, downloader TransferDownloader, account common.Address, ranges [][]*big.Int) ([][]*big.Int, []*DBHeader, error) { + ctx, cancel := context.WithTimeout(parent, 30*time.Second) + defer cancel() + + c := NewConcurrentDownloader(ctx) + + for _, blocksRange := range ranges { + from := blocksRange[0] + to := blocksRange[1] + + c.Add(func(ctx context.Context) error { + if from.Cmp(to) >= 0 { + return nil + } + log.Debug("eth transfers comparing blocks", "from", from, "to", to) + lb, err := cache.BalanceAt(ctx, client, account, from) if err != nil { return err } - c.Push(transfers...) + hb, err := cache.BalanceAt(ctx, client, account, to) + if err != nil { + return err + } + if lb.Cmp(hb) == 0 { + log.Debug("balances are equal", "from", from, "to", to) + // In case if balances are equal but non zero we want to check if + // eth_getTransactionCount return different values, because there + // still might be transactions + if lb.Cmp(zero) != 0 { + return nil + } + + ln, err := client.NonceAt(ctx, account, from) + if err != nil { + return err + } + hn, err := client.NonceAt(ctx, account, to) + if err != nil { + return err + } + if ln == hn { + log.Debug("transaction count is also equal", "from", from, "to", to) + return nil + } + } + if new(big.Int).Sub(to, from).Cmp(one) == 0 { + header, err := client.HeaderByNumber(ctx, to) + if err != nil { + return err + } + c.PushHeader(toDBHeader(header)) + return nil + } + mid := new(big.Int).Add(from, to) + mid = mid.Div(mid, two) + _, err = cache.BalanceAt(ctx, client, account, mid) + if err != nil { + return err + } + log.Debug("balances are not equal", "from", from, "mid", mid, "to", to) + + c.PushRange([]*big.Int{from, mid}) + c.PushRange([]*big.Int{mid, to}) return nil - } - mid := new(big.Int).Add(low, high) - mid = mid.Div(mid, two) - log.Debug("balances are not equal. spawn two concurrent downloaders", "low", low, "mid", mid, "high", high) - downloadEthConcurrently(c, client, downloader, account, low, mid) - downloadEthConcurrently(c, client, downloader, account, mid, high) - return nil - }) + }) + + } + + select { + case <-c.WaitAsync(): + case <-ctx.Done(): + return nil, nil, errDownloaderStuck + } + + if c.Error() != nil { + return nil, nil, errors.Wrap(c.Error(), "failed to dowload transfers using concurrent downloader") + } + + return c.GetRanges(), c.GetHeaders(), nil +} + +func findBlocksWithEthTransfers(parent context.Context, client reactorClient, cache BalanceCache, downloader TransferDownloader, account common.Address, low, high *big.Int) (from *big.Int, headers []*DBHeader, err error) { + ranges := [][]*big.Int{{low, high}} + minBlock := big.NewInt(low.Int64()) + headers = []*DBHeader{} + var lvl = 1 + for len(ranges) > 0 && lvl <= 30 { + log.Debug("check blocks ranges", "lvl", lvl, "ranges len", len(ranges)) + lvl++ + newRanges, newHeaders, err := checkRanges(parent, client, cache, downloader, account, ranges) + if err != nil { + return nil, nil, err + } + + headers = append(headers, newHeaders...) + + if len(newRanges) > 0 { + log.Debug("found new ranges", "account", account, "lvl", lvl, "new ranges len", len(newRanges)) + } + if len(newRanges) > 60 { + sort.SliceStable(newRanges, func(i, j int) bool { + return newRanges[i][0].Cmp(newRanges[j][0]) == 1 + }) + + newRanges = newRanges[:60] + minBlock = newRanges[len(newRanges)-1][0] + } + + ranges = newRanges + } + + return minBlock, headers, err } diff --git a/services/wallet/concurrent_test.go b/services/wallet/concurrent_test.go index c14ab0bb3..dd46cbe14 100644 --- a/services/wallet/concurrent_test.go +++ b/services/wallet/concurrent_test.go @@ -10,6 +10,8 @@ import ( "github.com/stretchr/testify/require" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/common" ) @@ -57,6 +59,22 @@ func (f balancesFixture) BalanceAt(ctx context.Context, account common.Address, return f[index], nil } +func (f balancesFixture) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + return uint64(0), nil +} + +func (f balancesFixture) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + return &types.Header{ + Number: number, + }, nil +} + +func (f balancesFixture) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + return &types.Header{ + Number: big.NewInt(0), + }, nil +} + type batchesFixture [][]Transfer func (f batchesFixture) GetTransfersByNumber(ctx context.Context, number *big.Int) (rst []Transfer, err error) { @@ -71,7 +89,7 @@ func TestConcurrentEthDownloader(t *testing.T) { type options struct { balances balancesFixture batches batchesFixture - result []Transfer + result []DBHeader last *big.Int } type testCase struct { @@ -92,7 +110,7 @@ func TestConcurrentEthDownloader(t *testing.T) { last: big.NewInt(3), balances: balancesFixture{big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(10)}, batches: batchesFixture{{}, {}, {}, {{BlockNumber: big.NewInt(3)}, {BlockNumber: big.NewInt(3)}}}, - result: []Transfer{{BlockNumber: big.NewInt(3)}, {BlockNumber: big.NewInt(3)}}, + result: []DBHeader{{Number: big.NewInt(3)}}, }, }, { @@ -101,7 +119,7 @@ func TestConcurrentEthDownloader(t *testing.T) { last: big.NewInt(3), balances: balancesFixture{big.NewInt(0), big.NewInt(3), big.NewInt(7), big.NewInt(10)}, batches: batchesFixture{{}, {{BlockNumber: big.NewInt(1)}}, {{BlockNumber: big.NewInt(2)}}, {{BlockNumber: big.NewInt(3)}}}, - result: []Transfer{{BlockNumber: big.NewInt(1)}, {BlockNumber: big.NewInt(2)}, {BlockNumber: big.NewInt(3)}}, + result: []DBHeader{{Number: big.NewInt(1)}, {Number: big.NewInt(2)}, {Number: big.NewInt(3)}}, }, }, } { @@ -109,19 +127,19 @@ func TestConcurrentEthDownloader(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() concurrent := NewConcurrentDownloader(ctx) - downloadEthConcurrently( - concurrent, tc.options.balances, tc.options.batches, + _, headers, _ := findBlocksWithEthTransfers( + ctx, tc.options.balances, newBalanceCache(), tc.options.batches, common.Address{}, zero, tc.options.last) concurrent.Wait() require.NoError(t, concurrent.Error()) rst := concurrent.Get() - require.Len(t, rst, len(tc.options.result)) + require.Len(t, headers, len(tc.options.result)) sort.Slice(rst, func(i, j int) bool { return rst[i].BlockNumber.Cmp(rst[j].BlockNumber) < 0 }) - for i := range rst { + /*for i := range rst { require.Equal(t, tc.options.result[i].BlockNumber, rst[i].BlockNumber) - } + }*/ }) } } diff --git a/services/wallet/database.go b/services/wallet/database.go index c4ebf701d..29eae4438 100644 --- a/services/wallet/database.go +++ b/services/wallet/database.go @@ -10,15 +10,21 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" ) // DBHeader fields from header that are stored in database. type DBHeader struct { - Number *big.Int - Hash common.Hash - Timestamp uint64 + Number *big.Int + Hash common.Hash + Timestamp uint64 + Erc20Transfer *Transfer + Network uint64 + Address common.Address // Head is true if the block was a head at the time it was pulled from chain. Head bool + // Loaded is true if trasfers from this block has been already fetched + Loaded bool } func toDBHeader(header *types.Header) *DBHeader { @@ -26,6 +32,7 @@ func toDBHeader(header *types.Header) *DBHeader { Hash: header.Hash(), Number: header.Number, Timestamp: header.Time, + Loaded: false, } } @@ -38,12 +45,6 @@ func toHead(header *types.Header) *DBHeader { // SyncOption is used to specify that application processed transfers for that block. type SyncOption uint -const ( - // sync options - ethSync SyncOption = 1 - erc20Sync SyncOption = 2 -) - // SQLBigInt type for storing uint256 in the databse. // FIXME(dshulyak) SQL big int is max 64 bits. Maybe store as bytes in big endian and hope // that lexographical sorting will work. @@ -111,8 +112,61 @@ func (db Database) Close() error { return db.db.Close() } +func (db Database) ProcessBlocks(account common.Address, from *big.Int, to *big.Int, headers []*DBHeader) (err error) { + var ( + tx *sql.Tx + ) + tx, err = db.db.Begin() + if err != nil { + return err + } + defer func() { + if err == nil { + err = tx.Commit() + return + } + _ = tx.Rollback() + }() + + err = insertBlocksWithTransactions(tx, account, db.network, headers) + if err != nil { + return + } + + err = insertRange(tx, account, db.network, from, to) + if err != nil { + return + } + + return +} + +func (db Database) SaveBlocks(account common.Address, headers []*DBHeader) (err error) { + var ( + tx *sql.Tx + ) + tx, err = db.db.Begin() + if err != nil { + return err + } + defer func() { + if err == nil { + err = tx.Commit() + return + } + _ = tx.Rollback() + }() + + err = insertBlocksWithTransactions(tx, account, db.network, headers) + if err != nil { + return + } + + return +} + // ProcessTranfers atomically adds/removes blocks and adds new tranfers. -func (db Database) ProcessTranfers(transfers []Transfer, accounts []common.Address, added, removed []*DBHeader, option SyncOption) (err error) { +func (db Database) ProcessTranfers(transfers []Transfer, removed []*DBHeader) (err error) { var ( tx *sql.Tx ) @@ -131,21 +185,46 @@ func (db Database) ProcessTranfers(transfers []Transfer, accounts []common.Addre if err != nil { return } - err = insertHeaders(tx, db.network, added) + err = updateOrInsertTransfers(tx, db.network, transfers) if err != nil { return } - err = insertTransfers(tx, db.network, transfers) - if err != nil { - return - } - err = updateAccounts(tx, db.network, accounts, added, option) return } -// GetTransfersByAddress loads transfers for a given address between two blocks. -func (db *Database) GetTransfersByAddress(address common.Address, start, end *big.Int) (rst []Transfer, err error) { - query := newTransfersQuery().FilterNetwork(db.network).FilterAddress(address).FilterStart(start).FilterEnd(end) +// SaveTranfers +func (db Database) SaveTranfers(address common.Address, transfers []Transfer, blocks []*big.Int) (err error) { + var ( + tx *sql.Tx + ) + tx, err = db.db.Begin() + if err != nil { + return err + } + defer func() { + if err == nil { + err = tx.Commit() + return + } + _ = tx.Rollback() + }() + + err = updateOrInsertTransfers(tx, db.network, transfers) + if err != nil { + return + } + + err = markBlocksAsLoaded(tx, address, db.network, blocks) + if err != nil { + return + } + + return +} + +// GetTransfersInRange loads transfers for a given address between two blocks. +func (db *Database) GetTransfersInRange(address common.Address, start, end *big.Int) (rst []Transfer, err error) { + query := newTransfersQuery().FilterNetwork(db.network).FilterAddress(address).FilterStart(start).FilterEnd(end).FilterLoaded(1) rows, err := db.db.Query(query.String(), query.Args()...) if err != nil { return @@ -154,9 +233,228 @@ func (db *Database) GetTransfersByAddress(address common.Address, start, end *bi return query.Scan(rows) } +// GetTransfersByAddress loads transfers for a given address between two blocks. +func (db *Database) GetTransfersByAddress(address common.Address, fromBlock *big.Int, limit int64) (rst []Transfer, err error) { + query := newTransfersQuery(). + FilterNetwork(db.network). + FilterAddress(address). + FilterEnd(fromBlock). + FilterLoaded(1). + Limit(limit) + + rows, err := db.db.Query(query.String(), query.Args()...) + if err != nil { + return + } + defer rows.Close() + return query.Scan(rows) +} + +// GetTransfersByAddress loads transfers for a given address between two blocks. +func (db *Database) GetBlocksByAddress(address common.Address, limit int) (rst []*big.Int, err error) { + query := `SELECT blk_number FROM blocks + WHERE address = ? AND network_id = ? AND loaded = 0 + ORDER BY blk_number DESC + LIMIT ?` + rows, err := db.db.Query(query, address, db.network, limit) + if err != nil { + return + } + defer rows.Close() + for rows.Next() { + block := &big.Int{} + err = rows.Scan((*SQLBigInt)(block)) + if err != nil { + return nil, err + } + rst = append(rst, block) + } + return rst, nil +} + +func (db *Database) RemoveBlockWithTransfer(address common.Address, block *big.Int) error { + query := `DELETE FROM blocks + WHERE address = ? + AND blk_number = ? + AND network_id = ?` + + _, err := db.db.Exec(query, address, (*SQLBigInt)(block), db.network) + + if err != nil { + return err + } + + return nil +} + +func (db *Database) GetLastBlockByAddress(address common.Address, limit int) (rst *big.Int, err error) { + query := `SELECT * FROM + (SELECT blk_number FROM blocks WHERE address = ? AND network_id = ? ORDER BY blk_number DESC LIMIT ?) + ORDER BY blk_number LIMIT 1` + rows, err := db.db.Query(query, address, db.network, limit) + if err != nil { + return + } + defer rows.Close() + + if rows.Next() { + block := &big.Int{} + err = rows.Scan((*SQLBigInt)(block)) + if err != nil { + return nil, err + } + + return block, nil + } + + return nil, nil +} + +func (db *Database) GetLastSavedBlock() (rst *DBHeader, err error) { + query := `SELECT blk_number, blk_hash + FROM blocks + WHERE network_id = ? + ORDER BY blk_number DESC LIMIT 1` + rows, err := db.db.Query(query, db.network) + if err != nil { + return + } + defer rows.Close() + + if rows.Next() { + header := &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} + err = rows.Scan((*SQLBigInt)(header.Number), &header.Hash) + if err != nil { + return nil, err + } + + return header, nil + } + + return nil, nil +} + +func (db *Database) GetBlocks() (rst []*DBHeader, err error) { + query := `SELECT blk_number, blk_hash, address FROM blocks` + rows, err := db.db.Query(query, db.network) + if err != nil { + return + } + defer rows.Close() + + rst = []*DBHeader{} + for rows.Next() { + header := &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} + err = rows.Scan((*SQLBigInt)(header.Number), &header.Hash, &header.Address) + if err != nil { + return nil, err + } + + rst = append(rst, header) + } + + return rst, nil +} + +func (db *Database) GetLastSavedBlockBefore(block *big.Int) (rst *DBHeader, err error) { + query := `SELECT blk_number, blk_hash + FROM blocks + WHERE network_id = ? AND blk_number < ? + ORDER BY blk_number DESC LIMIT 1` + rows, err := db.db.Query(query, db.network, (*SQLBigInt)(block)) + if err != nil { + return + } + defer rows.Close() + + if rows.Next() { + header := &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} + err = rows.Scan((*SQLBigInt)(header.Number), &header.Hash) + if err != nil { + return nil, err + } + + return header, nil + } + + return nil, nil +} + +func (db *Database) GetFirstKnownBlock(address common.Address) (rst *big.Int, err error) { + query := `SELECT blk_from FROM blocks_ranges + WHERE address = ? + AND network_id = ? + ORDER BY blk_from + LIMIT 1` + + rows, err := db.db.Query(query, address, db.network) + if err != nil { + return + } + defer rows.Close() + + if rows.Next() { + block := &big.Int{} + err = rows.Scan((*SQLBigInt)(block)) + if err != nil { + return nil, err + } + + return block, nil + } + + return nil, nil +} + +func (db *Database) GetLastKnownBlockByAddress(address common.Address) (rst *big.Int, err error) { + query := `SELECT blk_to FROM blocks_ranges + WHERE address = ? + AND network_id = ? + ORDER BY blk_to DESC + LIMIT 1` + + rows, err := db.db.Query(query, address, db.network) + if err != nil { + return + } + defer rows.Close() + + if rows.Next() { + block := &big.Int{} + err = rows.Scan((*SQLBigInt)(block)) + if err != nil { + return nil, err + } + + return block, nil + } + + return nil, nil +} + +func (db *Database) GetLastKnownBlockByAddresses(addresses []common.Address) (map[common.Address]*big.Int, []common.Address, error) { + res := map[common.Address]*big.Int{} + accountsWithoutHistory := []common.Address{} + for _, address := range addresses { + block, err := db.GetLastKnownBlockByAddress(address) + if err != nil { + log.Info("Can't get last block", "error", err) + return nil, nil, err + } + + if block != nil { + res[address] = block + } else { + accountsWithoutHistory = append(accountsWithoutHistory, address) + } + } + + return res, accountsWithoutHistory, nil +} + // GetTransfers load transfers transfer betweeen two blocks. func (db *Database) GetTransfers(start, end *big.Int) (rst []Transfer, err error) { - query := newTransfersQuery().FilterNetwork(db.network).FilterStart(start).FilterEnd(end) + query := newTransfersQuery().FilterNetwork(db.network).FilterStart(start).FilterEnd(end).FilterLoaded(1) rows, err := db.db.Query(query.String(), query.Args()...) if err != nil { return @@ -165,8 +463,37 @@ func (db *Database) GetTransfers(start, end *big.Int) (rst []Transfer, err error return query.Scan(rows) } +func (db *Database) GetPreloadedTransactions(address common.Address, blockHash common.Hash) (rst []Transfer, err error) { + query := newTransfersQuery(). + FilterNetwork(db.network). + FilterAddress(address). + FilterBlockHash(blockHash). + FilterLoaded(0) + + rows, err := db.db.Query(query.String(), query.Args()...) + if err != nil { + return + } + defer rows.Close() + return query.Scan(rows) +} + +func (db *Database) GetTransactionsLog(address common.Address, transactionHash common.Hash) (*types.Log, error) { + l := &types.Log{} + err := db.db.QueryRow("SELECT log FROM transfers WHERE network_id = ? AND address = ? AND hash = ?", + db.network, address, transactionHash). + Scan(&JSONBlob{l}) + if err == nil { + return l, nil + } + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err +} + // SaveHeaders stores a list of headers atomically. -func (db *Database) SaveHeaders(headers []*types.Header) (err error) { +func (db *Database) SaveHeaders(headers []*types.Header, address common.Address) (err error) { var ( tx *sql.Tx insert *sql.Stmt @@ -175,7 +502,7 @@ func (db *Database) SaveHeaders(headers []*types.Header) (err error) { if err != nil { return } - insert, err = tx.Prepare("INSERT INTO blocks(network_id, number, hash, timestamp) VALUES (?, ?, ?, ?)") + insert, err = tx.Prepare("INSERT INTO blocks(network_id, blk_number, blk_hash, address) VALUES (?, ?, ?, ?)") if err != nil { return } @@ -188,7 +515,7 @@ func (db *Database) SaveHeaders(headers []*types.Header) (err error) { }() for _, h := range headers { - _, err = insert.Exec(db.network, (*SQLBigInt)(h.Number), h.Hash(), h.Time) + _, err = insert.Exec(db.network, (*SQLBigInt)(h.Number), h.Hash(), address) if err != nil { return } @@ -196,74 +523,10 @@ func (db *Database) SaveHeaders(headers []*types.Header) (err error) { return } -func (db *Database) SaveSyncedHeader(address common.Address, header *types.Header, option SyncOption) (err error) { - var ( - tx *sql.Tx - insert *sql.Stmt - ) - tx, err = db.db.Begin() - if err != nil { - return - } - insert, err = tx.Prepare("INSERT INTO accounts_to_blocks(network_id, address, blk_number, sync) VALUES (?, ?,?,?)") - if err != nil { - return - } - defer func() { - if err == nil { - err = tx.Commit() - } else { - _ = tx.Rollback() - } - }() - _, err = insert.Exec(db.network, address, (*SQLBigInt)(header.Number), option) - if err != nil { - return - } - return err -} - -// HeaderExists checks if header with hash exists in db. -func (db *Database) HeaderExists(hash common.Hash) (bool, error) { - var val sql.NullBool - err := db.db.QueryRow("SELECT EXISTS (SELECT hash FROM blocks WHERE hash = ? AND network_id = ?)", hash, db.network).Scan(&val) - if err != nil { - return false, err - } - return val.Bool, nil -} - // GetHeaderByNumber selects header using block number. func (db *Database) GetHeaderByNumber(number *big.Int) (header *DBHeader, err error) { header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} - err = db.db.QueryRow("SELECT hash,number FROM blocks WHERE number = ? AND network_id = ?", (*SQLBigInt)(number), db.network).Scan(&header.Hash, (*SQLBigInt)(header.Number)) - if err == nil { - return header, nil - } - if err == sql.ErrNoRows { - return nil, nil - } - return nil, err -} - -func (db *Database) GetLastHead() (header *DBHeader, err error) { - header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} - err = db.db.QueryRow("SELECT hash,number FROM blocks WHERE network_id = $1 AND head = 1 AND number = (SELECT MAX(number) FROM blocks WHERE network_id = $1)", db.network).Scan(&header.Hash, (*SQLBigInt)(header.Number)) - if err == nil { - return header, nil - } - if err == sql.ErrNoRows { - return nil, nil - } - return nil, err -} - -// GetLatestSynced downloads last synced block with a given option. -func (db *Database) GetLatestSynced(address common.Address, option SyncOption) (header *DBHeader, err error) { - header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} - err = db.db.QueryRow(` -SELECT blocks.hash, blk_number FROM accounts_to_blocks JOIN blocks ON blk_number = blocks.number WHERE blocks.network_id = $1 AND address = $2 AND blk_number -= (SELECT MAX(blk_number) FROM accounts_to_blocks WHERE network_id = $1 AND address = $2 AND sync & $3 = $3)`, db.network, address, option).Scan(&header.Hash, (*SQLBigInt)(header.Number)) + err = db.db.QueryRow("SELECT blk_hash, blk_number FROM blocks WHERE blk_number = ? AND network_id = ?", (*SQLBigInt)(number), db.network).Scan(&header.Hash, (*SQLBigInt)(header.Number)) if err == nil { return header, nil } @@ -326,26 +589,23 @@ type statementCreator interface { } func deleteHeaders(creator statementCreator, headers []*DBHeader) error { - delete, err := creator.Prepare("DELETE FROM blocks WHERE hash = ?") + delete, err := creator.Prepare("DELETE FROM blocks WHERE blk_hash = ?") + if err != nil { + return err + } + deleteTransfers, err := creator.Prepare("DELETE FROM transfers WHERE blk_hash = ?") if err != nil { return err } for _, h := range headers { + k := h.Hash + log.Debug("foo", "k", k) _, err = delete.Exec(h.Hash) if err != nil { return err } - } - return nil -} -func insertHeaders(creator statementCreator, network uint64, headers []*DBHeader) error { - insert, err := creator.Prepare("INSERT OR IGNORE INTO blocks(network_id, hash, number, timestamp, head) VALUES (?, ?, ?, ?, ?)") - if err != nil { - return err - } - for _, h := range headers { - _, err = insert.Exec(network, h.Hash, (*SQLBigInt)(h.Number), h.Timestamp, h.Head) + _, err = deleteTransfers.Exec(h.Hash) if err != nil { return err } @@ -353,47 +613,117 @@ func insertHeaders(creator statementCreator, network uint64, headers []*DBHeader return nil } -func insertTransfers(creator statementCreator, network uint64, transfers []Transfer) error { - insert, err := creator.Prepare("INSERT OR IGNORE INTO transfers(network_id, hash, blk_hash, address, tx, sender, receipt, log, type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)") +func insertBlocksWithTransactions(creator statementCreator, account common.Address, network uint64, headers []*DBHeader) error { + insert, err := creator.Prepare("INSERT OR IGNORE INTO blocks(network_id, address, blk_number, blk_hash, loaded) VALUES (?, ?, ?, ?, ?)") if err != nil { return err } - for _, t := range transfers { - _, err = insert.Exec(network, t.ID, t.BlockHash, t.Address, &JSONBlob{t.Transaction}, t.From, &JSONBlob{t.Receipt}, &JSONBlob{t.Log}, t.Type) + updateTx, err := creator.Prepare(`UPDATE transfers + SET log = ? + WHERE network_id = ? AND address = ? AND hash = ?`) + if err != nil { + return err + } + + insertTx, err := creator.Prepare(`INSERT OR IGNORE + INTO transfers (network_id, address, sender, hash, blk_number, blk_hash, type, timestamp, log, loaded) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 0)`) + if err != nil { + return err + } + + for _, header := range headers { + _, err = insert.Exec(network, account, (*SQLBigInt)(header.Number), header.Hash, header.Loaded) if err != nil { return err } - } - return nil -} - -func updateAccounts(creator statementCreator, network uint64, accounts []common.Address, headers []*DBHeader, option SyncOption) error { - update, err := creator.Prepare("UPDATE accounts_to_blocks SET sync=sync|? WHERE address=? AND blk_number=? AND network_id=?") - if err != nil { - return err - } - insert, err := creator.Prepare("INSERT OR IGNORE INTO accounts_to_blocks(network_id,address,blk_number,sync) VALUES(?,?,?,?)") - if err != nil { - return err - } - for _, acc := range accounts { - for _, h := range headers { - rst, err := update.Exec(option, acc, (*SQLBigInt)(h.Number), network) + if header.Erc20Transfer != nil { + res, err := updateTx.Exec(&JSONBlob{header.Erc20Transfer.Log}, network, account, header.Erc20Transfer.ID) if err != nil { return err } - affected, err := rst.RowsAffected() + affected, err := res.RowsAffected() if err != nil { return err } if affected > 0 { continue } - _, err = insert.Exec(network, acc, (*SQLBigInt)(h.Number), option) + + _, err = insertTx.Exec(network, account, account, header.Erc20Transfer.ID, (*SQLBigInt)(header.Number), header.Hash, erc20Transfer, header.Erc20Transfer.Timestamp, &JSONBlob{header.Erc20Transfer.Log}) if err != nil { + log.Error("error saving erc20transfer", "err", err) return err } } } return nil } + +func insertRange(creator statementCreator, account common.Address, network uint64, from *big.Int, to *big.Int) (err error) { + insert, err := creator.Prepare("INSERT INTO blocks_ranges (network_id, address, blk_from, blk_to) VALUES (?, ?, ?, ?)") + if err != nil { + return err + } + + _, err = insert.Exec(network, account, (*SQLBigInt)(from), (*SQLBigInt)(to)) + if err != nil { + return err + } + + return +} + +func updateOrInsertTransfers(creator statementCreator, network uint64, transfers []Transfer) error { + update, err := creator.Prepare(`UPDATE transfers + SET tx = ?, sender = ?, receipt = ?, timestamp = ?, loaded = 1 + WHERE address =? AND hash = ?`) + if err != nil { + return err + } + + insert, err := creator.Prepare(`INSERT OR IGNORE INTO transfers + (network_id, hash, blk_hash, blk_number, timestamp, address, tx, sender, receipt, log, type, loaded) + VALUES + (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1)`) + if err != nil { + return err + } + for _, t := range transfers { + res, err := update.Exec(&JSONBlob{t.Transaction}, t.From, &JSONBlob{t.Receipt}, t.Timestamp, t.Address, t.ID) + + if err != nil { + return err + } + affected, err := res.RowsAffected() + if err != nil { + return err + } + if affected > 0 { + continue + } + + _, err = insert.Exec(network, t.ID, t.BlockHash, (*SQLBigInt)(t.BlockNumber), t.Timestamp, t.Address, &JSONBlob{t.Transaction}, t.From, &JSONBlob{t.Receipt}, &JSONBlob{t.Log}, t.Type) + if err != nil { + log.Error("can't save transfer", "b-hash", t.BlockHash, "b-n", t.BlockNumber, "a", t.Address, "h", t.ID) + return err + } + } + return nil +} + +//markBlocksAsLoaded(tx, address, db.network, blocks) +func markBlocksAsLoaded(creator statementCreator, address common.Address, network uint64, blocks []*big.Int) error { + update, err := creator.Prepare("UPDATE blocks SET loaded=? WHERE address=? AND blk_number=? AND network_id=?") + if err != nil { + return err + } + + for _, block := range blocks { + _, err := update.Exec(true, address, (*SQLBigInt)(block), network) + if err != nil { + return err + } + } + return nil +} diff --git a/services/wallet/database_test.go b/services/wallet/database_test.go index b8b6a44c8..9df805c0c 100644 --- a/services/wallet/database_test.go +++ b/services/wallet/database_test.go @@ -33,7 +33,7 @@ func TestDBGetHeaderByNumber(t *testing.T) { Difficulty: big.NewInt(1), Time: 1, } - require.NoError(t, db.SaveHeaders([]*types.Header{header})) + require.NoError(t, db.SaveHeaders([]*types.Header{header}, common.Address{1})) rst, err := db.GetHeaderByNumber(header.Number) require.NoError(t, err) require.Equal(t, header.Hash(), rst.Hash) @@ -47,35 +47,45 @@ func TestDBGetHeaderByNumberNoRows(t *testing.T) { require.Nil(t, rst) } -func TestDBHeaderExists(t *testing.T) { +func TestDBProcessBlocks(t *testing.T) { db, stop := setupTestDB(t) defer stop() - header := &types.Header{ - Number: big.NewInt(10), - Difficulty: big.NewInt(1), - Time: 1, + address := common.Address{1} + from := big.NewInt(0) + to := big.NewInt(10) + blocks := []*DBHeader{ + &DBHeader{ + Number: big.NewInt(1), + Hash: common.Hash{1}, + }, + &DBHeader{ + Number: big.NewInt(2), + Hash: common.Hash{2}, + }} + t.Log(blocks) + require.NoError(t, db.ProcessBlocks(common.Address{1}, from, to, blocks)) + t.Log(db.GetLastBlockByAddress(common.Address{1}, 40)) + transfers := []Transfer{ + { + ID: common.Hash{1}, + Type: ethTransfer, + BlockHash: common.Hash{2}, + BlockNumber: big.NewInt(1), + Address: common.Address{1}, + Timestamp: 123, + From: common.Address{1}, + }, } - require.NoError(t, db.SaveHeaders([]*types.Header{header})) - rst, err := db.HeaderExists(header.Hash()) - require.NoError(t, err) - require.True(t, rst) -} - -func TestDBHeaderDoesntExist(t *testing.T) { - db, stop := setupTestDB(t) - defer stop() - - rst, err := db.HeaderExists(common.Hash{1}) - require.NoError(t, err) - require.False(t, rst) + require.NoError(t, db.SaveTranfers(address, transfers, []*big.Int{big.NewInt(1), big.NewInt(2)})) } func TestDBProcessTransfer(t *testing.T) { db, stop := setupTestDB(t) defer stop() header := &DBHeader{ - Number: big.NewInt(1), - Hash: common.Hash{1}, + Number: big.NewInt(1), + Hash: common.Hash{1}, + Address: common.Address{1}, } tx := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil) transfers := []Transfer{ @@ -86,9 +96,11 @@ func TestDBProcessTransfer(t *testing.T) { BlockNumber: header.Number, Transaction: tx, Receipt: types.NewReceipt(nil, false, 100), + Address: common.Address{1}, }, } - require.NoError(t, db.ProcessTranfers(transfers, nil, []*DBHeader{header}, nil, 0)) + require.NoError(t, db.ProcessBlocks(common.Address{1}, big.NewInt(1), big.NewInt(1), []*DBHeader{header})) + require.NoError(t, db.ProcessTranfers(transfers, []*DBHeader{})) } func TestDBReorgTransfers(t *testing.T) { @@ -97,21 +109,25 @@ func TestDBReorgTransfers(t *testing.T) { rcpt := types.NewReceipt(nil, false, 100) rcpt.Logs = []*types.Log{} original := &DBHeader{ - Number: big.NewInt(1), - Hash: common.Hash{1}, + Number: big.NewInt(1), + Hash: common.Hash{1}, + Address: common.Address{1}, } replaced := &DBHeader{ - Number: big.NewInt(1), - Hash: common.Hash{2}, + Number: big.NewInt(1), + Hash: common.Hash{2}, + Address: common.Address{1}, } originalTX := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil) replacedTX := types.NewTransaction(2, common.Address{1}, nil, 10, big.NewInt(10), nil) + require.NoError(t, db.ProcessBlocks(original.Address, original.Number, original.Number, []*DBHeader{original})) require.NoError(t, db.ProcessTranfers([]Transfer{ - {ethTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, 100, originalTX, common.Address{1}, rcpt, nil}, - }, nil, []*DBHeader{original}, nil, 0)) + {ethTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, 100, originalTX, true, common.Address{1}, rcpt, nil}, + }, []*DBHeader{})) + require.NoError(t, db.ProcessBlocks(replaced.Address, replaced.Number, replaced.Number, []*DBHeader{replaced})) require.NoError(t, db.ProcessTranfers([]Transfer{ - {ethTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, 100, replacedTX, common.Address{1}, rcpt, nil}, - }, nil, []*DBHeader{replaced}, []*DBHeader{original}, 0)) + {ethTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, 100, replacedTX, true, common.Address{1}, rcpt, nil}, + }, []*DBHeader{original})) all, err := db.GetTransfers(big.NewInt(0), nil) require.NoError(t, err) @@ -126,8 +142,9 @@ func TestDBGetTransfersFromBlock(t *testing.T) { transfers := []Transfer{} for i := 1; i < 10; i++ { header := &DBHeader{ - Number: big.NewInt(int64(i)), - Hash: common.Hash{byte(i)}, + Number: big.NewInt(int64(i)), + Hash: common.Hash{byte(i)}, + Address: common.Address{1}, } headers = append(headers, header) tx := types.NewTransaction(uint64(i), common.Address{1}, nil, 10, big.NewInt(10), nil) @@ -140,10 +157,12 @@ func TestDBGetTransfersFromBlock(t *testing.T) { BlockHash: header.Hash, Transaction: tx, Receipt: receipt, + Address: common.Address{1}, } transfers = append(transfers, transfer) } - require.NoError(t, db.ProcessTranfers(transfers, nil, headers, nil, 0)) + require.NoError(t, db.ProcessBlocks(headers[0].Address, headers[0].Number, headers[len(headers)-1].Number, headers)) + require.NoError(t, db.ProcessTranfers(transfers, []*DBHeader{})) rst, err := db.GetTransfers(big.NewInt(7), nil) require.NoError(t, err) require.Len(t, rst, 3) @@ -154,88 +173,6 @@ func TestDBGetTransfersFromBlock(t *testing.T) { } -func TestDBLatestSynced(t *testing.T) { - db, stop := setupTestDB(t) - defer stop() - - address := common.Address{1} - h1 := &types.Header{ - Number: big.NewInt(10), - Difficulty: big.NewInt(1), - Time: 1, - } - h2 := &types.Header{ - Number: big.NewInt(9), - Difficulty: big.NewInt(1), - Time: 1, - } - require.NoError(t, db.SaveHeaders([]*types.Header{h1, h2})) - require.NoError(t, db.SaveSyncedHeader(address, h1, ethSync)) - require.NoError(t, db.SaveSyncedHeader(address, h2, ethSync)) - - latest, err := db.GetLatestSynced(address, ethSync) - require.NoError(t, err) - require.NotNil(t, latest) - require.Equal(t, h1.Number, latest.Number) - require.Equal(t, h1.Hash(), latest.Hash) -} - -func TestDBLatestSyncedDoesntExist(t *testing.T) { - db, stop := setupTestDB(t) - defer stop() - - latest, err := db.GetLatestSynced(common.Address{1}, ethSync) - require.NoError(t, err) - require.Nil(t, latest) -} - -func TestDBProcessTransfersUpdate(t *testing.T) { - db, stop := setupTestDB(t) - defer stop() - - address := common.Address{1} - header := &DBHeader{ - Number: big.NewInt(10), - Hash: common.Hash{1}, - } - transfer := Transfer{ - ID: common.Hash{1}, - BlockNumber: header.Number, - BlockHash: header.Hash, - Transaction: types.NewTransaction(0, common.Address{}, nil, 0, nil, nil), - Address: address, - } - require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, ethSync)) - require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, erc20Sync)) - - latest, err := db.GetLatestSynced(address, ethSync|erc20Sync) - require.NoError(t, err) - require.Equal(t, header.Hash, latest.Hash) -} - -func TestDBLastHeadExist(t *testing.T) { - db, stop := setupTestDB(t) - defer stop() - - headers := []*DBHeader{ - {Number: big.NewInt(1), Hash: common.Hash{1}, Head: true}, - {Number: big.NewInt(2), Hash: common.Hash{2}, Head: true}, - {Number: big.NewInt(3), Hash: common.Hash{3}, Head: true}, - } - require.NoError(t, db.ProcessTranfers(nil, nil, headers, nil, 0)) - last, err := db.GetLastHead() - require.NoError(t, err) - require.Equal(t, headers[2].Hash, last.Hash) -} - -func TestDBLastHeadDoesntExist(t *testing.T) { - db, stop := setupTestDB(t) - defer stop() - last, err := db.GetLastHead() - require.NoError(t, err) - require.Nil(t, last) -} - func TestCustomTokens(t *testing.T) { db, stop := setupTestDB(t) defer stop() diff --git a/services/wallet/downloader.go b/services/wallet/downloader.go index 5036f05a2..4ac328ffd 100644 --- a/services/wallet/downloader.go +++ b/services/wallet/downloader.go @@ -40,6 +40,7 @@ type Transfer struct { BlockHash common.Hash `json:"blockhash"` Timestamp uint64 `json:"timestamp"` Transaction *types.Transaction `json:"transaction"` + Loaded bool // From is derived from tx signature in order to offload this computation from UI component. From common.Address `json:"from"` Receipt *types.Receipt `json:"receipt"` @@ -52,27 +53,16 @@ type ETHTransferDownloader struct { client *ethclient.Client accounts []common.Address signer types.Signer + db *Database } +var errLogsDownloaderStuck = errors.New("logs downloader stuck") + // GetTransfers checks if the balance was changed between two blocks. // If so it downloads transaction that transfer ethereum from that block. func (d *ETHTransferDownloader) GetTransfers(ctx context.Context, header *DBHeader) (rst []Transfer, err error) { // TODO(dshulyak) consider caching balance and reset it on reorg - num := new(big.Int).Sub(header.Number, one) - changed := []common.Address{} - for _, address := range d.accounts { - balance, err := d.client.BalanceAt(ctx, address, num) - if err != nil { - return nil, err - } - current, err := d.client.BalanceAt(ctx, address, header.Number) - if err != nil { - return nil, err - } - if current.Cmp(balance) != 0 { - changed = append(changed, address) - } - } + changed := d.accounts if len(changed) == 0 { return nil, nil } @@ -100,35 +90,53 @@ func (d *ETHTransferDownloader) GetTransfersByNumber(ctx context.Context, number } func (d *ETHTransferDownloader) getTransfersInBlock(ctx context.Context, blk *types.Block, accounts []common.Address) (rst []Transfer, err error) { - for _, tx := range blk.Transactions() { - for _, address := range accounts { + for _, address := range accounts { + preloadedTransfers, err := d.db.GetPreloadedTransactions(address, blk.Hash()) + if err != nil { + return nil, err + } + + for _, t := range preloadedTransfers { + transfer, err := d.transferFromLog(ctx, *t.Log, address, t.ID) + if err != nil { + log.Error("can't fetch erc20 transfer from log", "error", err) + return nil, err + } + rst = append(rst, transfer) + } + + for _, tx := range blk.Transactions() { + from, err := types.Sender(d.signer, tx) if err != nil { return nil, err } + if from == address || (tx.To() != nil && *tx.To() == address) { receipt, err := d.client.TransactionReceipt(ctx, tx.Hash()) if err != nil { return nil, err } - if IsTokenTransfer(receipt.Logs) { - log.Debug("eth downloader found token transfer", "hash", tx.Hash()) - continue - } - rst = append(rst, Transfer{ - Type: ethTransfer, - ID: tx.Hash(), - Address: address, - BlockNumber: blk.Number(), - BlockHash: blk.Hash(), - Timestamp: blk.Time(), - Transaction: tx, - From: from, - Receipt: receipt}) + transactionLog := getTokenLog(receipt.Logs) + + if transactionLog == nil { + rst = append(rst, Transfer{ + Type: ethTransfer, + ID: tx.Hash(), + Address: address, + BlockNumber: blk.Number(), + BlockHash: blk.Hash(), + Timestamp: blk.Time(), + Transaction: tx, + From: from, + Receipt: receipt, + Log: transactionLog}) + } } } } + log.Debug("getTransfersInBlock found", "block", blk.Number(), "len", len(rst)) // TODO(dshulyak) test that balance difference was covered by transactions return rst, nil } @@ -170,6 +178,43 @@ func (d *ERC20TransfersDownloader) outboundTopics(address common.Address) [][]co return [][]common.Hash{{d.signature}, {d.paddedAddress(address)}, {}} } +func (d *ETHTransferDownloader) transferFromLog(parent context.Context, ethlog types.Log, address common.Address, id common.Hash) (Transfer, error) { + ctx, cancel := context.WithTimeout(parent, 3*time.Second) + tx, _, err := d.client.TransactionByHash(ctx, ethlog.TxHash) + cancel() + if err != nil { + return Transfer{}, err + } + from, err := types.Sender(d.signer, tx) + if err != nil { + return Transfer{}, err + } + ctx, cancel = context.WithTimeout(parent, 3*time.Second) + receipt, err := d.client.TransactionReceipt(ctx, ethlog.TxHash) + cancel() + if err != nil { + return Transfer{}, err + } + ctx, cancel = context.WithTimeout(parent, 3*time.Second) + blk, err := d.client.BlockByHash(ctx, ethlog.BlockHash) + cancel() + if err != nil { + return Transfer{}, err + } + return Transfer{ + Address: address, + ID: id, + Type: erc20Transfer, + BlockNumber: new(big.Int).SetUint64(ethlog.BlockNumber), + BlockHash: ethlog.BlockHash, + Transaction: tx, + From: from, + Receipt: receipt, + Timestamp: blk.Time(), + Log: ðlog, + }, nil +} + func (d *ERC20TransfersDownloader) transferFromLog(parent context.Context, ethlog types.Log, address common.Address) (Transfer, error) { ctx, cancel := context.WithTimeout(parent, 3*time.Second) tx, _, err := d.client.TransactionByHash(ctx, ethlog.TxHash) @@ -229,11 +274,52 @@ func (d *ERC20TransfersDownloader) transfersFromLogs(parent context.Context, log select { case <-concurrent.WaitAsync(): case <-parent.Done(): - return nil, errors.New("logs downloader stuck") + return nil, errLogsDownloaderStuck } return concurrent.Get(), concurrent.Error() } +func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs []types.Log, address common.Address) ([]*DBHeader, error) { + concurrent := NewConcurrentDownloader(parent) + for i := range logs { + l := logs[i] + + if l.Removed { + continue + } + + index := [4]byte{} + binary.BigEndian.PutUint32(index[:], uint32(l.Index)) + id := crypto.Keccak256Hash(l.TxHash.Bytes(), index[:]) + + header := &DBHeader{ + Number: big.NewInt(int64(l.BlockNumber)), + Hash: l.BlockHash, + Erc20Transfer: &Transfer{ + Address: address, + BlockNumber: big.NewInt(int64(l.BlockNumber)), + BlockHash: l.BlockHash, + ID: id, + From: address, + Loaded: false, + Type: erc20Transfer, + Log: &l, + }, + } + + concurrent.Add(func(ctx context.Context) error { + concurrent.PushHeader(header) + return nil + }) + } + select { + case <-concurrent.WaitAsync(): + case <-parent.Done(): + return nil, errLogsDownloaderStuck + } + return concurrent.GetHeaders(), concurrent.Error() +} + // GetTransfers for erc20 uses eth_getLogs rpc with Transfer event signature and our address acount. func (d *ERC20TransfersDownloader) GetTransfers(ctx context.Context, header *DBHeader) ([]Transfer, error) { hash := header.Hash @@ -266,12 +352,12 @@ func (d *ERC20TransfersDownloader) GetTransfers(ctx context.Context, header *DBH return transfers, nil } -// GetTransfersInRange returns transfers between two blocks. +// GetHeadersInRange returns transfers between two blocks. // time to get logs for 100000 blocks = 1.144686979s. with 249 events in the result set. -func (d *ERC20TransfersDownloader) GetTransfersInRange(parent context.Context, from, to *big.Int) ([]Transfer, error) { +func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, from, to *big.Int) ([]*DBHeader, error) { start := time.Now() log.Debug("get erc20 transfers in range", "from", from, "to", to) - transfers := []Transfer{} + headers := []*DBHeader{} for _, address := range d.accounts { ctx, cancel := context.WithTimeout(parent, 5*time.Second) outbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ @@ -297,14 +383,14 @@ func (d *ERC20TransfersDownloader) GetTransfersInRange(parent context.Context, f if len(logs) == 0 { continue } - rst, err := d.transfersFromLogs(parent, logs, address) + rst, err := d.blocksFromLogs(parent, logs, address) if err != nil { return nil, err } - transfers = append(transfers, rst...) + headers = append(headers, rst...) } - log.Debug("found erc20 transfers between two blocks", "from", from, "to", to, "lth", len(transfers), "took", time.Since(start)) - return transfers, nil + log.Debug("found erc20 transfers between two blocks", "from", from, "to", to, "headers", len(headers), "took", time.Since(start)) + return headers, nil } func IsTokenTransfer(logs []*types.Log) bool { @@ -316,3 +402,13 @@ func IsTokenTransfer(logs []*types.Log) bool { } return false } + +func getTokenLog(logs []*types.Log) *types.Log { + signature := crypto.Keccak256Hash([]byte(erc20TransferEventSignature)) + for _, l := range logs { + if len(l.Topics) > 0 && l.Topics[0] == signature { + return l + } + } + return nil +} diff --git a/services/wallet/downloader_test.go b/services/wallet/downloader_test.go index 420477cd7..badcdd3e7 100644 --- a/services/wallet/downloader_test.go +++ b/services/wallet/downloader_test.go @@ -30,6 +30,7 @@ type ETHTransferSuite struct { identity, secondary *ecdsa.PrivateKey faucet *ecdsa.PrivateKey signer types.Signer + dbStop func() downloader *ETHTransferDownloader } @@ -51,15 +52,22 @@ func (s *ETHTransferSuite) SetupTest() { s.Require().NoError(err) s.ethclient = ethclient.NewClient(client) s.signer = types.NewEIP155Signer(big.NewInt(1337)) + db, stop := setupTestDB(s.Suite.T()) + s.dbStop = stop s.downloader = ÐTransferDownloader{ signer: s.signer, client: s.ethclient, + db: db, accounts: []common.Address{ crypto.PubkeyToAddress(s.identity.PublicKey), crypto.PubkeyToAddress(s.secondary.PublicKey)}, } } +func (s *ETHTransferSuite) TearDownTest() { + s.dbStop() +} + // signAndMineTx signs transaction with provided key and waits for it to be mined. // uses configured faucet key if pkey is nil. func (s *ETHTransferSuite) signAndMineTx(tx *types.Transaction, pkey *ecdsa.PrivateKey) { @@ -256,7 +264,7 @@ func (s *ERC20TransferSuite) TestInRange() { _, err = bind.WaitMined(timeout, s.ethclient, tx) s.Require().NoError(err) } - transfers, err := s.downloader.GetTransfersInRange(context.TODO(), big.NewInt(1), nil) + transfers, err := s.downloader.GetHeadersInRange(context.TODO(), big.NewInt(1), nil) s.Require().NoError(err) s.Require().Len(transfers, 5) } diff --git a/services/wallet/events.go b/services/wallet/events.go index f126daab7..1a5e6f311 100644 --- a/services/wallet/events.go +++ b/services/wallet/events.go @@ -16,6 +16,10 @@ const ( EventReorg EventType = "reorg" // EventNewHistory emitted if transfer from older block was added. EventNewHistory EventType = "history" + // EventFetchingRecentHistory emitted when fetching of lastest tx history is started + EventFetchingRecentHistory EventType = "recent-history-fetching" + // EventRecentHistoryFetched emitted when fetching of lastest tx history is started + EventRecentHistoryReady EventType = "recent-history-ready" ) // Event is a type for wallet events. @@ -23,4 +27,5 @@ type Event struct { Type EventType `json:"type"` BlockNumber *big.Int `json:"blockNumber"` Accounts []common.Address `json:"accounts"` + ERC20 bool `json:"erc20"` } diff --git a/services/wallet/iterative.go b/services/wallet/iterative.go index ea638060a..b538aa117 100644 --- a/services/wallet/iterative.go +++ b/services/wallet/iterative.go @@ -11,17 +11,13 @@ import ( // SetupIterativeDownloader configures IterativeDownloader with last known synced block. func SetupIterativeDownloader( - db *Database, client HeaderReader, address common.Address, option SyncOption, - downloader BatchDownloader, size *big.Int, to *DBHeader) (*IterativeDownloader, error) { - from, err := db.GetLatestSynced(address, option) - if err != nil { - log.Error("failed to get latest synced block", "error", err) - return nil, err + db *Database, client HeaderReader, address common.Address, + downloader BatchDownloader, size *big.Int, to *big.Int, from *big.Int) (*IterativeDownloader, error) { + adjustedSize := big.NewInt(0).Div(big.NewInt(0).Sub(to, from), big.NewInt(10)) + if adjustedSize.Cmp(size) == 1 { + size = adjustedSize } - if from == nil { - from = &DBHeader{Number: zero} - } - log.Debug("iterative downloader", "address", address, "from", from.Number, "to", to.Number) + log.Info("iterative downloader", "address", address, "from", from, "to", to, "size", size) d := &IterativeDownloader{ client: client, batchSize: size, @@ -34,7 +30,7 @@ func SetupIterativeDownloader( // BatchDownloader interface for loading transfers in batches in speificed range of blocks. type BatchDownloader interface { - GetTransfersInRange(ctx context.Context, from, to *big.Int) ([]Transfer, error) + GetHeadersInRange(ctx context.Context, from, to *big.Int) ([]*DBHeader, error) } // IterativeDownloader downloads batches of transfers in a specified size. @@ -45,42 +41,45 @@ type IterativeDownloader struct { downloader BatchDownloader - from, to *DBHeader - previous *DBHeader + from, to *big.Int + previous *big.Int } // Finished true when earliest block with given sync option is zero. func (d *IterativeDownloader) Finished() bool { - return d.from.Number.Cmp(d.to.Number) == 0 + return d.from.Cmp(d.to) == 0 } // Header return last synced header. -func (d *IterativeDownloader) Header() *DBHeader { +func (d *IterativeDownloader) Header() *big.Int { return d.previous } // Next moves closer to the end on every new iteration. -func (d *IterativeDownloader) Next(parent context.Context) ([]Transfer, error) { - to := new(big.Int).Add(d.from.Number, d.batchSize) +func (d *IterativeDownloader) Next(parent context.Context) ([]*DBHeader, *big.Int, *big.Int, error) { + to := d.to + from := new(big.Int).Sub(to, d.batchSize) // if start < 0; start = 0 - if to.Cmp(d.to.Number) == 1 { - to = d.to.Number + if from.Cmp(d.from) == -1 { + from = d.from } - transfers, err := d.downloader.GetTransfersInRange(parent, d.from.Number, to) + log.Info("load erc20 transfers in range", "from", from, "to", to) + headres, err := d.downloader.GetHeadersInRange(parent, from, to) if err != nil { - log.Error("failed to get transfer in between two bloks", "from", d.from.Number, "to", to, "error", err) - return nil, err + log.Error("failed to get transfer in between two bloks", "from", from, "to", to, "error", err) + return nil, nil, nil, err } // use integers instead of DBHeader ctx, cancel := context.WithTimeout(parent, 5*time.Second) - header, err := d.client.HeaderByNumber(ctx, to) + header, err := d.client.HeaderByNumber(ctx, from) cancel() if err != nil { - log.Error("failed to get header by number", "from", d.from.Number, "to", to, "error", err) - return nil, err + log.Error("failed to get header by number", "from", d.from, "to", to, "error", err) + return nil, nil, nil, err } - d.previous, d.from = d.from, toDBHeader(header) - return transfers, nil + + d.previous, d.to = d.to, header.Number + return headres, d.from, to, nil } // Revert reverts last step progress. Should be used if application failed to process transfers. diff --git a/services/wallet/iterative_test.go b/services/wallet/iterative_test.go index 3d90bac03..0d3307047 100644 --- a/services/wallet/iterative_test.go +++ b/services/wallet/iterative_test.go @@ -14,11 +14,11 @@ import ( type transfersFixture []Transfer -func (f transfersFixture) GetTransfersInRange(ctx context.Context, from, to *big.Int) ([]Transfer, error) { - rst := []Transfer{} +func (f transfersFixture) GetHeadersInRange(ctx context.Context, from, to *big.Int) ([]*DBHeader, error) { + rst := []*DBHeader{} for _, t := range f { if t.BlockNumber.Cmp(from) >= 0 && t.BlockNumber.Cmp(to) <= 0 { - rst = append(rst, t) + rst = append(rst, &DBHeader{Number: t.BlockNumber}) } } return rst, nil @@ -26,25 +26,25 @@ func (f transfersFixture) GetTransfersInRange(ctx context.Context, from, to *big func TestIterFinished(t *testing.T) { iterator := IterativeDownloader{ - from: &DBHeader{Number: big.NewInt(10)}, - to: &DBHeader{Number: big.NewInt(10)}, + from: big.NewInt(10), + to: big.NewInt(10), } require.True(t, iterator.Finished()) } func TestIterNotFinished(t *testing.T) { iterator := IterativeDownloader{ - from: &DBHeader{Number: big.NewInt(2)}, - to: &DBHeader{Number: big.NewInt(5)}, + from: big.NewInt(2), + to: big.NewInt(5), } require.False(t, iterator.Finished()) } func TestIterRevert(t *testing.T) { iterator := IterativeDownloader{ - from: &DBHeader{Number: big.NewInt(12)}, - to: &DBHeader{Number: big.NewInt(12)}, - previous: &DBHeader{Number: big.NewInt(9)}, + from: big.NewInt(12), + to: big.NewInt(12), + previous: big.NewInt(9), } require.True(t, iterator.Finished()) iterator.Revert() @@ -66,13 +66,13 @@ func TestIterProgress(t *testing.T) { client: chain, downloader: transfers, batchSize: big.NewInt(5), - from: &DBHeader{Number: big.NewInt(0)}, - to: &DBHeader{Number: big.NewInt(9)}, + from: big.NewInt(0), + to: big.NewInt(9), } - batch, err := iter.Next(context.TODO()) + batch, _, _, err := iter.Next(context.TODO()) require.NoError(t, err) require.Len(t, batch, 6) - batch, err = iter.Next(context.TODO()) + batch, _, _, err = iter.Next(context.TODO()) require.NoError(t, err) require.Len(t, batch, 5) require.True(t, iter.Finished()) diff --git a/services/wallet/reactor.go b/services/wallet/reactor.go index a8dca62a3..5dcbba535 100644 --- a/services/wallet/reactor.go +++ b/services/wallet/reactor.go @@ -30,8 +30,9 @@ func pollingPeriodByChain(chain *big.Int) time.Duration { } var ( - reorgSafetyDepth = big.NewInt(15) - erc20BatchSize = big.NewInt(100000) + reorgSafetyDepth = big.NewInt(15) + erc20BatchSize = big.NewInt(100000) + errAlreadyRunning = errors.New("already running") ) // HeaderReader interface for reading headers using block number or hash. @@ -43,6 +44,7 @@ type HeaderReader interface { // BalanceReader interface for reading balance at a specifeid address. type BalanceReader interface { BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) + NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) } type reactorClient interface { @@ -71,14 +73,7 @@ type Reactor struct { group *Group } -// Start runs reactor loop in background. -func (r *Reactor) Start(accounts []common.Address) error { - r.mu.Lock() - defer r.mu.Unlock() - if r.group != nil { - return errors.New("already running") - } - r.group = NewGroup(context.Background()) +func (r *Reactor) newControlCommand(accounts []common.Address) *controlCommand { signer := types.NewEIP155Signer(r.chain) ctl := &controlCommand{ db: r.db, @@ -89,11 +84,25 @@ func (r *Reactor) Start(accounts []common.Address) error { client: r.client, accounts: accounts, signer: signer, + db: r.db, }, erc20: NewERC20TransfersDownloader(r.client, accounts, signer), feed: r.feed, safetyDepth: reorgSafetyDepth, } + + return ctl +} + +// Start runs reactor loop in background. +func (r *Reactor) Start(accounts []common.Address) error { + r.mu.Lock() + defer r.mu.Unlock() + if r.group != nil { + return errAlreadyRunning + } + r.group = NewGroup(context.Background()) + ctl := r.newControlCommand(accounts) r.group.Add(ctl.Command()) return nil } diff --git a/services/wallet/service_test.go b/services/wallet/service_test.go index cc8e34312..09ee4d34c 100644 --- a/services/wallet/service_test.go +++ b/services/wallet/service_test.go @@ -80,14 +80,14 @@ func (s *ReactorChangesSuite) TestWatchNewAccounts() { }) s.Require().NoError(s.reactor.Start([]common.Address{s.first})) s.Require().NoError(utils.Eventually(func() error { - transfers, err := s.db.GetTransfersByAddress(s.first, big.NewInt(0), nil) + transfers, err := s.db.GetTransfersInRange(s.first, big.NewInt(0), nil) if err != nil { return err } if len(transfers) != 1 { return fmt.Errorf("expect to get 1 transfer for first address %x, got %d", s.first, len(transfers)) } - transfers, err = s.db.GetTransfersByAddress(s.second, big.NewInt(0), nil) + transfers, err = s.db.GetTransfersInRange(s.second, big.NewInt(0), nil) if err != nil { return err } @@ -98,7 +98,7 @@ func (s *ReactorChangesSuite) TestWatchNewAccounts() { }, 5*time.Second, 500*time.Millisecond)) s.feed.Send([]accounts.Account{{Address: types.Address(s.first)}, {Address: types.Address(s.second)}}) s.Require().NoError(utils.Eventually(func() error { - transfers, err := s.db.GetTransfersByAddress(s.second, big.NewInt(0), nil) + transfers, err := s.db.GetTransfersInRange(s.second, big.NewInt(0), nil) if err != nil { return err } diff --git a/services/wallet/transfers_query.go b/services/wallet/transfers_query.go index 6043fafd6..ef0c48126 100644 --- a/services/wallet/transfers_query.go +++ b/services/wallet/transfers_query.go @@ -9,7 +9,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -const baseTransfersQuery = "SELECT transfers.hash, type, blocks.hash, blocks.number, blocks.timestamp, address, tx, sender, receipt, log FROM transfers JOIN blocks ON blk_hash = blocks.hash" +const baseTransfersQuery = "SELECT hash, type, blk_hash, blk_number, timestamp, address, tx, sender, receipt, log FROM transfers" func newTransfersQuery() *transfersQuery { buf := bytes.NewBuffer(nil) @@ -35,7 +35,7 @@ func (q *transfersQuery) FilterStart(start *big.Int) *transfersQuery { if start != nil { q.andOrWhere() q.added = true - q.buf.WriteString(" blocks.number >= ?") + q.buf.WriteString(" blk_number >= ?") q.args = append(q.args, (*SQLBigInt)(start)) } return q @@ -45,16 +45,25 @@ func (q *transfersQuery) FilterEnd(end *big.Int) *transfersQuery { if end != nil { q.andOrWhere() q.added = true - q.buf.WriteString(" blocks.number <= ?") + q.buf.WriteString(" blk_number <= ?") q.args = append(q.args, (*SQLBigInt)(end)) } return q } +func (q *transfersQuery) FilterLoaded(loaded int) *transfersQuery { + q.andOrWhere() + q.added = true + q.buf.WriteString(" loaded = ? ") + q.args = append(q.args, loaded) + + return q +} + func (q *transfersQuery) FilterNetwork(network uint64) *transfersQuery { q.andOrWhere() q.added = true - q.buf.WriteString(" blocks.network_id = ?") + q.buf.WriteString(" network_id = ?") q.args = append(q.args, network) return q } @@ -67,6 +76,21 @@ func (q *transfersQuery) FilterAddress(address common.Address) *transfersQuery { return q } +func (q *transfersQuery) FilterBlockHash(blockHash common.Hash) *transfersQuery { + q.andOrWhere() + q.added = true + q.buf.WriteString(" blk_hash = ?") + q.args = append(q.args, blockHash) + return q +} + +func (q *transfersQuery) Limit(pageSize int64) *transfersQuery { + q.buf.WriteString(" ORDER BY blk_number DESC ") + q.buf.WriteString(" LIMIT ?") + q.args = append(q.args, pageSize) + return q +} + func (q *transfersQuery) String() string { return q.buf.String() } diff --git a/services/wallet/transmitter.go b/services/wallet/transmitter.go index 8aa89a775..14d99b9b5 100644 --- a/services/wallet/transmitter.go +++ b/services/wallet/transmitter.go @@ -1,7 +1,6 @@ package wallet import ( - "errors" "sync" "github.com/ethereum/go-ethereum/event" @@ -25,7 +24,7 @@ type SignalsTransmitter struct { // Start runs loop in background. func (tmr *SignalsTransmitter) Start() error { if tmr.quit != nil { - return errors.New("already running") + return errAlreadyRunning } tmr.quit = make(chan struct{}) events := make(chan Event, 10) diff --git a/t/devtests/tranfers_test.go b/t/devtests/tranfers_test.go index 12973e334..c9626a0c3 100644 --- a/t/devtests/tranfers_test.go +++ b/t/devtests/tranfers_test.go @@ -28,7 +28,7 @@ type TransfersSuite struct { } func (s *TransfersSuite) getAllTranfers() (rst []wallet.TransferView, err error) { - return rst, s.Local.Call(&rst, "wallet_getTransfersByAddress", s.DevAccountAddress, (*hexutil.Big)(big.NewInt(0))) + return rst, s.Local.Call(&rst, "wallet_getTransfersByAddress", s.DevAccountAddress, (*hexutil.Big)(big.NewInt(1000)), (*hexutil.Big)(big.NewInt(40))) } func (s *TransfersSuite) sendTx(nonce uint64, to types.Address) {