status-go/services/wallet/database.go
Dmitry Shulyak 40b6b3da13
Simplify transfer object (#1521)
* Store tx and receipt in db and cast it to TransferView on read

* Store Log instead of log index

* Use contract from log and bring back address field

* Add tx status and id fields
2019-07-15 14:16:07 +03:00

363 lines
9.1 KiB
Go

package wallet
import (
"database/sql"
"database/sql/driver"
"encoding/json"
"errors"
"math/big"
"reflect"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/status-im/status-go/services/wallet/migrations"
"github.com/status-im/status-go/sqlite"
)
// DBHeader fields from header that are stored in database.
type DBHeader struct {
Number *big.Int
Hash common.Hash
Timestamp uint64
// Head is true if the block was a head at the time it was pulled from chain.
Head bool
}
func toDBHeader(header *types.Header) *DBHeader {
return &DBHeader{
Hash: header.Hash(),
Number: header.Number,
Timestamp: header.Time,
}
}
func toHead(header *types.Header) *DBHeader {
dbheader := toDBHeader(header)
dbheader.Head = true
return dbheader
}
// SyncOption is used to specify that application processed transfers for that block.
type SyncOption uint
const (
// sync options
ethSync SyncOption = 1
erc20Sync SyncOption = 2
)
// InitializeDB creates db file at a given path and applies migrations.
func InitializeDB(path, password string) (*Database, error) {
db, err := sqlite.OpenDB(path, password)
if err != nil {
return nil, err
}
err = migrations.Migrate(db)
if err != nil {
return nil, err
}
return &Database{db: db}, nil
}
// SQLBigInt type for storing uint256 in the databse.
// FIXME(dshulyak) SQL big int is max 64 bits. Maybe store as bytes in big endian and hope
// that lexographical sorting will work.
type SQLBigInt big.Int
// Scan implements interface.
func (i *SQLBigInt) Scan(value interface{}) error {
val, ok := value.(int64)
if !ok {
return errors.New("not an integer")
}
(*big.Int)(i).SetInt64(val)
return nil
}
// Value implements interface.
func (i *SQLBigInt) Value() (driver.Value, error) {
if !(*big.Int)(i).IsInt64() {
return nil, errors.New("not an int64")
}
return (*big.Int)(i).Int64(), nil
}
// JSONBlob type for marshaling/unmarshaling inner type to json.
type JSONBlob struct {
data interface{}
}
// Scan implements interface.
func (blob *JSONBlob) Scan(value interface{}) error {
if value == nil || reflect.ValueOf(blob.data).IsNil() {
return nil
}
bytes, ok := value.([]byte)
if !ok {
return errors.New("not a byte slice")
}
if len(bytes) == 0 {
return nil
}
err := json.Unmarshal(bytes, blob.data)
return err
}
// Value implements interface.
func (blob *JSONBlob) Value() (driver.Value, error) {
if blob.data == nil || reflect.ValueOf(blob.data).IsNil() {
return nil, nil
}
return json.Marshal(blob.data)
}
// Database sql wrapper for operations with wallet objects.
type Database struct {
db *sql.DB
}
// Close closes database.
func (db Database) Close() error {
return db.db.Close()
}
// ProcessTranfers atomically adds/removes blocks and adds new tranfers.
func (db Database) ProcessTranfers(transfers []Transfer, accounts []common.Address, added, removed []*DBHeader, option SyncOption) (err error) {
var (
tx *sql.Tx
)
tx, err = db.db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
_ = tx.Rollback()
}()
err = deleteHeaders(tx, removed)
if err != nil {
return
}
err = insertHeaders(tx, added)
if err != nil {
return
}
err = insertTransfers(tx, transfers)
if err != nil {
return
}
err = updateAccounts(tx, accounts, added, option)
return
}
// GetTransfersByAddress loads transfers for a given address between two blocks.
func (db *Database) GetTransfersByAddress(address common.Address, start, end *big.Int) (rst []Transfer, err error) {
query := newTransfersQuery().FilterAddress(address).FilterStart(start).FilterEnd(end)
rows, err := db.db.Query(query.String(), query.Args()...)
if err != nil {
return
}
defer rows.Close()
return query.Scan(rows)
}
// GetTransfers load transfers transfer betweeen two blocks.
func (db *Database) GetTransfers(start, end *big.Int) (rst []Transfer, err error) {
query := newTransfersQuery().FilterStart(start).FilterEnd(end)
rows, err := db.db.Query(query.String(), query.Args()...)
if err != nil {
return
}
defer rows.Close()
return query.Scan(rows)
}
// SaveHeaders stores a list of headers atomically.
func (db *Database) SaveHeaders(headers []*types.Header) (err error) {
var (
tx *sql.Tx
insert *sql.Stmt
)
tx, err = db.db.Begin()
if err != nil {
return
}
insert, err = tx.Prepare("INSERT INTO blocks(number, hash, timestamp) VALUES (?, ?, ?)")
if err != nil {
return
}
defer func() {
if err == nil {
err = tx.Commit()
} else {
_ = tx.Rollback()
}
}()
for _, h := range headers {
_, err = insert.Exec((*SQLBigInt)(h.Number), h.Hash(), h.Time)
if err != nil {
return
}
}
return
}
func (db *Database) SaveSyncedHeader(address common.Address, header *types.Header, option SyncOption) (err error) {
var (
tx *sql.Tx
insert *sql.Stmt
)
tx, err = db.db.Begin()
if err != nil {
return
}
insert, err = tx.Prepare("INSERT INTO accounts_to_blocks(address, blk_number, sync) VALUES (?,?,?)")
if err != nil {
return
}
defer func() {
if err == nil {
err = tx.Commit()
} else {
_ = tx.Rollback()
}
}()
_, err = insert.Exec(address, (*SQLBigInt)(header.Number), option)
if err != nil {
return
}
return err
}
// HeaderExists checks if header with hash exists in db.
func (db *Database) HeaderExists(hash common.Hash) (bool, error) {
var val sql.NullBool
err := db.db.QueryRow("SELECT EXISTS (SELECT hash FROM blocks WHERE hash = ?)", hash).Scan(&val)
if err != nil {
return false, err
}
return val.Bool, nil
}
// GetHeaderByNumber selects header using block number.
func (db *Database) GetHeaderByNumber(number *big.Int) (header *DBHeader, err error) {
header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
err = db.db.QueryRow("SELECT hash,number FROM blocks WHERE number = ?", (*SQLBigInt)(number)).Scan(&header.Hash, (*SQLBigInt)(header.Number))
if err == nil {
return header, nil
}
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
func (db *Database) GetLastHead() (header *DBHeader, err error) {
header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
err = db.db.QueryRow("SELECT hash,number FROM blocks WHERE head = 1 AND number = (SELECT MAX(number) FROM blocks)").Scan(&header.Hash, (*SQLBigInt)(header.Number))
if err == nil {
return header, nil
}
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
// GetLatestSynced downloads last synced block with a given option.
func (db *Database) GetLatestSynced(address common.Address, option SyncOption) (header *DBHeader, err error) {
header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
err = db.db.QueryRow(`
SELECT blocks.hash, blk_number FROM accounts_to_blocks JOIN blocks ON blk_number = blocks.number WHERE address = $1 AND blk_number
= (SELECT MAX(blk_number) FROM accounts_to_blocks WHERE address = $1 AND sync & $2 = $2)`, address, option).Scan(&header.Hash, (*SQLBigInt)(header.Number))
if err == nil {
return header, nil
}
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
// statementCreator allows to pass transaction or database to use in consumer.
type statementCreator interface {
Prepare(query string) (*sql.Stmt, error)
}
func deleteHeaders(creator statementCreator, headers []*DBHeader) error {
delete, err := creator.Prepare("DELETE FROM blocks WHERE hash = ?")
if err != nil {
return err
}
for _, h := range headers {
_, err = delete.Exec(h.Hash)
if err != nil {
return err
}
}
return nil
}
func insertHeaders(creator statementCreator, headers []*DBHeader) error {
insert, err := creator.Prepare("INSERT OR IGNORE INTO blocks(hash, number, timestamp, head) VALUES (?, ?, ?, ?)")
if err != nil {
return err
}
for _, h := range headers {
_, err = insert.Exec(h.Hash, (*SQLBigInt)(h.Number), h.Timestamp, h.Head)
if err != nil {
return err
}
}
return nil
}
func insertTransfers(creator statementCreator, transfers []Transfer) error {
insert, err := creator.Prepare("INSERT OR IGNORE INTO transfers(hash, blk_hash, address, tx, sender, receipt, log, type) VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
if err != nil {
return err
}
for _, t := range transfers {
_, err = insert.Exec(t.ID, t.BlockHash, t.Address, &JSONBlob{t.Transaction}, t.From, &JSONBlob{t.Receipt}, &JSONBlob{t.Log}, t.Type)
if err != nil {
return err
}
}
return nil
}
func updateAccounts(creator statementCreator, accounts []common.Address, headers []*DBHeader, option SyncOption) error {
update, err := creator.Prepare("UPDATE accounts_to_blocks SET sync=sync|? WHERE address=? AND blk_number=?")
if err != nil {
return err
}
insert, err := creator.Prepare("INSERT OR IGNORE INTO accounts_to_blocks(address,blk_number,sync) VALUES(?,?,?)")
if err != nil {
return err
}
for _, acc := range accounts {
for _, h := range headers {
rst, err := update.Exec(option, acc, (*SQLBigInt)(h.Number))
if err != nil {
return err
}
affected, err := rst.RowsAffected()
if err != nil {
return err
}
if affected > 0 {
continue
}
_, err = insert.Exec(acc, (*SQLBigInt)(h.Number), option)
if err != nil {
return err
}
}
}
return nil
}