diff --git a/src/gethdep.go b/src/gethdep.go index e6bf8f080..571343fcb 100644 --- a/src/gethdep.go +++ b/src/gethdep.go @@ -16,7 +16,7 @@ import ( "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/les" + "github.com/ethereum/go-ethereum/les/status" "github.com/ethereum/go-ethereum/p2p/discover" errextra "github.com/pkg/errors" "github.com/status-im/status-go/src/extkeys" @@ -200,11 +200,11 @@ func doAddPeer(url string) (bool, error) { return true, nil } -func onSendTransactionRequest(queuedTx les.QueuedTx) { +func onSendTransactionRequest(queuedTx status.QueuedTx) { event := GethEvent{ Type: "sendTransactionQueued", Event: SendTransactionEvent{ - Hash: queuedTx.Hash.Hex(), + Id: string(queuedTx.Id), Args: queuedTx.Args, }, } @@ -213,12 +213,12 @@ func onSendTransactionRequest(queuedTx les.QueuedTx) { C.GethServiceSignalEvent(C.CString(string(body))) } -func completeTransaction(hash, password string) (common.Hash, error) { +func completeTransaction(id, password string) (common.Hash, error) { if currentNode != nil { if lightEthereum != nil { backend := lightEthereum.StatusBackend - return backend.CompleteQueuedTransaction(les.QueuedTxHash(hash), password) + return backend.CompleteQueuedTransaction(status.QueuedTxId(id), password) } return common.Hash{}, errors.New("can not retrieve LES service") diff --git a/src/gethdep_test.go b/src/gethdep_test.go index 182fc17b6..e8c2a9206 100644 --- a/src/gethdep_test.go +++ b/src/gethdep_test.go @@ -13,10 +13,12 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/les" + "github.com/ethereum/go-ethereum/les/status" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/whisper" + "reflect" ) const ( @@ -331,13 +333,11 @@ func TestQueuedTransactions(t *testing.T) { } // create an account - address, pubKey, mnemonic, err := createAccount(newAccountPassword) + address, _, _, err := createAccount(newAccountPassword) if err != nil { - fmt.Println(err.Error()) - t.Error("Test failed: could not create account") + t.Errorf("could not create account: %v", err) return } - glog.V(logger.Info).Infof("Account created: {address: %s, key: %s, mnemonic:%s}", address, pubKey, mnemonic) // test transaction queueing var lightEthereum *les.LightEthereum @@ -346,37 +346,40 @@ func TestQueuedTransactions(t *testing.T) { } backend := lightEthereum.StatusBackend - // replace transaction notification hanlder - sentinel := 0 - backend.SetTransactionQueueHandler(func(queuedTx les.QueuedTx) { - glog.V(logger.Info).Infof("Queued transaction hash: %v\n", queuedTx.Hash.Hex()) - var txHash common.Hash - if txHash, err = completeTransaction(queuedTx.Hash.Hex(), testAddressPassword); err != nil { - t.Errorf("Test failed: cannot complete queued transation[%s]: %v", queuedTx.Hash.Hex(), err) + // replace transaction notification handler + var txHash = common.Hash{} + backend.SetTransactionQueueHandler(func(queuedTx status.QueuedTx) { + glog.V(logger.Info).Infof("Transaction queued (will be completed in 5 secs): {id: %v, hash: %v}\n", queuedTx.Id, queuedTx.Hash.Hex()) + time.Sleep(5 * time.Second) + if txHash, err = completeTransaction(string(queuedTx.Id), testAddressPassword); err != nil { + t.Errorf("cannot complete queued transation[%v]: %v", queuedTx.Id, err) return } glog.V(logger.Info).Infof("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex()) - sentinel = 1 }) // try completing non-existing transaction - if _, err := completeTransaction("0x1234512345123451234512345123456123451234512345123451234512345123", testAddressPassword); err == nil { + if _, err := completeTransaction("some-bad-transaction-id", testAddressPassword); err == nil { t.Errorf("Test failed: error expected and not recieved") + return } // send normal transaction from, err := utils.MakeAddress(accountManager, testAddress) if err != nil { - t.Errorf("Test failed: Could not retrieve account from address: %v", err) + t.Errorf("could not retrieve account from address: %v", err) + return } to, err := utils.MakeAddress(accountManager, address) if err != nil { - t.Errorf("Test failed: Could not retrieve account from address: %v", err) + t.Errorf("could not retrieve account from address: %v", err) + return } - err = backend.SendTransaction(nil, les.SendTxArgs{ + // this call blocks, up until Complete Transaction is called + txHashCheck, err := backend.SendTransaction(nil, status.SendTxArgs{ From: from.Address, To: &to.Address, Value: rpc.NewHexNumber(big.NewInt(1000000000000)), @@ -385,11 +388,52 @@ func TestQueuedTransactions(t *testing.T) { t.Errorf("Test failed: cannot send transaction: %v", err) } - time.Sleep(15 * time.Second) - if sentinel != 1 { - t.Error("Test failed: transaction was never queued or completed") + if !reflect.DeepEqual(txHash, txHashCheck) { + t.Errorf("Transaction hash returned from SendTransaction is invalid") + return } + time.Sleep(10 * time.Second) + + if reflect.DeepEqual(txHashCheck, common.Hash{}) { + t.Error("Test failed: transaction was never queued or completed") + return + } + + // now test eviction queue + txQueue := backend.TransactionQueue() + var i = 0 + backend.SetTransactionQueueHandler(func(queuedTx status.QueuedTx) { + //glog.V(logger.Info).Infof("%d. Transaction queued (queue size: %d): {id: %v}\n", i, txQueue.Count(), queuedTx.Id) + i++ + }) + + if txQueue.Count() != 0 { + t.Errorf("transaction count should be zero: %d", txQueue.Count()) + return + } + + for i := 0; i < 10; i++ { + go backend.SendTransaction(nil, status.SendTxArgs{}) + } + time.Sleep(3 * time.Second) + + t.Logf("Number of transactions queued: %d. Queue size (shouldn't be more than %d): %d", i, status.DefaultTxQueueCap, txQueue.Count()) + + if txQueue.Count() != 10 { + t.Errorf("transaction count should be 10: got %d", txQueue.Count()) + return + } + + for i := 0; i < status.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines + go backend.SendTransaction(nil, status.SendTxArgs{}) + } + time.Sleep(5 * time.Second) + + if txQueue.Count() != status.DefaultTxQueueCap && txQueue.Count() != (status.DefaultTxQueueCap-1) { + t.Errorf("transaction count should be %d (or %d): got %d", status.DefaultTxQueueCap, status.DefaultTxQueueCap-1, txQueue.Count()) + return + } } func prepareTestNode() error { diff --git a/src/library.go b/src/library.go index 448ffa7fb..aa4cdc6aa 100644 --- a/src/library.go +++ b/src/library.go @@ -119,8 +119,8 @@ func UnlockAccount(address, password *C.char, seconds int) *C.char { } //export CompleteTransaction -func CompleteTransaction(hash, password *C.char) *C.char { - txHash, err := completeTransaction(C.GoString(hash), C.GoString(password)) +func CompleteTransaction(id, password *C.char) *C.char { + txHash, err := completeTransaction(C.GoString(id), C.GoString(password)) errString := emptyError if err != nil { diff --git a/src/types.go b/src/types.go index 80edb3c9d..9b4cf458e 100644 --- a/src/types.go +++ b/src/types.go @@ -1,7 +1,7 @@ package main import ( - "github.com/ethereum/go-ethereum/les" + "github.com/ethereum/go-ethereum/les/status" ) type AccountInfo struct { @@ -35,8 +35,8 @@ type WhisperMessageEvent struct { } type SendTransactionEvent struct { - Hash string `json:"hash"` - Args les.SendTxArgs `json:"args"` + Id string `json:"hash"` + Args status.SendTxArgs `json:"args"` } type CompleteTransactionResult struct { diff --git a/src/vendor/github.com/ethereum/go-ethereum/internal/ethapi/api.go b/src/vendor/github.com/ethereum/go-ethereum/internal/ethapi/api.go index 16b97616a..e6a429129 100644 --- a/src/vendor/github.com/ethereum/go-ethereum/internal/ethapi/api.go +++ b/src/vendor/github.com/ethereum/go-ethereum/internal/ethapi/api.go @@ -36,17 +36,18 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/les/status" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" + "github.com/pborman/uuid" "github.com/syndtr/goleveldb/leveldb" "golang.org/x/net/context" ) const defaultGas = uint64(90000) -const defaultTxQueueCap = int(5) // PublicEthereumAPI provides an API to access Ethereum related information. // It offers only methods that operate on public data that is freely available to anyone. @@ -866,18 +867,18 @@ type PublicTransactionPoolAPI struct { b Backend muPendingTxSubs sync.Mutex pendingTxSubs map[string]rpc.Subscription - txQueue chan QueuedTx + txQueue chan *status.QueuedTx } -var txSingletonQueue chan QueuedTx +var txSingletonQueue chan *status.QueuedTx // NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool. func NewPublicTransactionPoolAPI(b Backend) *PublicTransactionPoolAPI { var once sync.Once once.Do(func() { if txSingletonQueue == nil { - glog.V(logger.Debug).Infof("Transaction queue (for status-go) inited") - txSingletonQueue = make(chan QueuedTx, defaultTxQueueCap) + glog.V(logger.Info).Infof("Transaction queue inited (Public Transaction Pool API)") + txSingletonQueue = make(chan *status.QueuedTx, status.DefaultTxSendQueueCap) } }) @@ -1141,33 +1142,37 @@ func submitTransaction(ctx context.Context, b Backend, tx *types.Transaction, si return signedTx.Hash(), nil } -// Queued Transaction is a container that holds context and arguments enough to complete the queued transaction. -type QueuedTx struct { - Hash common.Hash - Context context.Context - Args SendTxArgs -} - -func (s *PublicTransactionPoolAPI) GetTransactionQueue() (chan QueuedTx, error) { +func (s *PublicTransactionPoolAPI) GetTransactionQueue() (chan *status.QueuedTx, error) { return s.txQueue, nil } // SendTransaction queues transactions, to be fulfilled by CompleteQueuedTransaction() func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args SendTxArgs) (common.Hash, error) { - queuedTx := QueuedTx{ + queuedTx := &status.QueuedTx{ + Id: status.QueuedTxId(uuid.New()), Hash: common.Hash{}, Context: ctx, - Args: args, + Args: status.SendTxArgs(args), + Done: make(chan struct{}, 1), } - // populate transaction hash - key, err := crypto.GenerateKey() - if err != nil { - panic(err) - } - queuedTx.Hash = common.BytesToHash(crypto.FromECDSA(key)) - + // send transaction to pending pool s.txQueue <- queuedTx + + // now wait up until transaction is complete (via call to CompleteQueuedTransaction) or timeout occurs + timeout := make(chan struct{}, 1) + go func() { + time.Sleep(status.DefaultTxSendCompletionTimeout * time.Second) + timeout <- struct{}{} + }() + + select { + case <-queuedTx.Done: + return queuedTx.Hash, queuedTx.Err + case <-timeout: + return common.Hash{}, errors.New("transaction sending timed out") + } + return queuedTx.Hash, nil } diff --git a/src/vendor/github.com/ethereum/go-ethereum/internal/ethapi/status_backend.go b/src/vendor/github.com/ethereum/go-ethereum/internal/ethapi/status_backend.go new file mode 100644 index 000000000..5632b48d8 --- /dev/null +++ b/src/vendor/github.com/ethereum/go-ethereum/internal/ethapi/status_backend.go @@ -0,0 +1,79 @@ +package ethapi + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/les/status" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "golang.org/x/net/context" +) + +// StatusBackend implements les.StatusBackend with direct calls to Ethereum +// internals to support calls from status-go bindings (to internal packages e.g. ethapi) +type StatusBackend struct { + eapi *PublicEthereumAPI // Wrapper around the Ethereum object to access metadata + bcapi *PublicBlockChainAPI // Wrapper around the blockchain to access chain data + txapi *PublicTransactionPoolAPI // Wrapper around the transaction pool to access transaction data + + txQueue *status.TxQueue +} + +// NewStatusBackend creates a new backend using an existing Ethereum object. +func NewStatusBackend(apiBackend Backend) *StatusBackend { + glog.V(logger.Info).Infof("Status backend service started") + backend := &StatusBackend{ + eapi: NewPublicEthereumAPI(apiBackend, nil, nil), + bcapi: NewPublicBlockChainAPI(apiBackend), + txapi: NewPublicTransactionPoolAPI(apiBackend), + txQueue: status.NewTransactionQueue(), + } + + go backend.transactionQueueForwardingLoop() + + return backend +} + +func (b *StatusBackend) SetTransactionQueueHandler(fn status.EnqueuedTxHandler) { + b.txQueue.SetEnqueueHandler(fn) +} + +func (b *StatusBackend) TransactionQueue() *status.TxQueue { + return b.txQueue +} + +// SendTransaction wraps call to PublicTransactionPoolAPI.SendTransaction +func (b *StatusBackend) SendTransaction(ctx context.Context, args status.SendTxArgs) (common.Hash, error) { + if ctx == nil { + ctx = context.Background() + } + + return b.txapi.SendTransaction(ctx, SendTxArgs(args)) +} + +// CompleteQueuedTransaction wraps call to PublicTransactionPoolAPI.CompleteQueuedTransaction +func (b *StatusBackend) CompleteQueuedTransaction(id status.QueuedTxId, passphrase string) (common.Hash, error) { + queuedTx, err := b.txQueue.Get(id) + if err != nil { + return common.Hash{}, err + } + + hash, err := b.txapi.CompleteQueuedTransaction(context.Background(), SendTxArgs(queuedTx.Args), passphrase) + queuedTx.Hash = hash + queuedTx.Err = err + queuedTx.Done <- struct{}{} // sendTransaction() waits on this, notify so that it can return + + return hash, err +} + +func (b *StatusBackend) transactionQueueForwardingLoop() { + txQueue, err := b.txapi.GetTransactionQueue() + if err != nil { + glog.V(logger.Error).Infof("cannot read from transaction queue") + return + } + + // forward internal ethapi transactions to status backend + for queuedTx := range txQueue { + b.txQueue.Enqueue(queuedTx) + } +} diff --git a/src/vendor/github.com/ethereum/go-ethereum/les/backend.go b/src/vendor/github.com/ethereum/go-ethereum/les/backend.go index 5307b65d8..4ff21c8ca 100644 --- a/src/vendor/github.com/ethereum/go-ethereum/les/backend.go +++ b/src/vendor/github.com/ethereum/go-ethereum/les/backend.go @@ -70,7 +70,7 @@ type LightEthereum struct { netVersionId int netRPCService *ethapi.PublicNetAPI - StatusBackend *StatusBackend + StatusBackend *ethapi.StatusBackend } func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { @@ -129,7 +129,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { eth.ApiBackend.gpo = gasprice.NewLightPriceOracle(eth.ApiBackend) // inject status-im backend - eth.StatusBackend = NewStatusBackend(eth.ApiBackend) + eth.StatusBackend = ethapi.NewStatusBackend(eth.ApiBackend) return eth, nil } diff --git a/src/vendor/github.com/ethereum/go-ethereum/les/status/txqueue.go b/src/vendor/github.com/ethereum/go-ethereum/les/status/txqueue.go new file mode 100644 index 000000000..2b8a0d879 --- /dev/null +++ b/src/vendor/github.com/ethereum/go-ethereum/les/status/txqueue.go @@ -0,0 +1,113 @@ +package status + +import ( + "errors" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" + "golang.org/x/net/context" +) + +const ( + DefaultTxQueueCap = int(35) // how many items can be queued + DefaultTxSendQueueCap = int(70) // how many items can be passed to sendTransaction() w/o blocking + DefaultTxSendCompletionTimeout = 300 // how many seconds to wait before returning result in sentTransaction() +) + +var ( + ErrQueuedTxIdNotFound = errors.New("transaction hash not found") +) + +// TxQueue is capped container that holds pending transactions +type TxQueue struct { + transactions map[QueuedTxId]*QueuedTx + evictableIds chan QueuedTxId + enqueueTicker chan struct{} + + // when items are enqueued notify handlers + txEnqueueHandler EnqueuedTxHandler +} + +// QueuedTx holds enough information to complete the queued transaction. +type QueuedTx struct { + Id QueuedTxId + Hash common.Hash + Context context.Context + Args SendTxArgs + Done chan struct{} + Err error +} + +type QueuedTxId string + +// QueuedTxHandler is a function that receives queued/pending transactions, when they get queued +type EnqueuedTxHandler func(QueuedTx) + +// SendTxArgs represents the arguments to submbit a new transaction into the transaction pool. +type SendTxArgs struct { + From common.Address `json:"from"` + To *common.Address `json:"to"` + Gas *rpc.HexNumber `json:"gas"` + GasPrice *rpc.HexNumber `json:"gasPrice"` + Value *rpc.HexNumber `json:"value"` + Data string `json:"data"` + Nonce *rpc.HexNumber `json:"nonce"` +} + +func NewTransactionQueue() *TxQueue { + txQueue := &TxQueue{ + transactions: make(map[QueuedTxId]*QueuedTx), + evictableIds: make(chan QueuedTxId, DefaultTxQueueCap), // will be used to evict in FIFO + enqueueTicker: make(chan struct{}), + } + + go txQueue.evictionLoop() + + return txQueue +} + +func (q *TxQueue) evictionLoop() { + for range q.enqueueTicker { + if len(q.transactions) >= (DefaultTxQueueCap - 1) { // eviction is required to accommodate another/last item + delete(q.transactions, <-q.evictableIds) + } + } +} + +func (q *TxQueue) Enqueue(tx *QueuedTx) error { + if q.txEnqueueHandler == nil { //discard, until handler is provided + return nil + } + + q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item + q.evictableIds <- tx.Id // this will block when we hit DefaultTxQueueCap + + q.transactions[tx.Id] = tx + + // notify handler + q.txEnqueueHandler(*tx) + + return nil +} + +func (q *TxQueue) Get(id QueuedTxId) (*QueuedTx, error) { + if tx, ok := q.transactions[id]; ok { + delete(q.transactions, id) + return tx, nil + } + + return nil, ErrQueuedTxIdNotFound +} + +func (q *TxQueue) Count() int { + return len(q.transactions) +} + +func (q *TxQueue) Has(id QueuedTxId) bool { + _, ok := q.transactions[id] + + return ok +} + +func (q *TxQueue) SetEnqueueHandler(fn EnqueuedTxHandler) { + q.txEnqueueHandler = fn +} diff --git a/src/vendor/github.com/ethereum/go-ethereum/les/status_backend.go b/src/vendor/github.com/ethereum/go-ethereum/les/status_backend.go deleted file mode 100644 index 67022cef9..000000000 --- a/src/vendor/github.com/ethereum/go-ethereum/les/status_backend.go +++ /dev/null @@ -1,168 +0,0 @@ -package les - -import ( - "golang.org/x/net/context" - "sync" - - "errors" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/internal/ethapi" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" - "github.com/ethereum/go-ethereum/rpc" -) - -const ( - defaultTxQueueCap = int(5) // how many items can be passed to sendTransaction() w/o blocking - defaultEvictingTxQueueCap = int(20) // how many items can be queued - defaultEvictingTxQueueEvictionStep = int(5) // how many item to evict in a single run -) - -var ( - ErrQueuedTxHashNotFound = errors.New("Transaction hash not found") -) - -// StatusBackend implements les.StatusBackend with direct calls to Ethereum -// internals to support calls from status-go bindings (to internal packages e.g. ethapi) -type StatusBackend struct { - eapi *ethapi.PublicEthereumAPI // Wrapper around the Ethereum object to access metadata - bcapi *ethapi.PublicBlockChainAPI // Wrapper around the blockchain to access chain data - txapi *ethapi.PublicTransactionPoolAPI // Wrapper around the transaction pool to access transaction data - - txQueue chan QueuedTx - txQueueHandler QueuedTxHandler - muTxQueueHanlder sync.Mutex - - txEvictingQueue evictingTxQueue -} - -type QueuedTxHash string - -type evictingTxQueue struct { - transactions map[QueuedTxHash]*QueuedTx - evictionQueue chan QueuedTxHash - cap int - mu sync.Mutex -} - -type QueuedTxHandler func(QueuedTx) - -type QueuedTx struct { - Hash common.Hash - Context context.Context - Args SendTxArgs -} - -// SendTxArgs represents the arguments to sumbit a new transaction into the transaction pool. -type SendTxArgs struct { - From common.Address `json:"from"` - To *common.Address `json:"to"` - Gas *rpc.HexNumber `json:"gas"` - GasPrice *rpc.HexNumber `json:"gasPrice"` - Value *rpc.HexNumber `json:"value"` - Data string `json:"data"` - Nonce *rpc.HexNumber `json:"nonce"` -} - -// NewStatusBackend creates a new backend using an existing Ethereum object. -func NewStatusBackend(apiBackend ethapi.Backend) *StatusBackend { - glog.V(logger.Debug).Infof("Status service started") - backend := &StatusBackend{ - eapi: ethapi.NewPublicEthereumAPI(apiBackend, nil, nil), - bcapi: ethapi.NewPublicBlockChainAPI(apiBackend), - txapi: ethapi.NewPublicTransactionPoolAPI(apiBackend), - txQueue: make(chan QueuedTx, defaultTxQueueCap), - txEvictingQueue: evictingTxQueue{ - transactions: make(map[QueuedTxHash]*QueuedTx), - evictionQueue: make(chan QueuedTxHash, defaultEvictingTxQueueCap), // will be used to evict in FIFO - cap: defaultEvictingTxQueueCap, - }, - } - - go backend.transactionQueueForwardingLoop() - - return backend -} - -func (b *StatusBackend) SetTransactionQueueHandler(fn QueuedTxHandler) { - b.muTxQueueHanlder.Lock() - defer b.muTxQueueHanlder.Unlock() - - b.txQueueHandler = fn -} - -// SendTransaction wraps call to PublicTransactionPoolAPI.SendTransaction -func (b *StatusBackend) SendTransaction(ctx context.Context, args SendTxArgs) error { - if ctx == nil { - ctx = context.Background() - } - - _, err := b.txapi.SendTransaction(ctx, ethapi.SendTxArgs(args)) - return err -} - -// CompleteQueuedTransaction wraps call to PublicTransactionPoolAPI.CompleteQueuedTransaction -func (b *StatusBackend) CompleteQueuedTransaction(hash QueuedTxHash, passphrase string) (common.Hash, error) { - queuedTx, err := b.txEvictingQueue.getQueuedTransaction(hash) - if err != nil { - return common.Hash{}, err - } - - return b.txapi.CompleteQueuedTransaction(context.Background(), ethapi.SendTxArgs(queuedTx.Args), passphrase) -} - -// GetTransactionQueue wraps call to PublicTransactionPoolAPI.GetTransactionQueue -func (b *StatusBackend) GetTransactionQueue() (chan QueuedTx, error) { - return b.txQueue, nil -} - -func (b *StatusBackend) transactionQueueForwardingLoop() { - txQueue, err := b.txapi.GetTransactionQueue() - if err != nil { - glog.V(logger.Error).Infof("cannot read from transaction queue") - return - } - - // forward internal ethapi transactions - for queuedTx := range txQueue { - if b.txQueueHandler == nil { //discard, until handler is provided - continue - } - tx := QueuedTx{ - Hash: queuedTx.Hash, - Context: queuedTx.Context, - Args: SendTxArgs(queuedTx.Args), - } - b.txEvictingQueue.enqueueQueuedTransaction(tx) - b.txQueueHandler(tx) - } -} - -func (q *evictingTxQueue) enqueueQueuedTransaction(tx QueuedTx) error { - q.mu.Lock() - defer q.mu.Unlock() - - if q.cap <= len(q.transactions) { // eviction is required - for i := 0; i < defaultEvictingTxQueueEvictionStep; i++ { - hash := <-q.evictionQueue - delete(q.transactions, hash) - } - } - - q.transactions[QueuedTxHash(tx.Hash.Hex())] = &tx - q.evictionQueue <- QueuedTxHash(tx.Hash.Hex()) - - return nil -} - -func (q *evictingTxQueue) getQueuedTransaction(hash QueuedTxHash) (*QueuedTx, error) { - q.mu.Lock() - defer q.mu.Unlock() - - if tx, ok := q.transactions[hash]; ok { - delete(q.transactions, hash) - return tx, nil - } - - return nil, ErrQueuedTxHashNotFound -}