Block on sentTransaction() until transaction is complete (or timed out)

This commit is contained in:
Victor Farazdagi 2016-08-31 21:02:06 +03:00
parent acda0d1be5
commit 124f552e36
9 changed files with 294 additions and 221 deletions

View File

@ -16,7 +16,7 @@ import (
"github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/les" "github.com/ethereum/go-ethereum/les/status"
"github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discover"
errextra "github.com/pkg/errors" errextra "github.com/pkg/errors"
"github.com/status-im/status-go/src/extkeys" "github.com/status-im/status-go/src/extkeys"
@ -200,11 +200,11 @@ func doAddPeer(url string) (bool, error) {
return true, nil return true, nil
} }
func onSendTransactionRequest(queuedTx les.QueuedTx) { func onSendTransactionRequest(queuedTx status.QueuedTx) {
event := GethEvent{ event := GethEvent{
Type: "sendTransactionQueued", Type: "sendTransactionQueued",
Event: SendTransactionEvent{ Event: SendTransactionEvent{
Hash: queuedTx.Hash.Hex(), Id: string(queuedTx.Id),
Args: queuedTx.Args, Args: queuedTx.Args,
}, },
} }
@ -213,12 +213,12 @@ func onSendTransactionRequest(queuedTx les.QueuedTx) {
C.GethServiceSignalEvent(C.CString(string(body))) C.GethServiceSignalEvent(C.CString(string(body)))
} }
func completeTransaction(hash, password string) (common.Hash, error) { func completeTransaction(id, password string) (common.Hash, error) {
if currentNode != nil { if currentNode != nil {
if lightEthereum != nil { if lightEthereum != nil {
backend := lightEthereum.StatusBackend backend := lightEthereum.StatusBackend
return backend.CompleteQueuedTransaction(les.QueuedTxHash(hash), password) return backend.CompleteQueuedTransaction(status.QueuedTxId(id), password)
} }
return common.Hash{}, errors.New("can not retrieve LES service") return common.Hash{}, errors.New("can not retrieve LES service")

View File

@ -13,10 +13,12 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/les" "github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/les/status"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/whisper" "github.com/ethereum/go-ethereum/whisper"
"reflect"
) )
const ( const (
@ -331,13 +333,11 @@ func TestQueuedTransactions(t *testing.T) {
} }
// create an account // create an account
address, pubKey, mnemonic, err := createAccount(newAccountPassword) address, _, _, err := createAccount(newAccountPassword)
if err != nil { if err != nil {
fmt.Println(err.Error()) t.Errorf("could not create account: %v", err)
t.Error("Test failed: could not create account")
return return
} }
glog.V(logger.Info).Infof("Account created: {address: %s, key: %s, mnemonic:%s}", address, pubKey, mnemonic)
// test transaction queueing // test transaction queueing
var lightEthereum *les.LightEthereum var lightEthereum *les.LightEthereum
@ -346,37 +346,40 @@ func TestQueuedTransactions(t *testing.T) {
} }
backend := lightEthereum.StatusBackend backend := lightEthereum.StatusBackend
// replace transaction notification hanlder // replace transaction notification handler
sentinel := 0 var txHash = common.Hash{}
backend.SetTransactionQueueHandler(func(queuedTx les.QueuedTx) { backend.SetTransactionQueueHandler(func(queuedTx status.QueuedTx) {
glog.V(logger.Info).Infof("Queued transaction hash: %v\n", queuedTx.Hash.Hex()) glog.V(logger.Info).Infof("Transaction queued (will be completed in 5 secs): {id: %v, hash: %v}\n", queuedTx.Id, queuedTx.Hash.Hex())
var txHash common.Hash time.Sleep(5 * time.Second)
if txHash, err = completeTransaction(queuedTx.Hash.Hex(), testAddressPassword); err != nil { if txHash, err = completeTransaction(string(queuedTx.Id), testAddressPassword); err != nil {
t.Errorf("Test failed: cannot complete queued transation[%s]: %v", queuedTx.Hash.Hex(), err) t.Errorf("cannot complete queued transation[%v]: %v", queuedTx.Id, err)
return return
} }
glog.V(logger.Info).Infof("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex()) glog.V(logger.Info).Infof("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex())
sentinel = 1
}) })
// try completing non-existing transaction // try completing non-existing transaction
if _, err := completeTransaction("0x1234512345123451234512345123456123451234512345123451234512345123", testAddressPassword); err == nil { if _, err := completeTransaction("some-bad-transaction-id", testAddressPassword); err == nil {
t.Errorf("Test failed: error expected and not recieved") t.Errorf("Test failed: error expected and not recieved")
return
} }
// send normal transaction // send normal transaction
from, err := utils.MakeAddress(accountManager, testAddress) from, err := utils.MakeAddress(accountManager, testAddress)
if err != nil { if err != nil {
t.Errorf("Test failed: Could not retrieve account from address: %v", err) t.Errorf("could not retrieve account from address: %v", err)
return
} }
to, err := utils.MakeAddress(accountManager, address) to, err := utils.MakeAddress(accountManager, address)
if err != nil { if err != nil {
t.Errorf("Test failed: Could not retrieve account from address: %v", err) t.Errorf("could not retrieve account from address: %v", err)
return
} }
err = backend.SendTransaction(nil, les.SendTxArgs{ // this call blocks, up until Complete Transaction is called
txHashCheck, err := backend.SendTransaction(nil, status.SendTxArgs{
From: from.Address, From: from.Address,
To: &to.Address, To: &to.Address,
Value: rpc.NewHexNumber(big.NewInt(1000000000000)), Value: rpc.NewHexNumber(big.NewInt(1000000000000)),
@ -385,11 +388,52 @@ func TestQueuedTransactions(t *testing.T) {
t.Errorf("Test failed: cannot send transaction: %v", err) t.Errorf("Test failed: cannot send transaction: %v", err)
} }
time.Sleep(15 * time.Second) if !reflect.DeepEqual(txHash, txHashCheck) {
if sentinel != 1 { t.Errorf("Transaction hash returned from SendTransaction is invalid")
t.Error("Test failed: transaction was never queued or completed") return
} }
time.Sleep(10 * time.Second)
if reflect.DeepEqual(txHashCheck, common.Hash{}) {
t.Error("Test failed: transaction was never queued or completed")
return
}
// now test eviction queue
txQueue := backend.TransactionQueue()
var i = 0
backend.SetTransactionQueueHandler(func(queuedTx status.QueuedTx) {
//glog.V(logger.Info).Infof("%d. Transaction queued (queue size: %d): {id: %v}\n", i, txQueue.Count(), queuedTx.Id)
i++
})
if txQueue.Count() != 0 {
t.Errorf("transaction count should be zero: %d", txQueue.Count())
return
}
for i := 0; i < 10; i++ {
go backend.SendTransaction(nil, status.SendTxArgs{})
}
time.Sleep(3 * time.Second)
t.Logf("Number of transactions queued: %d. Queue size (shouldn't be more than %d): %d", i, status.DefaultTxQueueCap, txQueue.Count())
if txQueue.Count() != 10 {
t.Errorf("transaction count should be 10: got %d", txQueue.Count())
return
}
for i := 0; i < status.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines
go backend.SendTransaction(nil, status.SendTxArgs{})
}
time.Sleep(5 * time.Second)
if txQueue.Count() != status.DefaultTxQueueCap && txQueue.Count() != (status.DefaultTxQueueCap-1) {
t.Errorf("transaction count should be %d (or %d): got %d", status.DefaultTxQueueCap, status.DefaultTxQueueCap-1, txQueue.Count())
return
}
} }
func prepareTestNode() error { func prepareTestNode() error {

View File

@ -119,8 +119,8 @@ func UnlockAccount(address, password *C.char, seconds int) *C.char {
} }
//export CompleteTransaction //export CompleteTransaction
func CompleteTransaction(hash, password *C.char) *C.char { func CompleteTransaction(id, password *C.char) *C.char {
txHash, err := completeTransaction(C.GoString(hash), C.GoString(password)) txHash, err := completeTransaction(C.GoString(id), C.GoString(password))
errString := emptyError errString := emptyError
if err != nil { if err != nil {

View File

@ -1,7 +1,7 @@
package main package main
import ( import (
"github.com/ethereum/go-ethereum/les" "github.com/ethereum/go-ethereum/les/status"
) )
type AccountInfo struct { type AccountInfo struct {
@ -35,8 +35,8 @@ type WhisperMessageEvent struct {
} }
type SendTransactionEvent struct { type SendTransactionEvent struct {
Hash string `json:"hash"` Id string `json:"hash"`
Args les.SendTxArgs `json:"args"` Args status.SendTxArgs `json:"args"`
} }
type CompleteTransactionResult struct { type CompleteTransactionResult struct {

View File

@ -36,17 +36,18 @@ import (
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/status"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/pborman/uuid"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
const defaultGas = uint64(90000) const defaultGas = uint64(90000)
const defaultTxQueueCap = int(5)
// PublicEthereumAPI provides an API to access Ethereum related information. // PublicEthereumAPI provides an API to access Ethereum related information.
// It offers only methods that operate on public data that is freely available to anyone. // It offers only methods that operate on public data that is freely available to anyone.
@ -866,18 +867,18 @@ type PublicTransactionPoolAPI struct {
b Backend b Backend
muPendingTxSubs sync.Mutex muPendingTxSubs sync.Mutex
pendingTxSubs map[string]rpc.Subscription pendingTxSubs map[string]rpc.Subscription
txQueue chan QueuedTx txQueue chan *status.QueuedTx
} }
var txSingletonQueue chan QueuedTx var txSingletonQueue chan *status.QueuedTx
// NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool. // NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool.
func NewPublicTransactionPoolAPI(b Backend) *PublicTransactionPoolAPI { func NewPublicTransactionPoolAPI(b Backend) *PublicTransactionPoolAPI {
var once sync.Once var once sync.Once
once.Do(func() { once.Do(func() {
if txSingletonQueue == nil { if txSingletonQueue == nil {
glog.V(logger.Debug).Infof("Transaction queue (for status-go) inited") glog.V(logger.Info).Infof("Transaction queue inited (Public Transaction Pool API)")
txSingletonQueue = make(chan QueuedTx, defaultTxQueueCap) txSingletonQueue = make(chan *status.QueuedTx, status.DefaultTxSendQueueCap)
} }
}) })
@ -1141,33 +1142,37 @@ func submitTransaction(ctx context.Context, b Backend, tx *types.Transaction, si
return signedTx.Hash(), nil return signedTx.Hash(), nil
} }
// Queued Transaction is a container that holds context and arguments enough to complete the queued transaction. func (s *PublicTransactionPoolAPI) GetTransactionQueue() (chan *status.QueuedTx, error) {
type QueuedTx struct {
Hash common.Hash
Context context.Context
Args SendTxArgs
}
func (s *PublicTransactionPoolAPI) GetTransactionQueue() (chan QueuedTx, error) {
return s.txQueue, nil return s.txQueue, nil
} }
// SendTransaction queues transactions, to be fulfilled by CompleteQueuedTransaction() // SendTransaction queues transactions, to be fulfilled by CompleteQueuedTransaction()
func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args SendTxArgs) (common.Hash, error) { func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args SendTxArgs) (common.Hash, error) {
queuedTx := QueuedTx{ queuedTx := &status.QueuedTx{
Id: status.QueuedTxId(uuid.New()),
Hash: common.Hash{}, Hash: common.Hash{},
Context: ctx, Context: ctx,
Args: args, Args: status.SendTxArgs(args),
Done: make(chan struct{}, 1),
} }
// populate transaction hash // send transaction to pending pool
key, err := crypto.GenerateKey()
if err != nil {
panic(err)
}
queuedTx.Hash = common.BytesToHash(crypto.FromECDSA(key))
s.txQueue <- queuedTx s.txQueue <- queuedTx
// now wait up until transaction is complete (via call to CompleteQueuedTransaction) or timeout occurs
timeout := make(chan struct{}, 1)
go func() {
time.Sleep(status.DefaultTxSendCompletionTimeout * time.Second)
timeout <- struct{}{}
}()
select {
case <-queuedTx.Done:
return queuedTx.Hash, queuedTx.Err
case <-timeout:
return common.Hash{}, errors.New("transaction sending timed out")
}
return queuedTx.Hash, nil return queuedTx.Hash, nil
} }

View File

@ -0,0 +1,79 @@
package ethapi
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/les/status"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"golang.org/x/net/context"
)
// StatusBackend implements les.StatusBackend with direct calls to Ethereum
// internals to support calls from status-go bindings (to internal packages e.g. ethapi)
type StatusBackend struct {
eapi *PublicEthereumAPI // Wrapper around the Ethereum object to access metadata
bcapi *PublicBlockChainAPI // Wrapper around the blockchain to access chain data
txapi *PublicTransactionPoolAPI // Wrapper around the transaction pool to access transaction data
txQueue *status.TxQueue
}
// NewStatusBackend creates a new backend using an existing Ethereum object.
func NewStatusBackend(apiBackend Backend) *StatusBackend {
glog.V(logger.Info).Infof("Status backend service started")
backend := &StatusBackend{
eapi: NewPublicEthereumAPI(apiBackend, nil, nil),
bcapi: NewPublicBlockChainAPI(apiBackend),
txapi: NewPublicTransactionPoolAPI(apiBackend),
txQueue: status.NewTransactionQueue(),
}
go backend.transactionQueueForwardingLoop()
return backend
}
func (b *StatusBackend) SetTransactionQueueHandler(fn status.EnqueuedTxHandler) {
b.txQueue.SetEnqueueHandler(fn)
}
func (b *StatusBackend) TransactionQueue() *status.TxQueue {
return b.txQueue
}
// SendTransaction wraps call to PublicTransactionPoolAPI.SendTransaction
func (b *StatusBackend) SendTransaction(ctx context.Context, args status.SendTxArgs) (common.Hash, error) {
if ctx == nil {
ctx = context.Background()
}
return b.txapi.SendTransaction(ctx, SendTxArgs(args))
}
// CompleteQueuedTransaction wraps call to PublicTransactionPoolAPI.CompleteQueuedTransaction
func (b *StatusBackend) CompleteQueuedTransaction(id status.QueuedTxId, passphrase string) (common.Hash, error) {
queuedTx, err := b.txQueue.Get(id)
if err != nil {
return common.Hash{}, err
}
hash, err := b.txapi.CompleteQueuedTransaction(context.Background(), SendTxArgs(queuedTx.Args), passphrase)
queuedTx.Hash = hash
queuedTx.Err = err
queuedTx.Done <- struct{}{} // sendTransaction() waits on this, notify so that it can return
return hash, err
}
func (b *StatusBackend) transactionQueueForwardingLoop() {
txQueue, err := b.txapi.GetTransactionQueue()
if err != nil {
glog.V(logger.Error).Infof("cannot read from transaction queue")
return
}
// forward internal ethapi transactions to status backend
for queuedTx := range txQueue {
b.txQueue.Enqueue(queuedTx)
}
}

View File

@ -70,7 +70,7 @@ type LightEthereum struct {
netVersionId int netVersionId int
netRPCService *ethapi.PublicNetAPI netRPCService *ethapi.PublicNetAPI
StatusBackend *StatusBackend StatusBackend *ethapi.StatusBackend
} }
func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
@ -129,7 +129,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
eth.ApiBackend.gpo = gasprice.NewLightPriceOracle(eth.ApiBackend) eth.ApiBackend.gpo = gasprice.NewLightPriceOracle(eth.ApiBackend)
// inject status-im backend // inject status-im backend
eth.StatusBackend = NewStatusBackend(eth.ApiBackend) eth.StatusBackend = ethapi.NewStatusBackend(eth.ApiBackend)
return eth, nil return eth, nil
} }

View File

@ -0,0 +1,113 @@
package status
import (
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"golang.org/x/net/context"
)
const (
DefaultTxQueueCap = int(35) // how many items can be queued
DefaultTxSendQueueCap = int(70) // how many items can be passed to sendTransaction() w/o blocking
DefaultTxSendCompletionTimeout = 300 // how many seconds to wait before returning result in sentTransaction()
)
var (
ErrQueuedTxIdNotFound = errors.New("transaction hash not found")
)
// TxQueue is capped container that holds pending transactions
type TxQueue struct {
transactions map[QueuedTxId]*QueuedTx
evictableIds chan QueuedTxId
enqueueTicker chan struct{}
// when items are enqueued notify handlers
txEnqueueHandler EnqueuedTxHandler
}
// QueuedTx holds enough information to complete the queued transaction.
type QueuedTx struct {
Id QueuedTxId
Hash common.Hash
Context context.Context
Args SendTxArgs
Done chan struct{}
Err error
}
type QueuedTxId string
// QueuedTxHandler is a function that receives queued/pending transactions, when they get queued
type EnqueuedTxHandler func(QueuedTx)
// SendTxArgs represents the arguments to submbit a new transaction into the transaction pool.
type SendTxArgs struct {
From common.Address `json:"from"`
To *common.Address `json:"to"`
Gas *rpc.HexNumber `json:"gas"`
GasPrice *rpc.HexNumber `json:"gasPrice"`
Value *rpc.HexNumber `json:"value"`
Data string `json:"data"`
Nonce *rpc.HexNumber `json:"nonce"`
}
func NewTransactionQueue() *TxQueue {
txQueue := &TxQueue{
transactions: make(map[QueuedTxId]*QueuedTx),
evictableIds: make(chan QueuedTxId, DefaultTxQueueCap), // will be used to evict in FIFO
enqueueTicker: make(chan struct{}),
}
go txQueue.evictionLoop()
return txQueue
}
func (q *TxQueue) evictionLoop() {
for range q.enqueueTicker {
if len(q.transactions) >= (DefaultTxQueueCap - 1) { // eviction is required to accommodate another/last item
delete(q.transactions, <-q.evictableIds)
}
}
}
func (q *TxQueue) Enqueue(tx *QueuedTx) error {
if q.txEnqueueHandler == nil { //discard, until handler is provided
return nil
}
q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item
q.evictableIds <- tx.Id // this will block when we hit DefaultTxQueueCap
q.transactions[tx.Id] = tx
// notify handler
q.txEnqueueHandler(*tx)
return nil
}
func (q *TxQueue) Get(id QueuedTxId) (*QueuedTx, error) {
if tx, ok := q.transactions[id]; ok {
delete(q.transactions, id)
return tx, nil
}
return nil, ErrQueuedTxIdNotFound
}
func (q *TxQueue) Count() int {
return len(q.transactions)
}
func (q *TxQueue) Has(id QueuedTxId) bool {
_, ok := q.transactions[id]
return ok
}
func (q *TxQueue) SetEnqueueHandler(fn EnqueuedTxHandler) {
q.txEnqueueHandler = fn
}

View File

@ -1,168 +0,0 @@
package les
import (
"golang.org/x/net/context"
"sync"
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/rpc"
)
const (
defaultTxQueueCap = int(5) // how many items can be passed to sendTransaction() w/o blocking
defaultEvictingTxQueueCap = int(20) // how many items can be queued
defaultEvictingTxQueueEvictionStep = int(5) // how many item to evict in a single run
)
var (
ErrQueuedTxHashNotFound = errors.New("Transaction hash not found")
)
// StatusBackend implements les.StatusBackend with direct calls to Ethereum
// internals to support calls from status-go bindings (to internal packages e.g. ethapi)
type StatusBackend struct {
eapi *ethapi.PublicEthereumAPI // Wrapper around the Ethereum object to access metadata
bcapi *ethapi.PublicBlockChainAPI // Wrapper around the blockchain to access chain data
txapi *ethapi.PublicTransactionPoolAPI // Wrapper around the transaction pool to access transaction data
txQueue chan QueuedTx
txQueueHandler QueuedTxHandler
muTxQueueHanlder sync.Mutex
txEvictingQueue evictingTxQueue
}
type QueuedTxHash string
type evictingTxQueue struct {
transactions map[QueuedTxHash]*QueuedTx
evictionQueue chan QueuedTxHash
cap int
mu sync.Mutex
}
type QueuedTxHandler func(QueuedTx)
type QueuedTx struct {
Hash common.Hash
Context context.Context
Args SendTxArgs
}
// SendTxArgs represents the arguments to sumbit a new transaction into the transaction pool.
type SendTxArgs struct {
From common.Address `json:"from"`
To *common.Address `json:"to"`
Gas *rpc.HexNumber `json:"gas"`
GasPrice *rpc.HexNumber `json:"gasPrice"`
Value *rpc.HexNumber `json:"value"`
Data string `json:"data"`
Nonce *rpc.HexNumber `json:"nonce"`
}
// NewStatusBackend creates a new backend using an existing Ethereum object.
func NewStatusBackend(apiBackend ethapi.Backend) *StatusBackend {
glog.V(logger.Debug).Infof("Status service started")
backend := &StatusBackend{
eapi: ethapi.NewPublicEthereumAPI(apiBackend, nil, nil),
bcapi: ethapi.NewPublicBlockChainAPI(apiBackend),
txapi: ethapi.NewPublicTransactionPoolAPI(apiBackend),
txQueue: make(chan QueuedTx, defaultTxQueueCap),
txEvictingQueue: evictingTxQueue{
transactions: make(map[QueuedTxHash]*QueuedTx),
evictionQueue: make(chan QueuedTxHash, defaultEvictingTxQueueCap), // will be used to evict in FIFO
cap: defaultEvictingTxQueueCap,
},
}
go backend.transactionQueueForwardingLoop()
return backend
}
func (b *StatusBackend) SetTransactionQueueHandler(fn QueuedTxHandler) {
b.muTxQueueHanlder.Lock()
defer b.muTxQueueHanlder.Unlock()
b.txQueueHandler = fn
}
// SendTransaction wraps call to PublicTransactionPoolAPI.SendTransaction
func (b *StatusBackend) SendTransaction(ctx context.Context, args SendTxArgs) error {
if ctx == nil {
ctx = context.Background()
}
_, err := b.txapi.SendTransaction(ctx, ethapi.SendTxArgs(args))
return err
}
// CompleteQueuedTransaction wraps call to PublicTransactionPoolAPI.CompleteQueuedTransaction
func (b *StatusBackend) CompleteQueuedTransaction(hash QueuedTxHash, passphrase string) (common.Hash, error) {
queuedTx, err := b.txEvictingQueue.getQueuedTransaction(hash)
if err != nil {
return common.Hash{}, err
}
return b.txapi.CompleteQueuedTransaction(context.Background(), ethapi.SendTxArgs(queuedTx.Args), passphrase)
}
// GetTransactionQueue wraps call to PublicTransactionPoolAPI.GetTransactionQueue
func (b *StatusBackend) GetTransactionQueue() (chan QueuedTx, error) {
return b.txQueue, nil
}
func (b *StatusBackend) transactionQueueForwardingLoop() {
txQueue, err := b.txapi.GetTransactionQueue()
if err != nil {
glog.V(logger.Error).Infof("cannot read from transaction queue")
return
}
// forward internal ethapi transactions
for queuedTx := range txQueue {
if b.txQueueHandler == nil { //discard, until handler is provided
continue
}
tx := QueuedTx{
Hash: queuedTx.Hash,
Context: queuedTx.Context,
Args: SendTxArgs(queuedTx.Args),
}
b.txEvictingQueue.enqueueQueuedTransaction(tx)
b.txQueueHandler(tx)
}
}
func (q *evictingTxQueue) enqueueQueuedTransaction(tx QueuedTx) error {
q.mu.Lock()
defer q.mu.Unlock()
if q.cap <= len(q.transactions) { // eviction is required
for i := 0; i < defaultEvictingTxQueueEvictionStep; i++ {
hash := <-q.evictionQueue
delete(q.transactions, hash)
}
}
q.transactions[QueuedTxHash(tx.Hash.Hex())] = &tx
q.evictionQueue <- QueuedTxHash(tx.Hash.Hex())
return nil
}
func (q *evictingTxQueue) getQueuedTransaction(hash QueuedTxHash) (*QueuedTx, error) {
q.mu.Lock()
defer q.mu.Unlock()
if tx, ok := q.transactions[hash]; ok {
delete(q.transactions, hash)
return tx, nil
}
return nil, ErrQueuedTxHashNotFound
}