CompleteTransaction() exposed + onSendTransactionRequest() notifier added

This commit is contained in:
Victor Farazdagi 2016-07-27 14:47:41 +03:00
parent 45ffd5599d
commit 92737d0395
9 changed files with 359 additions and 96 deletions

View File

@ -1,15 +1,25 @@
package main package main
/*
#include <stddef.h>
#include <stdbool.h>
#include <jni.h>
extern bool GethServiceSignalEvent( const char *jsonEvent );
*/
import "C"
import ( import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"time" "time"
"encoding/json"
"github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discover"
errextra "github.com/pkg/errors" errextra "github.com/pkg/errors"
) )
@ -95,7 +105,6 @@ func createAndStartNode(inputDir string) error {
} }
func doAddPeer(url string) (bool, error) { func doAddPeer(url string) (bool, error) {
server := currentNode.Server() server := currentNode.Server()
if server == nil { if server == nil {
@ -108,4 +117,30 @@ func doAddPeer(url string) (bool, error) {
} }
server.AddPeer(node) server.AddPeer(node)
return true, nil return true, nil
} }
func onSendTransactionRequest(queuedTx les.QueuedTx) {
event := GethEvent{
Type: "sendTransactionQueued",
Event: SendTransactionEvent{
Hash: queuedTx.Hash.Hex(),
Args: queuedTx.Args,
},
}
body, _ := json.Marshal(&event)
C.GethServiceSignalEvent(C.CString(string(body)))
}
func completeTransaction(hash string) error {
if currentNode != nil {
if lightEthereum != nil {
backend := lightEthereum.StatusBackend
return backend.CompleteQueuedTransaction(les.QueuedTxHash(hash))
}
return errors.New("Could not retrieve light ethereum service")
}
return errors.New("No running node detected for account unlock")
}

View File

@ -6,9 +6,15 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/whisper" "github.com/ethereum/go-ethereum/whisper"
"math/big"
) )
// TestAccountBindings makes sure we can create an account and subsequently // TestAccountBindings makes sure we can create an account and subsequently
@ -37,7 +43,7 @@ func TestAccountBindings(t *testing.T) {
} }
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
// test to see if the account was injected in whisqer // test to see if the account was injected in whisper
var whisperInstance *whisper.Whisper var whisperInstance *whisper.Whisper
if err := currentNode.Service(&whisperInstance); err != nil { if err := currentNode.Service(&whisperInstance); err != nil {
t.Errorf("whisper service not running: %v", err) t.Errorf("whisper service not running: %v", err)
@ -56,11 +62,67 @@ func TestAccountBindings(t *testing.T) {
Payload: "test message", Payload: "test message",
} }
whisperAPI := whisper.NewPublicWhisperAPI(whisperInstance) whisperAPI := whisper.NewPublicWhisperAPI(whisperInstance)
postSucess, err := whisperAPI.Post(postArgs) postSuccess, err := whisperAPI.Post(postArgs)
if !postSucess || err != nil { if !postSuccess || err != nil {
t.Errorf("Test failed: Could not post to whisper: %v", err) t.Errorf("Test failed: Could not post to whisper: %v", err)
} }
// create another account
address1, _, err := createAccount("badpassword")
if err != nil {
fmt.Println(err.Error())
t.Error("Test failed: could not create account")
}
// unlock the created account
err = unlockAccount(address1, "badpassword", 3)
if err != nil {
fmt.Println(err)
t.Error("Test failed: could not unlock account")
}
time.Sleep(2 * time.Second)
// test transaction queueing
var lightEthereum *les.LightEthereum
if err := currentNode.Service(&lightEthereum); err != nil {
t.Errorf("Test failed: LES service is not running: %v", err)
}
backend := lightEthereum.StatusBackend
// replace transaction notification hanlder
sentinel := 0
backend.SetTransactionQueueHandler(func(queuedTx les.QueuedTx) {
glog.V(logger.Info).Infof("[STATUS-GO] Tx queue value: %v\n", queuedTx.Hash.Hex())
if err := completeTransaction(queuedTx.Hash.Hex()); err != nil {
t.Errorf("Test failed: cannot complete queued transation[%s]: %v", queuedTx.Hash.Hex(), err)
}
sentinel = 1
})
from, err := utils.MakeAddress(accountManager, address1)
if err != nil {
t.Errorf("Test failed: Could not retrieve account from address: %v", err)
}
to, err := utils.MakeAddress(accountManager, address)
if err != nil {
t.Errorf("Test failed: Could not retrieve account from address: %v", err)
}
err = backend.SendTransaction(nil, les.SendTxArgs{
From: from.Address,
To: &to.Address,
Value: rpc.NewHexNumber(big.NewInt(1000000000000)),
})
if err != nil {
t.Errorf("Test failed: cannot send transaction: %v", err)
}
time.Sleep(5 * time.Second)
if sentinel != 1 {
t.Error("Test failed: transaction was never queued or completed")
}
// clean up // clean up
err = os.RemoveAll(".ethereumtest") err = os.RemoveAll(".ethereumtest")
if err != nil { if err != nil {

View File

@ -4,8 +4,8 @@ import "C"
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"os"
"github.com/ethereum/go-ethereum/whisper" "github.com/ethereum/go-ethereum/whisper"
"os"
) )
var emptyError = "" var emptyError = ""
@ -63,6 +63,24 @@ func UnlockAccount(address, password *C.char, seconds int) *C.char {
return C.CString(string(outBytes)) return C.CString(string(outBytes))
} }
//export CompleteTransaction
func CompleteTransaction(hash *C.char) *C.char {
err := completeTransaction(C.GoString(hash))
errString := emptyError
if err != nil {
fmt.Fprintln(os.Stderr, err)
errString = err.Error()
}
out := JSONError{
Error: errString,
}
outBytes, _ := json.Marshal(&out)
return C.CString(string(outBytes))
}
//export StartNode //export StartNode
func StartNode(datadir *C.char) *C.char { func StartNode(datadir *C.char) *C.char {
@ -121,11 +139,11 @@ func addPeer(url *C.char) *C.char {
//export addWhisperFilter //export addWhisperFilter
func addWhisperFilter(filterJson *C.char) *C.char { func addWhisperFilter(filterJson *C.char) *C.char {
var id int var id int
var filter whisper.NewFilterArgs var filter whisper.NewFilterArgs
err := json.Unmarshal([]byte(C.GoString(filterJson)), &filter) err := json.Unmarshal([]byte(C.GoString(filterJson)), &filter)
if err == nil { if err == nil {
id = doAddWhisperFilter(filter) id = doAddWhisperFilter(filter)
} }
@ -136,13 +154,13 @@ func addWhisperFilter(filterJson *C.char) *C.char {
} }
out := AddWhisperFilterResult{ out := AddWhisperFilterResult{
Id: id, Id: id,
Error: errString, Error: errString,
} }
outBytes, _ := json.Marshal(&out) outBytes, _ := json.Marshal(&out)
return C.CString(string(outBytes)) return C.CString(string(outBytes))
} }
//export removeWhisperFilter //export removeWhisperFilter
@ -154,5 +172,5 @@ func removeWhisperFilter(idFilter int) {
//export clearWhisperFilters //export clearWhisperFilters
func clearWhisperFilters() { func clearWhisperFilters() {
doClearWhisperFilters() doClearWhisperFilters()
} }

View File

@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
@ -29,14 +30,15 @@ const (
) )
var ( var (
vString string // Combined textual representation of the version vString string // Combined textual representation of the version
rConfig release.Config // Structured version information and release oracle config rConfig release.Config // Structured version information and release oracle config
currentNode *node.Node // currently running geth node currentNode *node.Node // currently running geth node
c *cli.Context // the CLI context used to start the geth node c *cli.Context // the CLI context used to start the geth node
accountSync *[]node.Service // the object used to sync accounts between geth services accountSync *[]node.Service // the object used to sync accounts between geth services
accountManager *accounts.Manager // the account manager attached to the currentNode lightEthereum *les.LightEthereum // LES service
whisperService *whisper.Whisper // whisper service accountManager *accounts.Manager // the account manager attached to the currentNode
datadir string // data directory for geth whisperService *whisper.Whisper // whisper service
datadir string // data directory for geth
) )
func main() { func main() {
@ -60,7 +62,8 @@ func MakeNode(inputDir string) *node.Node {
set.Bool("rpc", true, "enable rpc") set.Bool("rpc", true, "enable rpc")
set.String("rpcaddr", "localhost", "host for RPC") set.String("rpcaddr", "localhost", "host for RPC")
set.String("rpcport", "8545", "rpc port") set.String("rpcport", "8545", "rpc port")
set.String("rpcapi", "db,eth,net,web3,shh,admin", "rpc api(s)") set.String("verbosity", "3", "verbosity level")
set.String("rpcapi", "db,eth,net,web3,shh,personal,admin", "rpc api(s)")
set.String("datadir", datadir, "data directory for geth") set.String("datadir", datadir, "data directory for geth")
set.String("logdir", datadir, "log dir for glog") set.String("logdir", datadir, "log dir for glog")
c = cli.NewContext(nil, set, nil) c = cli.NewContext(nil, set, nil)
@ -92,6 +95,11 @@ func RunNode(nodeIn *node.Node) {
if err := nodeIn.Service(&whisperService); err != nil { if err := nodeIn.Service(&whisperService); err != nil {
glog.V(logger.Warn).Infoln("cannot get whisper service:", err) glog.V(logger.Warn).Infoln("cannot get whisper service:", err)
} }
if err := nodeIn.Service(&lightEthereum); err != nil {
glog.V(logger.Warn).Infoln("cannot get light ethereum service:", err)
}
lightEthereum.StatusBackend.SetTransactionQueueHandler(onSendTransactionRequest)
nodeIn.Wait() nodeIn.Wait()
} }

View File

@ -1,5 +1,9 @@
package main package main
import (
"github.com/ethereum/go-ethereum/les"
)
type AccountInfo struct { type AccountInfo struct {
Address string `json:"address"` Address string `json:"address"`
PubKey string `json:"pubkey"` PubKey string `json:"pubkey"`
@ -11,25 +15,30 @@ type JSONError struct {
} }
type AddPeerResult struct { type AddPeerResult struct {
Success bool `json:"success"` Success bool `json:"success"`
Error string `json:"error"` Error string `json:"error"`
} }
type AddWhisperFilterResult struct { type AddWhisperFilterResult struct {
Id int `json:"id"` Id int `json:"id"`
Error string `json:"error"` Error string `json:"error"`
} }
type WhisperMessageEvent struct { type WhisperMessageEvent struct {
Payload string `json:"payload"` Payload string `json:"payload"`
To string `json:"to"` To string `json:"to"`
From string `json:"from"` From string `json:"from"`
Sent int64 `json:"sent"` Sent int64 `json:"sent"`
TTL int64 `json:"ttl"` TTL int64 `json:"ttl"`
Hash string `json:"hash"` Hash string `json:"hash"`
}
type SendTransactionEvent struct {
Hash string `json:"hash"`
Args les.SendTxArgs `json:"args"`
} }
type GethEvent struct { type GethEvent struct {
Type string `json:"type"` Type string `json:"type"`
Event interface{} `json:"event"` Event interface{} `json:"event"`
} }

View File

@ -1,44 +0,0 @@
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index fc58447..e9d9ef1 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -39,6 +39,7 @@ import (
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/internal/debug"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/logger"
@@ -160,7 +161,7 @@ var (
Usage: "Enable light client mode",
}
NoDefSrvFlag = cli.BoolFlag{
- Name: "nodefsrv",
+ Name: "nodefsrv",
Usage: "Don't add default LES server (only for test version)",
}
LightServFlag = cli.IntFlag{
@@ -419,6 +420,13 @@ var (
}
)
+// DebugSetup sets up the debugging parameters such that logs can be retrieved when a
+// node is started via importing go-ethereum packages, as opposed to starting via CLI
+func DebugSetup(ctx *cli.Context) error {
+ err := debug.Setup(ctx)
+ return err
+}
+
// MustMakeDataDir retrieves the currently requested data directory, terminating
// if none (or the empty string) is specified. If the node is starting a testnet,
// the a subdirectory of the specified datadir will be used.
@@ -669,7 +677,7 @@ func MakeSystemNode(name, version string, relconf release.Config, extra []byte,
DataDir: MustMakeDataDir(ctx),
PrivateKey: MakeNodeKey(ctx),
Name: MakeNodeName(name, version, ctx),
- NoDiscovery: ctx.GlobalBool(NoDiscoverFlag.Name) || ctx.GlobalBool(LightModeFlag.Name), // light client hack
+ NoDiscovery: ctx.GlobalBool(NoDiscoverFlag.Name) || ctx.GlobalBool(LightModeFlag.Name), // light client hack
BootstrapNodes: MakeBootstrapNodes(ctx),
ListenAddr: MakeListenAddress(ctx),
NAT: MakeNAT(ctx),

View File

@ -35,7 +35,6 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
@ -47,7 +46,7 @@ import (
) )
const defaultGas = uint64(90000) const defaultGas = uint64(90000)
const defaultTxQueueCap = uint8(5) const defaultTxQueueCap = int(5)
// PublicEthereumAPI provides an API to access Ethereum related information. // PublicEthereumAPI provides an API to access Ethereum related information.
// It offers only methods that operate on public data that is freely available to anyone. // It offers only methods that operate on public data that is freely available to anyone.
@ -870,12 +869,22 @@ type PublicTransactionPoolAPI struct {
txQueue chan QueuedTx txQueue chan QueuedTx
} }
var txSingletonQueue chan QueuedTx
// NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool. // NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool.
func NewPublicTransactionPoolAPI(b Backend) *PublicTransactionPoolAPI { func NewPublicTransactionPoolAPI(b Backend) *PublicTransactionPoolAPI {
var once sync.Once
once.Do(func() {
if txSingletonQueue == nil {
glog.V(logger.Debug).Infof("Transaction queue (for status-go) inited")
txSingletonQueue = make(chan QueuedTx, defaultTxQueueCap)
}
})
api := &PublicTransactionPoolAPI{ api := &PublicTransactionPoolAPI{
b: b, b: b,
pendingTxSubs: make(map[string]rpc.Subscription), pendingTxSubs: make(map[string]rpc.Subscription),
txQueue: make(chan QueuedTx, defaultTxQueueCap), txQueue: txSingletonQueue,
} }
go api.subscriptionLoop() go api.subscriptionLoop()
@ -1132,20 +1141,18 @@ func submitTransaction(ctx context.Context, b Backend, tx *types.Transaction, si
return signedTx.Hash(), nil return signedTx.Hash(), nil
} }
// Queued Transaction is a container that holds context and arguments enough to complete transaction. // Queued Transaction is a container that holds context and arguments enough to complete the queued transaction.
// SendTransaction() queues transactions, to be fulfilled by SendQueuedTransaction()
type QueuedTx struct { type QueuedTx struct {
Hash common.Hash Hash common.Hash
Context context.Context Context context.Context
Args SendTxArgs Args SendTxArgs
} }
func (s *PublicTransactionPoolAPI) GetTransactionQueue() (<-chan QueuedTx, error) { func (s *PublicTransactionPoolAPI) GetTransactionQueue() (chan QueuedTx, error) {
return s.txQueue, nil return s.txQueue, nil
} }
// SendTransaction creates a transaction for the given argument, sign it and submit it to the // SendTransaction queues transactions, to be fulfilled by CompleteQueuedTransaction()
// transaction pool.
func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args SendTxArgs) (common.Hash, error) { func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args SendTxArgs) (common.Hash, error) {
queuedTx := QueuedTx{ queuedTx := QueuedTx{
Hash: common.Hash{}, Hash: common.Hash{},
@ -1154,19 +1161,19 @@ func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args Sen
} }
// populate transaction hash // populate transaction hash
hw := sha3.NewKeccak256() key, err := crypto.GenerateKey()
rlp.Encode(hw, queuedTx) if err != nil {
hw.Sum(queuedTx.Hash[:0]) panic(err)
}
queuedTx.Hash = common.BytesToHash(crypto.FromECDSA(key))
s.txQueue <- queuedTx s.txQueue <- queuedTx
return queuedTx.Hash, nil return queuedTx.Hash, nil
} }
func (s *PublicTransactionPoolAPI) CompleteQueuedTransaction(queuedTx QueuedTx) (common.Hash, error) { // CompleteQueuedTransaction creates a transaction by unpacking queued transaction, signs it and submits to the
ctx, args := queuedTx.Context, queuedTx.Args // transaction pool.
func (s *PublicTransactionPoolAPI) CompleteQueuedTransaction(ctx context.Context, args SendTxArgs) (common.Hash, error) {
var err error var err error
args, err = prepareSendTxArgs(ctx, args, s.b) args, err = prepareSendTxArgs(ctx, args, s.b)
if err != nil { if err != nil {

View File

@ -69,6 +69,8 @@ type LightEthereum struct {
PowTest bool PowTest bool
netVersionId int netVersionId int
netRPCService *ethapi.PublicNetAPI netRPCService *ethapi.PublicNetAPI
StatusBackend *StatusBackend
} }
func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
@ -125,6 +127,10 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
eth.ApiBackend = &LesApiBackend{eth, nil} eth.ApiBackend = &LesApiBackend{eth, nil}
eth.ApiBackend.gpo = gasprice.NewLightPriceOracle(eth.ApiBackend) eth.ApiBackend.gpo = gasprice.NewLightPriceOracle(eth.ApiBackend)
// inject status-im backend
eth.StatusBackend = NewStatusBackend(eth.ApiBackend)
return eth, nil return eth, nil
} }

View File

@ -0,0 +1,162 @@
package les
import (
"golang.org/x/net/context"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/rpc"
)
const defaultTxQueueCap = int(5)
const defaultEvictingTxQueueCap = int(20)
const defaultEvictingTxQueueEvictionStep = int(5) // how many item to evict in a single run
// StatusBackend implements les.StatusBackend with direct calls to Ethereum
// internals to support calls from status-go bindings (to internal packages e.g. ethapi)
type StatusBackend struct {
eapi *ethapi.PublicEthereumAPI // Wrapper around the Ethereum object to access metadata
bcapi *ethapi.PublicBlockChainAPI // Wrapper around the blockchain to access chain data
txapi *ethapi.PublicTransactionPoolAPI // Wrapper around the transaction pool to access transaction data
txQueue chan QueuedTx
txQueueHandler QueuedTxHandler
muTxQueueHanlder sync.Mutex
txEvictingQueue evictingTxQueue
}
type QueuedTxHash string
type evictingTxQueue struct {
transactions map[QueuedTxHash]*QueuedTx
evictionQueue chan QueuedTxHash
cap int
mu sync.Mutex
}
type QueuedTxHandler func(QueuedTx)
type QueuedTx struct {
Hash common.Hash
Context context.Context
Args SendTxArgs
}
// SendTxArgs represents the arguments to sumbit a new transaction into the transaction pool.
type SendTxArgs struct {
From common.Address `json:"from"`
To *common.Address `json:"to"`
Gas *rpc.HexNumber `json:"gas"`
GasPrice *rpc.HexNumber `json:"gasPrice"`
Value *rpc.HexNumber `json:"value"`
Data string `json:"data"`
Nonce *rpc.HexNumber `json:"nonce"`
}
// NewStatusBackend creates a new backend using an existing Ethereum object.
func NewStatusBackend(apiBackend ethapi.Backend) *StatusBackend {
glog.V(logger.Debug).Infof("Status service started")
backend := &StatusBackend{
eapi: ethapi.NewPublicEthereumAPI(apiBackend, nil, nil),
bcapi: ethapi.NewPublicBlockChainAPI(apiBackend),
txapi: ethapi.NewPublicTransactionPoolAPI(apiBackend),
txQueue: make(chan QueuedTx, defaultTxQueueCap),
txEvictingQueue: evictingTxQueue{
transactions: make(map[QueuedTxHash]*QueuedTx),
evictionQueue: make(chan QueuedTxHash, defaultEvictingTxQueueCap), // will be used to evict in FIFO
cap: defaultEvictingTxQueueCap,
},
}
go backend.transactionQueueForwardingLoop()
return backend
}
func (b *StatusBackend) SetTransactionQueueHandler(fn QueuedTxHandler) {
b.muTxQueueHanlder.Lock()
defer b.muTxQueueHanlder.Unlock()
b.txQueueHandler = fn
}
// SendTransaction wraps call to PublicTransactionPoolAPI.SendTransaction
func (b *StatusBackend) SendTransaction(ctx context.Context, args SendTxArgs) error {
if ctx == nil {
ctx = context.Background()
}
_, err := b.txapi.SendTransaction(ctx, ethapi.SendTxArgs(args))
return err
}
// CompleteQueuedTransaction wraps call to PublicTransactionPoolAPI.CompleteQueuedTransaction
func (b *StatusBackend) CompleteQueuedTransaction(hash QueuedTxHash) error {
queuedTx, err := b.txEvictingQueue.getQueuedTransaction(hash)
if err != nil {
return err
}
_, err = b.txapi.CompleteQueuedTransaction(queuedTx.Context, ethapi.SendTxArgs(queuedTx.Args))
return err
}
// GetTransactionQueue wraps call to PublicTransactionPoolAPI.GetTransactionQueue
func (b *StatusBackend) GetTransactionQueue() (chan QueuedTx, error) {
return b.txQueue, nil
}
func (b *StatusBackend) transactionQueueForwardingLoop() {
txQueue, err := b.txapi.GetTransactionQueue()
if err != nil {
glog.V(logger.Error).Infof("Cannot read from transaction queue")
return
}
// forward internal ethapi transactions
for queuedTx := range txQueue {
if b.txQueueHandler == nil { //discard, until handler is provided
continue
}
tx := QueuedTx{
Hash: queuedTx.Hash,
Context: queuedTx.Context,
Args: SendTxArgs(queuedTx.Args),
}
b.txEvictingQueue.enqueueQueuedTransaction(tx)
b.txQueueHandler(tx)
}
}
func (q *evictingTxQueue) enqueueQueuedTransaction(tx QueuedTx) error {
q.mu.Lock()
defer q.mu.Unlock()
if q.cap <= len(q.transactions) { // eviction is required
for i := 0; i < defaultEvictingTxQueueEvictionStep; i++ {
hash := <-q.evictionQueue
delete(q.transactions, hash)
}
}
q.transactions[QueuedTxHash(tx.Hash.Hex())] = &tx
q.evictionQueue <- QueuedTxHash(tx.Hash.Hex())
return nil
}
func (q *evictingTxQueue) getQueuedTransaction(hash QueuedTxHash) (*QueuedTx, error) {
q.mu.Lock()
defer q.mu.Unlock()
if tx, ok := q.transactions[hash]; ok {
delete(q.transactions, hash)
return tx, nil
}
return nil, nil
}