2021-09-09 16:28:54 +02:00
package transfer
import (
"database/sql"
"database/sql/driver"
"encoding/json"
"errors"
"math/big"
"reflect"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/bigint"
)
// DBHeader fields from header that are stored in database.
type DBHeader struct {
Number * big . Int
Hash common . Hash
Timestamp uint64
Erc20Transfers [ ] * Transfer
Network uint64
Address common . Address
// Head is true if the block was a head at the time it was pulled from chain.
Head bool
// Loaded is true if trasfers from this block has been already fetched
Loaded bool
}
func toDBHeader ( header * types . Header ) * DBHeader {
return & DBHeader {
Hash : header . Hash ( ) ,
Number : header . Number ,
Timestamp : header . Time ,
Loaded : false ,
}
}
// SyncOption is used to specify that application processed transfers for that block.
type SyncOption uint
// JSONBlob type for marshaling/unmarshaling inner type to json.
type JSONBlob struct {
data interface { }
}
// Scan implements interface.
func ( blob * JSONBlob ) Scan ( value interface { } ) error {
if value == nil || reflect . ValueOf ( blob . data ) . IsNil ( ) {
return nil
}
bytes , ok := value . ( [ ] byte )
if ! ok {
return errors . New ( "not a byte slice" )
}
if len ( bytes ) == 0 {
return nil
}
err := json . Unmarshal ( bytes , blob . data )
return err
}
// Value implements interface.
func ( blob * JSONBlob ) Value ( ) ( driver . Value , error ) {
if blob . data == nil || reflect . ValueOf ( blob . data ) . IsNil ( ) {
return nil , nil
}
return json . Marshal ( blob . data )
}
func NewDB ( client * sql . DB ) * Database {
return & Database { client : client }
}
// Database sql wrapper for operations with wallet objects.
type Database struct {
client * sql . DB
}
// Close closes database.
func ( db * Database ) Close ( ) error {
return db . client . Close ( )
}
func ( db * Database ) ProcessBlocks ( chainID uint64 , account common . Address , from * big . Int , to * LastKnownBlock , headers [ ] * DBHeader ) ( err error ) {
var (
tx * sql . Tx
)
tx , err = db . client . Begin ( )
if err != nil {
return err
}
defer func ( ) {
if err == nil {
err = tx . Commit ( )
return
}
_ = tx . Rollback ( )
} ( )
err = insertBlocksWithTransactions ( chainID , tx , account , headers )
if err != nil {
return
}
err = upsertRange ( chainID , tx , account , from , to )
if err != nil {
return
}
return
}
func ( db * Database ) SaveBlocks ( chainID uint64 , account common . Address , headers [ ] * DBHeader ) ( err error ) {
var (
tx * sql . Tx
)
tx , err = db . client . Begin ( )
if err != nil {
return err
}
defer func ( ) {
if err == nil {
err = tx . Commit ( )
return
}
_ = tx . Rollback ( )
} ( )
err = insertBlocksWithTransactions ( chainID , tx , account , headers )
if err != nil {
return
}
return
}
// ProcessTranfers atomically adds/removes blocks and adds new tranfers.
func ( db * Database ) ProcessTranfers ( chainID uint64 , transfers [ ] Transfer , removed [ ] * DBHeader ) ( err error ) {
var (
tx * sql . Tx
)
tx , err = db . client . Begin ( )
if err != nil {
return err
}
defer func ( ) {
if err == nil {
err = tx . Commit ( )
return
}
_ = tx . Rollback ( )
} ( )
err = deleteHeaders ( tx , removed )
if err != nil {
return
}
err = updateOrInsertTransfers ( chainID , tx , transfers )
if err != nil {
return
}
return
}
// SaveTranfers
func ( db * Database ) SaveTranfers ( chainID uint64 , address common . Address , transfers [ ] Transfer , blocks [ ] * big . Int ) ( err error ) {
var (
tx * sql . Tx
)
tx , err = db . client . Begin ( )
if err != nil {
return err
}
defer func ( ) {
if err == nil {
err = tx . Commit ( )
return
}
_ = tx . Rollback ( )
} ( )
err = updateOrInsertTransfers ( chainID , tx , transfers )
if err != nil {
return
}
err = markBlocksAsLoaded ( chainID , tx , address , blocks )
if err != nil {
return
}
return
}
// GetTransfersInRange loads transfers for a given address between two blocks.
func ( db * Database ) GetTransfersInRange ( chainID uint64 , address common . Address , start , end * big . Int ) ( rst [ ] Transfer , err error ) {
query := newTransfersQuery ( ) . FilterNetwork ( chainID ) . FilterAddress ( address ) . FilterStart ( start ) . FilterEnd ( end ) . FilterLoaded ( 1 )
rows , err := db . client . Query ( query . String ( ) , query . Args ( ) ... )
if err != nil {
return
}
defer rows . Close ( )
return query . Scan ( rows )
}
// GetTransfersByAddress loads transfers for a given address between two blocks.
func ( db * Database ) GetTransfersByAddress ( chainID uint64 , address common . Address , toBlock * big . Int , limit int64 ) ( rst [ ] Transfer , err error ) {
query := newTransfersQuery ( ) .
FilterNetwork ( chainID ) .
FilterAddress ( address ) .
FilterEnd ( toBlock ) .
FilterLoaded ( 1 ) .
Limit ( limit )
rows , err := db . client . Query ( query . String ( ) , query . Args ( ) ... )
if err != nil {
return
}
defer rows . Close ( )
return query . Scan ( rows )
}
// GetTransfersByAddressAndBlock loads transfers for a given address and block.
func ( db * Database ) GetTransfersByAddressAndBlock ( chainID uint64 , address common . Address , block * big . Int , limit int64 ) ( rst [ ] Transfer , err error ) {
query := newTransfersQuery ( ) .
FilterNetwork ( chainID ) .
FilterAddress ( address ) .
FilterBlockNumber ( block ) .
FilterLoaded ( 1 ) .
Limit ( limit )
rows , err := db . client . Query ( query . String ( ) , query . Args ( ) ... )
if err != nil {
return
}
defer rows . Close ( )
return query . Scan ( rows )
}
// GetTransfers load transfers transfer betweeen two blocks.
func ( db * Database ) GetTransfers ( chainID uint64 , start , end * big . Int ) ( rst [ ] Transfer , err error ) {
query := newTransfersQuery ( ) . FilterNetwork ( chainID ) . FilterStart ( start ) . FilterEnd ( end ) . FilterLoaded ( 1 )
rows , err := db . client . Query ( query . String ( ) , query . Args ( ) ... )
if err != nil {
return
}
defer rows . Close ( )
return query . Scan ( rows )
}
func ( db * Database ) GetPreloadedTransactions ( chainID uint64 , address common . Address , blockHash common . Hash ) ( rst [ ] Transfer , err error ) {
query := newTransfersQuery ( ) .
FilterNetwork ( chainID ) .
FilterAddress ( address ) .
FilterBlockHash ( blockHash ) .
FilterLoaded ( 0 )
rows , err := db . client . Query ( query . String ( ) , query . Args ( ) ... )
if err != nil {
return
}
defer rows . Close ( )
return query . Scan ( rows )
}
func ( db * Database ) GetTransactionsLog ( chainID uint64 , address common . Address , transactionHash common . Hash ) ( * types . Log , error ) {
l := & types . Log { }
err := db . client . QueryRow ( "SELECT log FROM transfers WHERE network_id = ? AND address = ? AND hash = ?" ,
chainID , address , transactionHash ) .
Scan ( & JSONBlob { l } )
if err == nil {
return l , nil
}
if err == sql . ErrNoRows {
return nil , nil
}
return nil , err
}
// SaveHeaders stores a list of headers atomically.
func ( db * Database ) SaveHeaders ( chainID uint64 , headers [ ] * types . Header , address common . Address ) ( err error ) {
var (
tx * sql . Tx
insert * sql . Stmt
)
tx , err = db . client . Begin ( )
if err != nil {
return
}
insert , err = tx . Prepare ( "INSERT INTO blocks(network_id, blk_number, blk_hash, address) VALUES (?, ?, ?, ?)" )
if err != nil {
return
}
defer func ( ) {
if err == nil {
err = tx . Commit ( )
} else {
_ = tx . Rollback ( )
}
} ( )
for _ , h := range headers {
_ , err = insert . Exec ( chainID , ( * bigint . SQLBigInt ) ( h . Number ) , h . Hash ( ) , address )
if err != nil {
return
}
}
return
}
// GetHeaderByNumber selects header using block number.
func ( db * Database ) GetHeaderByNumber ( chainID uint64 , number * big . Int ) ( header * DBHeader , err error ) {
header = & DBHeader { Hash : common . Hash { } , Number : new ( big . Int ) }
err = db . client . QueryRow ( "SELECT blk_hash, blk_number FROM blocks WHERE blk_number = ? AND network_id = ?" , ( * bigint . SQLBigInt ) ( number ) , chainID ) . Scan ( & header . Hash , ( * bigint . SQLBigInt ) ( header . Number ) )
if err == nil {
return header , nil
}
if err == sql . ErrNoRows {
return nil , nil
}
return nil , err
}
// statementCreator allows to pass transaction or database to use in consumer.
type statementCreator interface {
Prepare ( query string ) ( * sql . Stmt , error )
}
func deleteHeaders ( creator statementCreator , headers [ ] * DBHeader ) error {
delete , err := creator . Prepare ( "DELETE FROM blocks WHERE blk_hash = ?" )
if err != nil {
return err
}
deleteTransfers , err := creator . Prepare ( "DELETE FROM transfers WHERE blk_hash = ?" )
if err != nil {
return err
}
for _ , h := range headers {
_ , err = delete . Exec ( h . Hash )
if err != nil {
return err
}
_ , err = deleteTransfers . Exec ( h . Hash )
if err != nil {
return err
}
}
return nil
}
2021-11-24 14:59:45 +02:00
func ( db * Database ) InsertBlock ( chainID uint64 , account common . Address , blockNumber * big . Int , blockHash common . Hash ) error {
var (
tx * sql . Tx
)
tx , err := db . client . Begin ( )
if err != nil {
return err
}
defer func ( ) {
if err == nil {
err = tx . Commit ( )
return
}
_ = tx . Rollback ( )
} ( )
insert , err := tx . Prepare ( "INSERT OR IGNORE INTO blocks(network_id, address, blk_number, blk_hash, loaded) VALUES (?, ?, ?, ?, ?)" )
if err != nil {
return err
}
_ , err = insert . Exec ( chainID , account , ( * bigint . SQLBigInt ) ( blockNumber ) , blockHash , true )
return err
}
2021-09-09 16:28:54 +02:00
func insertBlocksWithTransactions ( chainID uint64 , creator statementCreator , account common . Address , headers [ ] * DBHeader ) error {
insert , err := creator . Prepare ( "INSERT OR IGNORE INTO blocks(network_id, address, blk_number, blk_hash, loaded) VALUES (?, ?, ?, ?, ?)" )
if err != nil {
return err
}
updateTx , err := creator . Prepare ( ` UPDATE transfers
SET log = ?
WHERE network_id = ? AND address = ? AND hash = ? ` )
if err != nil {
return err
}
2021-11-24 14:59:45 +02:00
insertTx , err := creator . Prepare ( ` INSERT OR IGNORE
2021-09-09 16:28:54 +02:00
INTO transfers ( network_id , address , sender , hash , blk_number , blk_hash , type , timestamp , log , loaded )
VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? , 0 ) ` )
if err != nil {
return err
}
for _ , header := range headers {
_ , err = insert . Exec ( chainID , account , ( * bigint . SQLBigInt ) ( header . Number ) , header . Hash , header . Loaded )
if err != nil {
return err
}
if len ( header . Erc20Transfers ) > 0 {
for _ , transfer := range header . Erc20Transfers {
res , err := updateTx . Exec ( & JSONBlob { transfer . Log } , chainID , account , transfer . ID )
if err != nil {
return err
}
affected , err := res . RowsAffected ( )
if err != nil {
return err
}
if affected > 0 {
continue
}
_ , err = insertTx . Exec ( chainID , account , account , transfer . ID , ( * bigint . SQLBigInt ) ( header . Number ) , header . Hash , erc20Transfer , transfer . Timestamp , & JSONBlob { transfer . Log } )
if err != nil {
log . Error ( "error saving erc20transfer" , "err" , err )
return err
}
}
}
}
return nil
}
func updateOrInsertTransfers ( chainID uint64 , creator statementCreator , transfers [ ] Transfer ) error {
update , err := creator . Prepare ( ` UPDATE transfers
SET tx = ? , sender = ? , receipt = ? , timestamp = ? , loaded = 1
WHERE address = ? AND hash = ? ` )
if err != nil {
return err
}
insert , err := creator . Prepare ( ` INSERT OR IGNORE INTO transfers
( network_id , hash , blk_hash , blk_number , timestamp , address , tx , sender , receipt , log , type , loaded )
VALUES
( ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , 1 ) ` )
if err != nil {
return err
}
for _ , t := range transfers {
res , err := update . Exec ( & JSONBlob { t . Transaction } , t . From , & JSONBlob { t . Receipt } , t . Timestamp , t . Address , t . ID )
if err != nil {
return err
}
affected , err := res . RowsAffected ( )
if err != nil {
return err
}
if affected > 0 {
continue
}
_ , err = insert . Exec ( chainID , t . ID , t . BlockHash , ( * bigint . SQLBigInt ) ( t . BlockNumber ) , t . Timestamp , t . Address , & JSONBlob { t . Transaction } , t . From , & JSONBlob { t . Receipt } , & JSONBlob { t . Log } , t . Type )
if err != nil {
log . Error ( "can't save transfer" , "b-hash" , t . BlockHash , "b-n" , t . BlockNumber , "a" , t . Address , "h" , t . ID )
return err
}
}
return nil
}
//markBlocksAsLoaded(tx, address, chainID, blocks)
func markBlocksAsLoaded ( chainID uint64 , creator statementCreator , address common . Address , blocks [ ] * big . Int ) error {
update , err := creator . Prepare ( "UPDATE blocks SET loaded=? WHERE address=? AND blk_number=? AND network_id=?" )
if err != nil {
return err
}
for _ , block := range blocks {
_ , err := update . Exec ( true , address , ( * bigint . SQLBigInt ) ( block ) , chainID )
if err != nil {
return err
}
}
return nil
}