Merge pull request #31 from farazdagi/feature/block-on-send-transaction
Block on sentTransaction() until transaction is complete (or timed out)
This commit is contained in:
commit
6e4c802957
|
@ -16,7 +16,7 @@ import (
|
|||
"github.com/ethereum/go-ethereum/cmd/utils"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/les"
|
||||
"github.com/ethereum/go-ethereum/les/status"
|
||||
"github.com/ethereum/go-ethereum/p2p/discover"
|
||||
errextra "github.com/pkg/errors"
|
||||
"github.com/status-im/status-go/src/extkeys"
|
||||
|
@ -200,11 +200,11 @@ func doAddPeer(url string) (bool, error) {
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func onSendTransactionRequest(queuedTx les.QueuedTx) {
|
||||
func onSendTransactionRequest(queuedTx status.QueuedTx) {
|
||||
event := GethEvent{
|
||||
Type: "sendTransactionQueued",
|
||||
Event: SendTransactionEvent{
|
||||
Hash: queuedTx.Hash.Hex(),
|
||||
Id: string(queuedTx.Id),
|
||||
Args: queuedTx.Args,
|
||||
},
|
||||
}
|
||||
|
@ -213,12 +213,12 @@ func onSendTransactionRequest(queuedTx les.QueuedTx) {
|
|||
C.GethServiceSignalEvent(C.CString(string(body)))
|
||||
}
|
||||
|
||||
func completeTransaction(hash, password string) (common.Hash, error) {
|
||||
func completeTransaction(id, password string) (common.Hash, error) {
|
||||
if currentNode != nil {
|
||||
if lightEthereum != nil {
|
||||
backend := lightEthereum.StatusBackend
|
||||
|
||||
return backend.CompleteQueuedTransaction(les.QueuedTxHash(hash), password)
|
||||
return backend.CompleteQueuedTransaction(status.QueuedTxId(id), password)
|
||||
}
|
||||
|
||||
return common.Hash{}, errors.New("can not retrieve LES service")
|
||||
|
|
|
@ -13,10 +13,12 @@ import (
|
|||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/les"
|
||||
"github.com/ethereum/go-ethereum/les/status"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/ethereum/go-ethereum/whisper"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -331,13 +333,11 @@ func TestQueuedTransactions(t *testing.T) {
|
|||
}
|
||||
|
||||
// create an account
|
||||
address, pubKey, mnemonic, err := createAccount(newAccountPassword)
|
||||
address, _, _, err := createAccount(newAccountPassword)
|
||||
if err != nil {
|
||||
fmt.Println(err.Error())
|
||||
t.Error("Test failed: could not create account")
|
||||
t.Errorf("could not create account: %v", err)
|
||||
return
|
||||
}
|
||||
glog.V(logger.Info).Infof("Account created: {address: %s, key: %s, mnemonic:%s}", address, pubKey, mnemonic)
|
||||
|
||||
// test transaction queueing
|
||||
var lightEthereum *les.LightEthereum
|
||||
|
@ -346,37 +346,40 @@ func TestQueuedTransactions(t *testing.T) {
|
|||
}
|
||||
backend := lightEthereum.StatusBackend
|
||||
|
||||
// replace transaction notification hanlder
|
||||
sentinel := 0
|
||||
backend.SetTransactionQueueHandler(func(queuedTx les.QueuedTx) {
|
||||
glog.V(logger.Info).Infof("Queued transaction hash: %v\n", queuedTx.Hash.Hex())
|
||||
var txHash common.Hash
|
||||
if txHash, err = completeTransaction(queuedTx.Hash.Hex(), testAddressPassword); err != nil {
|
||||
t.Errorf("Test failed: cannot complete queued transation[%s]: %v", queuedTx.Hash.Hex(), err)
|
||||
// replace transaction notification handler
|
||||
var txHash = common.Hash{}
|
||||
backend.SetTransactionQueueHandler(func(queuedTx status.QueuedTx) {
|
||||
glog.V(logger.Info).Infof("Transaction queued (will be completed in 5 secs): {id: %v, hash: %v}\n", queuedTx.Id, queuedTx.Hash.Hex())
|
||||
time.Sleep(5 * time.Second)
|
||||
if txHash, err = completeTransaction(string(queuedTx.Id), testAddressPassword); err != nil {
|
||||
t.Errorf("cannot complete queued transation[%v]: %v", queuedTx.Id, err)
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(logger.Info).Infof("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex())
|
||||
sentinel = 1
|
||||
})
|
||||
|
||||
// try completing non-existing transaction
|
||||
if _, err := completeTransaction("0x1234512345123451234512345123456123451234512345123451234512345123", testAddressPassword); err == nil {
|
||||
if _, err := completeTransaction("some-bad-transaction-id", testAddressPassword); err == nil {
|
||||
t.Errorf("Test failed: error expected and not recieved")
|
||||
return
|
||||
}
|
||||
|
||||
// send normal transaction
|
||||
from, err := utils.MakeAddress(accountManager, testAddress)
|
||||
if err != nil {
|
||||
t.Errorf("Test failed: Could not retrieve account from address: %v", err)
|
||||
t.Errorf("could not retrieve account from address: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
to, err := utils.MakeAddress(accountManager, address)
|
||||
if err != nil {
|
||||
t.Errorf("Test failed: Could not retrieve account from address: %v", err)
|
||||
t.Errorf("could not retrieve account from address: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = backend.SendTransaction(nil, les.SendTxArgs{
|
||||
// this call blocks, up until Complete Transaction is called
|
||||
txHashCheck, err := backend.SendTransaction(nil, status.SendTxArgs{
|
||||
From: from.Address,
|
||||
To: &to.Address,
|
||||
Value: rpc.NewHexNumber(big.NewInt(1000000000000)),
|
||||
|
@ -385,11 +388,52 @@ func TestQueuedTransactions(t *testing.T) {
|
|||
t.Errorf("Test failed: cannot send transaction: %v", err)
|
||||
}
|
||||
|
||||
time.Sleep(15 * time.Second)
|
||||
if sentinel != 1 {
|
||||
t.Error("Test failed: transaction was never queued or completed")
|
||||
if !reflect.DeepEqual(txHash, txHashCheck) {
|
||||
t.Errorf("Transaction hash returned from SendTransaction is invalid")
|
||||
return
|
||||
}
|
||||
|
||||
time.Sleep(10 * time.Second)
|
||||
|
||||
if reflect.DeepEqual(txHashCheck, common.Hash{}) {
|
||||
t.Error("Test failed: transaction was never queued or completed")
|
||||
return
|
||||
}
|
||||
|
||||
// now test eviction queue
|
||||
txQueue := backend.TransactionQueue()
|
||||
var i = 0
|
||||
backend.SetTransactionQueueHandler(func(queuedTx status.QueuedTx) {
|
||||
//glog.V(logger.Info).Infof("%d. Transaction queued (queue size: %d): {id: %v}\n", i, txQueue.Count(), queuedTx.Id)
|
||||
i++
|
||||
})
|
||||
|
||||
if txQueue.Count() != 0 {
|
||||
t.Errorf("transaction count should be zero: %d", txQueue.Count())
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
go backend.SendTransaction(nil, status.SendTxArgs{})
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
t.Logf("Number of transactions queued: %d. Queue size (shouldn't be more than %d): %d", i, status.DefaultTxQueueCap, txQueue.Count())
|
||||
|
||||
if txQueue.Count() != 10 {
|
||||
t.Errorf("transaction count should be 10: got %d", txQueue.Count())
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < status.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines
|
||||
go backend.SendTransaction(nil, status.SendTxArgs{})
|
||||
}
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
if txQueue.Count() != status.DefaultTxQueueCap && txQueue.Count() != (status.DefaultTxQueueCap-1) {
|
||||
t.Errorf("transaction count should be %d (or %d): got %d", status.DefaultTxQueueCap, status.DefaultTxQueueCap-1, txQueue.Count())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func prepareTestNode() error {
|
||||
|
|
|
@ -119,8 +119,8 @@ func UnlockAccount(address, password *C.char, seconds int) *C.char {
|
|||
}
|
||||
|
||||
//export CompleteTransaction
|
||||
func CompleteTransaction(hash, password *C.char) *C.char {
|
||||
txHash, err := completeTransaction(C.GoString(hash), C.GoString(password))
|
||||
func CompleteTransaction(id, password *C.char) *C.char {
|
||||
txHash, err := completeTransaction(C.GoString(id), C.GoString(password))
|
||||
|
||||
errString := emptyError
|
||||
if err != nil {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"github.com/ethereum/go-ethereum/les"
|
||||
"github.com/ethereum/go-ethereum/les/status"
|
||||
)
|
||||
|
||||
type AccountInfo struct {
|
||||
|
@ -35,8 +35,8 @@ type WhisperMessageEvent struct {
|
|||
}
|
||||
|
||||
type SendTransactionEvent struct {
|
||||
Hash string `json:"hash"`
|
||||
Args les.SendTxArgs `json:"args"`
|
||||
Id string `json:"hash"`
|
||||
Args status.SendTxArgs `json:"args"`
|
||||
}
|
||||
|
||||
type CompleteTransactionResult struct {
|
||||
|
|
|
@ -36,17 +36,18 @@ import (
|
|||
"github.com/ethereum/go-ethereum/core/vm"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/les/status"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/pborman/uuid"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
const defaultGas = uint64(90000)
|
||||
const defaultTxQueueCap = int(5)
|
||||
|
||||
// PublicEthereumAPI provides an API to access Ethereum related information.
|
||||
// It offers only methods that operate on public data that is freely available to anyone.
|
||||
|
@ -866,18 +867,18 @@ type PublicTransactionPoolAPI struct {
|
|||
b Backend
|
||||
muPendingTxSubs sync.Mutex
|
||||
pendingTxSubs map[string]rpc.Subscription
|
||||
txQueue chan QueuedTx
|
||||
txQueue chan *status.QueuedTx
|
||||
}
|
||||
|
||||
var txSingletonQueue chan QueuedTx
|
||||
var txSingletonQueue chan *status.QueuedTx
|
||||
|
||||
// NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool.
|
||||
func NewPublicTransactionPoolAPI(b Backend) *PublicTransactionPoolAPI {
|
||||
var once sync.Once
|
||||
once.Do(func() {
|
||||
if txSingletonQueue == nil {
|
||||
glog.V(logger.Debug).Infof("Transaction queue (for status-go) inited")
|
||||
txSingletonQueue = make(chan QueuedTx, defaultTxQueueCap)
|
||||
glog.V(logger.Info).Infof("Transaction queue inited (Public Transaction Pool API)")
|
||||
txSingletonQueue = make(chan *status.QueuedTx, status.DefaultTxSendQueueCap)
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -1141,33 +1142,37 @@ func submitTransaction(ctx context.Context, b Backend, tx *types.Transaction, si
|
|||
return signedTx.Hash(), nil
|
||||
}
|
||||
|
||||
// Queued Transaction is a container that holds context and arguments enough to complete the queued transaction.
|
||||
type QueuedTx struct {
|
||||
Hash common.Hash
|
||||
Context context.Context
|
||||
Args SendTxArgs
|
||||
}
|
||||
|
||||
func (s *PublicTransactionPoolAPI) GetTransactionQueue() (chan QueuedTx, error) {
|
||||
func (s *PublicTransactionPoolAPI) GetTransactionQueue() (chan *status.QueuedTx, error) {
|
||||
return s.txQueue, nil
|
||||
}
|
||||
|
||||
// SendTransaction queues transactions, to be fulfilled by CompleteQueuedTransaction()
|
||||
func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args SendTxArgs) (common.Hash, error) {
|
||||
queuedTx := QueuedTx{
|
||||
queuedTx := &status.QueuedTx{
|
||||
Id: status.QueuedTxId(uuid.New()),
|
||||
Hash: common.Hash{},
|
||||
Context: ctx,
|
||||
Args: args,
|
||||
Args: status.SendTxArgs(args),
|
||||
Done: make(chan struct{}, 1),
|
||||
}
|
||||
|
||||
// populate transaction hash
|
||||
key, err := crypto.GenerateKey()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
queuedTx.Hash = common.BytesToHash(crypto.FromECDSA(key))
|
||||
|
||||
// send transaction to pending pool
|
||||
s.txQueue <- queuedTx
|
||||
|
||||
// now wait up until transaction is complete (via call to CompleteQueuedTransaction) or timeout occurs
|
||||
timeout := make(chan struct{}, 1)
|
||||
go func() {
|
||||
time.Sleep(status.DefaultTxSendCompletionTimeout * time.Second)
|
||||
timeout <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-queuedTx.Done:
|
||||
return queuedTx.Hash, queuedTx.Err
|
||||
case <-timeout:
|
||||
return common.Hash{}, errors.New("transaction sending timed out")
|
||||
}
|
||||
|
||||
return queuedTx.Hash, nil
|
||||
}
|
||||
|
||||
|
|
79
src/vendor/github.com/ethereum/go-ethereum/internal/ethapi/status_backend.go
generated
vendored
Normal file
79
src/vendor/github.com/ethereum/go-ethereum/internal/ethapi/status_backend.go
generated
vendored
Normal file
|
@ -0,0 +1,79 @@
|
|||
package ethapi
|
||||
|
||||
import (
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/les/status"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// StatusBackend implements les.StatusBackend with direct calls to Ethereum
|
||||
// internals to support calls from status-go bindings (to internal packages e.g. ethapi)
|
||||
type StatusBackend struct {
|
||||
eapi *PublicEthereumAPI // Wrapper around the Ethereum object to access metadata
|
||||
bcapi *PublicBlockChainAPI // Wrapper around the blockchain to access chain data
|
||||
txapi *PublicTransactionPoolAPI // Wrapper around the transaction pool to access transaction data
|
||||
|
||||
txQueue *status.TxQueue
|
||||
}
|
||||
|
||||
// NewStatusBackend creates a new backend using an existing Ethereum object.
|
||||
func NewStatusBackend(apiBackend Backend) *StatusBackend {
|
||||
glog.V(logger.Info).Infof("Status backend service started")
|
||||
backend := &StatusBackend{
|
||||
eapi: NewPublicEthereumAPI(apiBackend, nil, nil),
|
||||
bcapi: NewPublicBlockChainAPI(apiBackend),
|
||||
txapi: NewPublicTransactionPoolAPI(apiBackend),
|
||||
txQueue: status.NewTransactionQueue(),
|
||||
}
|
||||
|
||||
go backend.transactionQueueForwardingLoop()
|
||||
|
||||
return backend
|
||||
}
|
||||
|
||||
func (b *StatusBackend) SetTransactionQueueHandler(fn status.EnqueuedTxHandler) {
|
||||
b.txQueue.SetEnqueueHandler(fn)
|
||||
}
|
||||
|
||||
func (b *StatusBackend) TransactionQueue() *status.TxQueue {
|
||||
return b.txQueue
|
||||
}
|
||||
|
||||
// SendTransaction wraps call to PublicTransactionPoolAPI.SendTransaction
|
||||
func (b *StatusBackend) SendTransaction(ctx context.Context, args status.SendTxArgs) (common.Hash, error) {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
return b.txapi.SendTransaction(ctx, SendTxArgs(args))
|
||||
}
|
||||
|
||||
// CompleteQueuedTransaction wraps call to PublicTransactionPoolAPI.CompleteQueuedTransaction
|
||||
func (b *StatusBackend) CompleteQueuedTransaction(id status.QueuedTxId, passphrase string) (common.Hash, error) {
|
||||
queuedTx, err := b.txQueue.Get(id)
|
||||
if err != nil {
|
||||
return common.Hash{}, err
|
||||
}
|
||||
|
||||
hash, err := b.txapi.CompleteQueuedTransaction(context.Background(), SendTxArgs(queuedTx.Args), passphrase)
|
||||
queuedTx.Hash = hash
|
||||
queuedTx.Err = err
|
||||
queuedTx.Done <- struct{}{} // sendTransaction() waits on this, notify so that it can return
|
||||
|
||||
return hash, err
|
||||
}
|
||||
|
||||
func (b *StatusBackend) transactionQueueForwardingLoop() {
|
||||
txQueue, err := b.txapi.GetTransactionQueue()
|
||||
if err != nil {
|
||||
glog.V(logger.Error).Infof("cannot read from transaction queue")
|
||||
return
|
||||
}
|
||||
|
||||
// forward internal ethapi transactions to status backend
|
||||
for queuedTx := range txQueue {
|
||||
b.txQueue.Enqueue(queuedTx)
|
||||
}
|
||||
}
|
|
@ -70,7 +70,7 @@ type LightEthereum struct {
|
|||
netVersionId int
|
||||
netRPCService *ethapi.PublicNetAPI
|
||||
|
||||
StatusBackend *StatusBackend
|
||||
StatusBackend *ethapi.StatusBackend
|
||||
}
|
||||
|
||||
func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
|
||||
|
@ -129,7 +129,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
|
|||
eth.ApiBackend.gpo = gasprice.NewLightPriceOracle(eth.ApiBackend)
|
||||
|
||||
// inject status-im backend
|
||||
eth.StatusBackend = NewStatusBackend(eth.ApiBackend)
|
||||
eth.StatusBackend = ethapi.NewStatusBackend(eth.ApiBackend)
|
||||
|
||||
return eth, nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
package status
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultTxQueueCap = int(35) // how many items can be queued
|
||||
DefaultTxSendQueueCap = int(70) // how many items can be passed to sendTransaction() w/o blocking
|
||||
DefaultTxSendCompletionTimeout = 300 // how many seconds to wait before returning result in sentTransaction()
|
||||
)
|
||||
|
||||
var (
|
||||
ErrQueuedTxIdNotFound = errors.New("transaction hash not found")
|
||||
)
|
||||
|
||||
// TxQueue is capped container that holds pending transactions
|
||||
type TxQueue struct {
|
||||
transactions map[QueuedTxId]*QueuedTx
|
||||
evictableIds chan QueuedTxId
|
||||
enqueueTicker chan struct{}
|
||||
|
||||
// when items are enqueued notify handlers
|
||||
txEnqueueHandler EnqueuedTxHandler
|
||||
}
|
||||
|
||||
// QueuedTx holds enough information to complete the queued transaction.
|
||||
type QueuedTx struct {
|
||||
Id QueuedTxId
|
||||
Hash common.Hash
|
||||
Context context.Context
|
||||
Args SendTxArgs
|
||||
Done chan struct{}
|
||||
Err error
|
||||
}
|
||||
|
||||
type QueuedTxId string
|
||||
|
||||
// QueuedTxHandler is a function that receives queued/pending transactions, when they get queued
|
||||
type EnqueuedTxHandler func(QueuedTx)
|
||||
|
||||
// SendTxArgs represents the arguments to submbit a new transaction into the transaction pool.
|
||||
type SendTxArgs struct {
|
||||
From common.Address `json:"from"`
|
||||
To *common.Address `json:"to"`
|
||||
Gas *rpc.HexNumber `json:"gas"`
|
||||
GasPrice *rpc.HexNumber `json:"gasPrice"`
|
||||
Value *rpc.HexNumber `json:"value"`
|
||||
Data string `json:"data"`
|
||||
Nonce *rpc.HexNumber `json:"nonce"`
|
||||
}
|
||||
|
||||
func NewTransactionQueue() *TxQueue {
|
||||
txQueue := &TxQueue{
|
||||
transactions: make(map[QueuedTxId]*QueuedTx),
|
||||
evictableIds: make(chan QueuedTxId, DefaultTxQueueCap), // will be used to evict in FIFO
|
||||
enqueueTicker: make(chan struct{}),
|
||||
}
|
||||
|
||||
go txQueue.evictionLoop()
|
||||
|
||||
return txQueue
|
||||
}
|
||||
|
||||
func (q *TxQueue) evictionLoop() {
|
||||
for range q.enqueueTicker {
|
||||
if len(q.transactions) >= (DefaultTxQueueCap - 1) { // eviction is required to accommodate another/last item
|
||||
delete(q.transactions, <-q.evictableIds)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (q *TxQueue) Enqueue(tx *QueuedTx) error {
|
||||
if q.txEnqueueHandler == nil { //discard, until handler is provided
|
||||
return nil
|
||||
}
|
||||
|
||||
q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item
|
||||
q.evictableIds <- tx.Id // this will block when we hit DefaultTxQueueCap
|
||||
|
||||
q.transactions[tx.Id] = tx
|
||||
|
||||
// notify handler
|
||||
q.txEnqueueHandler(*tx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *TxQueue) Get(id QueuedTxId) (*QueuedTx, error) {
|
||||
if tx, ok := q.transactions[id]; ok {
|
||||
delete(q.transactions, id)
|
||||
return tx, nil
|
||||
}
|
||||
|
||||
return nil, ErrQueuedTxIdNotFound
|
||||
}
|
||||
|
||||
func (q *TxQueue) Count() int {
|
||||
return len(q.transactions)
|
||||
}
|
||||
|
||||
func (q *TxQueue) Has(id QueuedTxId) bool {
|
||||
_, ok := q.transactions[id]
|
||||
|
||||
return ok
|
||||
}
|
||||
|
||||
func (q *TxQueue) SetEnqueueHandler(fn EnqueuedTxHandler) {
|
||||
q.txEnqueueHandler = fn
|
||||
}
|
|
@ -1,168 +0,0 @@
|
|||
package les
|
||||
|
||||
import (
|
||||
"golang.org/x/net/context"
|
||||
"sync"
|
||||
|
||||
"errors"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/internal/ethapi"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultTxQueueCap = int(5) // how many items can be passed to sendTransaction() w/o blocking
|
||||
defaultEvictingTxQueueCap = int(20) // how many items can be queued
|
||||
defaultEvictingTxQueueEvictionStep = int(5) // how many item to evict in a single run
|
||||
)
|
||||
|
||||
var (
|
||||
ErrQueuedTxHashNotFound = errors.New("Transaction hash not found")
|
||||
)
|
||||
|
||||
// StatusBackend implements les.StatusBackend with direct calls to Ethereum
|
||||
// internals to support calls from status-go bindings (to internal packages e.g. ethapi)
|
||||
type StatusBackend struct {
|
||||
eapi *ethapi.PublicEthereumAPI // Wrapper around the Ethereum object to access metadata
|
||||
bcapi *ethapi.PublicBlockChainAPI // Wrapper around the blockchain to access chain data
|
||||
txapi *ethapi.PublicTransactionPoolAPI // Wrapper around the transaction pool to access transaction data
|
||||
|
||||
txQueue chan QueuedTx
|
||||
txQueueHandler QueuedTxHandler
|
||||
muTxQueueHanlder sync.Mutex
|
||||
|
||||
txEvictingQueue evictingTxQueue
|
||||
}
|
||||
|
||||
type QueuedTxHash string
|
||||
|
||||
type evictingTxQueue struct {
|
||||
transactions map[QueuedTxHash]*QueuedTx
|
||||
evictionQueue chan QueuedTxHash
|
||||
cap int
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
type QueuedTxHandler func(QueuedTx)
|
||||
|
||||
type QueuedTx struct {
|
||||
Hash common.Hash
|
||||
Context context.Context
|
||||
Args SendTxArgs
|
||||
}
|
||||
|
||||
// SendTxArgs represents the arguments to sumbit a new transaction into the transaction pool.
|
||||
type SendTxArgs struct {
|
||||
From common.Address `json:"from"`
|
||||
To *common.Address `json:"to"`
|
||||
Gas *rpc.HexNumber `json:"gas"`
|
||||
GasPrice *rpc.HexNumber `json:"gasPrice"`
|
||||
Value *rpc.HexNumber `json:"value"`
|
||||
Data string `json:"data"`
|
||||
Nonce *rpc.HexNumber `json:"nonce"`
|
||||
}
|
||||
|
||||
// NewStatusBackend creates a new backend using an existing Ethereum object.
|
||||
func NewStatusBackend(apiBackend ethapi.Backend) *StatusBackend {
|
||||
glog.V(logger.Debug).Infof("Status service started")
|
||||
backend := &StatusBackend{
|
||||
eapi: ethapi.NewPublicEthereumAPI(apiBackend, nil, nil),
|
||||
bcapi: ethapi.NewPublicBlockChainAPI(apiBackend),
|
||||
txapi: ethapi.NewPublicTransactionPoolAPI(apiBackend),
|
||||
txQueue: make(chan QueuedTx, defaultTxQueueCap),
|
||||
txEvictingQueue: evictingTxQueue{
|
||||
transactions: make(map[QueuedTxHash]*QueuedTx),
|
||||
evictionQueue: make(chan QueuedTxHash, defaultEvictingTxQueueCap), // will be used to evict in FIFO
|
||||
cap: defaultEvictingTxQueueCap,
|
||||
},
|
||||
}
|
||||
|
||||
go backend.transactionQueueForwardingLoop()
|
||||
|
||||
return backend
|
||||
}
|
||||
|
||||
func (b *StatusBackend) SetTransactionQueueHandler(fn QueuedTxHandler) {
|
||||
b.muTxQueueHanlder.Lock()
|
||||
defer b.muTxQueueHanlder.Unlock()
|
||||
|
||||
b.txQueueHandler = fn
|
||||
}
|
||||
|
||||
// SendTransaction wraps call to PublicTransactionPoolAPI.SendTransaction
|
||||
func (b *StatusBackend) SendTransaction(ctx context.Context, args SendTxArgs) error {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
_, err := b.txapi.SendTransaction(ctx, ethapi.SendTxArgs(args))
|
||||
return err
|
||||
}
|
||||
|
||||
// CompleteQueuedTransaction wraps call to PublicTransactionPoolAPI.CompleteQueuedTransaction
|
||||
func (b *StatusBackend) CompleteQueuedTransaction(hash QueuedTxHash, passphrase string) (common.Hash, error) {
|
||||
queuedTx, err := b.txEvictingQueue.getQueuedTransaction(hash)
|
||||
if err != nil {
|
||||
return common.Hash{}, err
|
||||
}
|
||||
|
||||
return b.txapi.CompleteQueuedTransaction(context.Background(), ethapi.SendTxArgs(queuedTx.Args), passphrase)
|
||||
}
|
||||
|
||||
// GetTransactionQueue wraps call to PublicTransactionPoolAPI.GetTransactionQueue
|
||||
func (b *StatusBackend) GetTransactionQueue() (chan QueuedTx, error) {
|
||||
return b.txQueue, nil
|
||||
}
|
||||
|
||||
func (b *StatusBackend) transactionQueueForwardingLoop() {
|
||||
txQueue, err := b.txapi.GetTransactionQueue()
|
||||
if err != nil {
|
||||
glog.V(logger.Error).Infof("cannot read from transaction queue")
|
||||
return
|
||||
}
|
||||
|
||||
// forward internal ethapi transactions
|
||||
for queuedTx := range txQueue {
|
||||
if b.txQueueHandler == nil { //discard, until handler is provided
|
||||
continue
|
||||
}
|
||||
tx := QueuedTx{
|
||||
Hash: queuedTx.Hash,
|
||||
Context: queuedTx.Context,
|
||||
Args: SendTxArgs(queuedTx.Args),
|
||||
}
|
||||
b.txEvictingQueue.enqueueQueuedTransaction(tx)
|
||||
b.txQueueHandler(tx)
|
||||
}
|
||||
}
|
||||
|
||||
func (q *evictingTxQueue) enqueueQueuedTransaction(tx QueuedTx) error {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
if q.cap <= len(q.transactions) { // eviction is required
|
||||
for i := 0; i < defaultEvictingTxQueueEvictionStep; i++ {
|
||||
hash := <-q.evictionQueue
|
||||
delete(q.transactions, hash)
|
||||
}
|
||||
}
|
||||
|
||||
q.transactions[QueuedTxHash(tx.Hash.Hex())] = &tx
|
||||
q.evictionQueue <- QueuedTxHash(tx.Hash.Hex())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *evictingTxQueue) getQueuedTransaction(hash QueuedTxHash) (*QueuedTx, error) {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
if tx, ok := q.transactions[hash]; ok {
|
||||
delete(q.transactions, hash)
|
||||
return tx, nil
|
||||
}
|
||||
|
||||
return nil, ErrQueuedTxHashNotFound
|
||||
}
|
Loading…
Reference in New Issue