Merge pull request #55 from farazdagi/feature/tx-complete-updates
Transactions updates
This commit is contained in:
commit
28669c49c1
|
@ -3,5 +3,5 @@
|
|||
"enode://3c0ab251e18c979a3358644f49bcadf264186847aeee4fe80de60da0d196ce879c913bc1c9b5e750847c0d7e63a707a1cf5153c4f242da0d058e42fdf985ef62@198.199.105.122:30303",
|
||||
"enode://fc3065bb80bfced98a01441718e2b70a0353f023b9da3d57beb8f96a827402d23702b3a461e1c1b6c7a208cb09cc0aea9b7c42bf953bb8f732529c198b158db4@95.85.40.211:30303",
|
||||
"enode://5ffa3a39f95614d881e07d24e265865218c45fe73b3a5f5d05868190e385cbf60d03ac8beaa4c31b7ee84a0ec947f22c969e2dd1783041a4d7381f7774c74526@188.166.229.119:30303",
|
||||
"enode://3b020a1fd6ab980a5670975e8a7361af1732fa3fa1819b751a94b6a4265e8c52b02c608c0de1347784b834b298280b018bcf6547f47bbba63612cba0e4707ec1@139.59.212.114:30303"
|
||||
"enode://3c0ab251e18c979a3358644f49bcadf264186847aeee4fe80de60da0d196ce879c913bc1c9b5e750847c0d7e63a707a1cf5153c4f242da0d058e42fdf985ef62@139.59.212.114:30303"
|
||||
]
|
||||
|
|
|
@ -165,6 +165,7 @@ func (m *NodeManager) RunNode() {
|
|||
// setup handlers
|
||||
m.lightEthereum.StatusBackend.SetTransactionQueueHandler(onSendTransactionRequest)
|
||||
m.lightEthereum.StatusBackend.SetAccountsFilterHandler(onAccountsListRequest)
|
||||
m.lightEthereum.StatusBackend.SetTransactionReturnHandler(onSendTransactionReturn)
|
||||
|
||||
var err error
|
||||
m.client, err = m.currentNode.Attach()
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"math/big"
|
||||
"strconv"
|
||||
|
||||
"github.com/ethereum/go-ethereum/accounts"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/les/status"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
|
@ -21,8 +22,15 @@ import (
|
|||
|
||||
const (
|
||||
EventTransactionQueued = "transaction.queued"
|
||||
EventTransactionFailed = "transaction.failed"
|
||||
SendTransactionRequest = "eth_sendTransaction"
|
||||
MessageIdKey = "message_id"
|
||||
|
||||
// tx error codes
|
||||
SendTransactionNoErrorCode = "0"
|
||||
SendTransactionDefaultErrorCode = "1"
|
||||
SendTransactionPasswordErrorCode = "2"
|
||||
SendTransactionTimeoutErrorCode = "3"
|
||||
)
|
||||
|
||||
func onSendTransactionRequest(queuedTx status.QueuedTx) {
|
||||
|
@ -39,6 +47,43 @@ func onSendTransactionRequest(queuedTx status.QueuedTx) {
|
|||
C.StatusServiceSignalEvent(C.CString(string(body)))
|
||||
}
|
||||
|
||||
func onSendTransactionReturn(queuedTx status.QueuedTx, err error) {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// error occurred, signal up to application
|
||||
event := GethEvent{
|
||||
Type: EventTransactionFailed,
|
||||
Event: ReturnSendTransactionEvent{
|
||||
Id: string(queuedTx.Id),
|
||||
Args: queuedTx.Args,
|
||||
MessageId: messageIdFromContext(queuedTx.Context),
|
||||
ErrorMessage: err.Error(),
|
||||
ErrorCode: sendTransactionErrorCode(err),
|
||||
},
|
||||
}
|
||||
|
||||
body, _ := json.Marshal(&event)
|
||||
C.StatusServiceSignalEvent(C.CString(string(body)))
|
||||
}
|
||||
|
||||
func sendTransactionErrorCode(err error) string {
|
||||
if err == nil {
|
||||
return SendTransactionNoErrorCode
|
||||
}
|
||||
|
||||
if err == accounts.ErrDecrypt {
|
||||
return SendTransactionPasswordErrorCode
|
||||
}
|
||||
|
||||
if err == status.ErrQueuedTxTimedOut {
|
||||
return SendTransactionTimeoutErrorCode
|
||||
}
|
||||
|
||||
return SendTransactionDefaultErrorCode
|
||||
}
|
||||
|
||||
func CompleteTransaction(id, password string) (common.Hash, error) {
|
||||
lightEthereum, err := GetNodeManager().LightEthereumService()
|
||||
if err != nil {
|
||||
|
|
|
@ -10,8 +10,6 @@ import (
|
|||
"github.com/ethereum/go-ethereum/cmd/utils"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/les/status"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/status-im/status-go/geth"
|
||||
)
|
||||
|
@ -36,7 +34,7 @@ func TestQueuedTransactions(t *testing.T) {
|
|||
return
|
||||
}
|
||||
|
||||
// test transaction queueing
|
||||
// obtain reference to status backend
|
||||
lightEthereum, err := geth.GetNodeManager().LightEthereumService()
|
||||
if err != nil {
|
||||
t.Errorf("Test failed: LES service is not running: %v", err)
|
||||
|
@ -58,7 +56,7 @@ func TestQueuedTransactions(t *testing.T) {
|
|||
}
|
||||
if envelope.Type == geth.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
glog.V(logger.Info).Infof("Transaction queued (will be completed in 5 secs): {id: %s}\n", event["id"].(string))
|
||||
t.Logf("transaction queued (will be completed in 5 secs): {id: %s}\n", event["id"].(string))
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
if txHash, err = geth.CompleteTransaction(event["id"].(string), testAddressPassword); err != nil {
|
||||
|
@ -66,18 +64,12 @@ func TestQueuedTransactions(t *testing.T) {
|
|||
return
|
||||
}
|
||||
|
||||
glog.V(logger.Info).Infof("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex())
|
||||
t.Logf("transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex())
|
||||
completeQueuedTransaction <- struct{}{} // so that timeout is aborted
|
||||
}
|
||||
})
|
||||
|
||||
// try completing non-existing transaction
|
||||
if _, err := geth.CompleteTransaction("some-bad-transaction-id", testAddressPassword); err == nil {
|
||||
t.Error("error expected and not recieved")
|
||||
return
|
||||
}
|
||||
|
||||
// send normal transaction
|
||||
// send from the same test account (which is guaranteed to have ether)
|
||||
from, err := utils.MakeAddress(accountManager, testAddress)
|
||||
if err != nil {
|
||||
t.Errorf("could not retrieve account from address: %v", err)
|
||||
|
@ -112,11 +104,254 @@ func TestQueuedTransactions(t *testing.T) {
|
|||
return
|
||||
}
|
||||
|
||||
// now test eviction queue
|
||||
if backend.TransactionQueue().Count() != 0 {
|
||||
t.Error("tx queue must be empty at this point")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func TestDoubleCompleteQueuedTransactions(t *testing.T) {
|
||||
err := geth.PrepareTestNode()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
accountManager, err := geth.GetNodeManager().AccountManager()
|
||||
if err != nil {
|
||||
t.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// create an account
|
||||
address, _, _, err := geth.CreateAccount(newAccountPassword)
|
||||
if err != nil {
|
||||
t.Errorf("could not create account: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// obtain reference to status backend
|
||||
lightEthereum, err := geth.GetNodeManager().LightEthereumService()
|
||||
if err != nil {
|
||||
t.Errorf("Test failed: LES service is not running: %v", err)
|
||||
return
|
||||
}
|
||||
backend := lightEthereum.StatusBackend
|
||||
|
||||
// make sure you panic if transaction complete doesn't return
|
||||
completeQueuedTransaction := make(chan struct{}, 1)
|
||||
geth.PanicAfter(20*time.Second, completeQueuedTransaction, "TestQueuedTransactions")
|
||||
|
||||
// replace transaction notification handler
|
||||
var txId string
|
||||
txFailedEventCalled := false
|
||||
txHash := common.Hash{}
|
||||
geth.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
|
||||
var envelope geth.GethEvent
|
||||
if err := json.Unmarshal([]byte(jsonEvent), &envelope); err != nil {
|
||||
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
|
||||
return
|
||||
}
|
||||
if envelope.Type == geth.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
txId = event["id"].(string)
|
||||
t.Logf("transaction queued (will be failed and completed on the second call): {id: %s}\n", txId)
|
||||
|
||||
// try with wrong password
|
||||
// make sure that tx is NOT removed from the queue (by re-trying with the correct password)
|
||||
if _, err = geth.CompleteTransaction(txId, testAddressPassword+"wrong"); err == nil {
|
||||
t.Error("expects wrong password error, but call succeeded")
|
||||
return
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second) // make sure that tx complete signal propagates
|
||||
if txCount := backend.TransactionQueue().Count(); txCount != 1 {
|
||||
t.Errorf("txqueue cannot be empty, as tx has failed: expected = 1, got = %d", txCount)
|
||||
return
|
||||
}
|
||||
|
||||
// now try to complete transaction, but with the correct password
|
||||
t.Log("allow 5 seconds before sedning the second CompleteTransaction")
|
||||
time.Sleep(5 * time.Second)
|
||||
if txHash, err = geth.CompleteTransaction(event["id"].(string), testAddressPassword); err != nil {
|
||||
t.Errorf("cannot complete queued transation[%v]: %v", event["id"], err)
|
||||
return
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second) // make sure that tx complete signal propagates
|
||||
if txCount := backend.TransactionQueue().Count(); txCount != 0 {
|
||||
t.Errorf("txqueue must be empty, as tx has completed: expected = 0, got = %d", txCount)
|
||||
return
|
||||
}
|
||||
|
||||
t.Logf("transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex())
|
||||
completeQueuedTransaction <- struct{}{} // so that timeout is aborted
|
||||
}
|
||||
|
||||
if envelope.Type == geth.EventTransactionFailed {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
t.Logf("transaction return event received: {id: %s}\n", event["id"].(string))
|
||||
|
||||
receivedErrMessage := event["error_message"].(string)
|
||||
expectedErrMessage := "could not decrypt key with given passphrase"
|
||||
if receivedErrMessage != expectedErrMessage {
|
||||
t.Errorf("unexpected error message received: got %v", receivedErrMessage)
|
||||
return
|
||||
}
|
||||
|
||||
receivedErrCode := event["error_code"].(string)
|
||||
if receivedErrCode != geth.SendTransactionPasswordErrorCode {
|
||||
t.Errorf("unexpected error code received: got %v", receivedErrCode)
|
||||
return
|
||||
}
|
||||
|
||||
txFailedEventCalled = true
|
||||
}
|
||||
})
|
||||
|
||||
// send from the same test account (which is guaranteed to have ether)
|
||||
from, err := utils.MakeAddress(accountManager, testAddress)
|
||||
if err != nil {
|
||||
t.Errorf("could not retrieve account from address: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
to, err := utils.MakeAddress(accountManager, address)
|
||||
if err != nil {
|
||||
t.Errorf("could not retrieve account from address: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// this call blocks, and should return on *second* attempt to CompleteTransaction (w/ the correct password)
|
||||
txHashCheck, err := backend.SendTransaction(nil, status.SendTxArgs{
|
||||
From: from.Address,
|
||||
To: &to.Address,
|
||||
Value: rpc.NewHexNumber(big.NewInt(1000000000000)),
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("cannot send transaction: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(txHash, txHashCheck) {
|
||||
t.Errorf("tx hash returned from SendTransaction is invalid: expected %s, got %s", txHashCheck, txHash)
|
||||
return
|
||||
}
|
||||
|
||||
if reflect.DeepEqual(txHashCheck, common.Hash{}) {
|
||||
t.Error("transaction was never queued or completed")
|
||||
return
|
||||
}
|
||||
|
||||
if backend.TransactionQueue().Count() != 0 {
|
||||
t.Error("tx queue must be empty at this point")
|
||||
return
|
||||
}
|
||||
|
||||
if !txFailedEventCalled {
|
||||
t.Error("expected tx failure signal is not received")
|
||||
return
|
||||
}
|
||||
|
||||
t.Log("sleep extra time, to allow sync")
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
|
||||
func TestNonExistentQueuedTransactions(t *testing.T) {
|
||||
err := geth.PrepareTestNode()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
// make sure you panic if transaction complete doesn't return
|
||||
completeQueuedTransaction := make(chan struct{}, 1)
|
||||
geth.PanicAfter(20*time.Second, completeQueuedTransaction, "TestQueuedTransactions")
|
||||
|
||||
// replace transaction notification handler
|
||||
var txHash = common.Hash{}
|
||||
geth.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
|
||||
var envelope geth.GethEvent
|
||||
if err := json.Unmarshal([]byte(jsonEvent), &envelope); err != nil {
|
||||
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
|
||||
return
|
||||
}
|
||||
if envelope.Type == geth.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
t.Logf("Transaction queued (will be completed in 5 secs): {id: %s}\n", event["id"].(string))
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
// next call is the very same one, but with the correct password
|
||||
if txHash, err = geth.CompleteTransaction(event["id"].(string), testAddressPassword); err != nil {
|
||||
t.Errorf("cannot complete queued transation[%v]: %v", event["id"], err)
|
||||
return
|
||||
}
|
||||
|
||||
t.Logf("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex())
|
||||
completeQueuedTransaction <- struct{}{} // so that timeout is aborted
|
||||
}
|
||||
})
|
||||
|
||||
// try completing non-existing transaction
|
||||
if _, err = geth.CompleteTransaction("some-bad-transaction-id", testAddressPassword); err == nil {
|
||||
t.Error("error expected and not recieved")
|
||||
return
|
||||
}
|
||||
if err != status.ErrQueuedTxIdNotFound {
|
||||
t.Errorf("unexpected error recieved: expected '%s', got: '%s'", status.ErrQueuedTxIdNotFound.Error(), err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvictionOfQueuedTransactions(t *testing.T) {
|
||||
err := geth.PrepareTestNode()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
// obtain reference to status backend
|
||||
lightEthereum, err := geth.GetNodeManager().LightEthereumService()
|
||||
if err != nil {
|
||||
t.Errorf("Test failed: LES service is not running: %v", err)
|
||||
return
|
||||
}
|
||||
backend := lightEthereum.StatusBackend
|
||||
|
||||
// make sure you panic if transaction complete doesn't return
|
||||
completeQueuedTransaction := make(chan struct{}, 1)
|
||||
geth.PanicAfter(20*time.Second, completeQueuedTransaction, "TestQueuedTransactions")
|
||||
|
||||
// replace transaction notification handler
|
||||
var txHash = common.Hash{}
|
||||
geth.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
|
||||
var envelope geth.GethEvent
|
||||
if err := json.Unmarshal([]byte(jsonEvent), &envelope); err != nil {
|
||||
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
|
||||
return
|
||||
}
|
||||
if envelope.Type == geth.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
t.Logf("Transaction queued (will be completed in 5 secs): {id: %s}\n", event["id"].(string))
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
// next call is the very same one, but with the correct password
|
||||
if txHash, err = geth.CompleteTransaction(event["id"].(string), testAddressPassword); err != nil {
|
||||
t.Errorf("cannot complete queued transation[%v]: %v", event["id"], err)
|
||||
return
|
||||
}
|
||||
|
||||
t.Logf("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex())
|
||||
completeQueuedTransaction <- struct{}{} // so that timeout is aborted
|
||||
}
|
||||
})
|
||||
|
||||
txQueue := backend.TransactionQueue()
|
||||
var i = 0
|
||||
txIds := [status.DefaultTxQueueCap + 5 + 10]status.QueuedTxId{}
|
||||
backend.SetTransactionQueueHandler(func(queuedTx status.QueuedTx) {
|
||||
//glog.V(logger.Info).Infof("%d. Transaction queued (queue size: %d): {id: %v}\n", i, txQueue.Count(), queuedTx.Id)
|
||||
t.Logf("%d. Transaction queued (queue size: %d): {id: %v}\n", i, txQueue.Count(), queuedTx.Id)
|
||||
txIds[i] = queuedTx.Id
|
||||
i++
|
||||
})
|
||||
|
||||
|
@ -142,8 +377,17 @@ func TestQueuedTransactions(t *testing.T) {
|
|||
}
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
if txQueue.Count() != status.DefaultTxQueueCap && txQueue.Count() != (status.DefaultTxQueueCap-1) {
|
||||
if txQueue.Count() > status.DefaultTxQueueCap {
|
||||
t.Errorf("transaction count should be %d (or %d): got %d", status.DefaultTxQueueCap, status.DefaultTxQueueCap-1, txQueue.Count())
|
||||
return
|
||||
}
|
||||
|
||||
for _, txId := range txIds {
|
||||
txQueue.Remove(txId)
|
||||
}
|
||||
|
||||
if txQueue.Count() != 0 {
|
||||
t.Errorf("transaction count should be zero: %d", txQueue.Count())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,6 +40,14 @@ type SendTransactionEvent struct {
|
|||
MessageId string `json:"message_id"`
|
||||
}
|
||||
|
||||
type ReturnSendTransactionEvent struct {
|
||||
Id string `json:"id"`
|
||||
Args status.SendTxArgs `json:"args"`
|
||||
MessageId string `json:"message_id"`
|
||||
ErrorMessage string `json:"error_message"`
|
||||
ErrorCode string `json:"error_code"`
|
||||
}
|
||||
|
||||
type CompleteTransactionResult struct {
|
||||
Hash string `json:"hash"`
|
||||
Error string `json:"error"`
|
||||
|
|
|
@ -20,7 +20,6 @@ import (
|
|||
"bytes"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"strings"
|
||||
|
@ -1035,11 +1034,14 @@ func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args Sen
|
|||
s.txQueue <- queuedTx
|
||||
|
||||
// now wait up until transaction is complete (via call to CompleteQueuedTransaction) or timeout occurs
|
||||
backend := GetStatusBackend()
|
||||
select {
|
||||
case <-queuedTx.Done:
|
||||
backend.NotifyOnQueuedTxReturn(queuedTx.Id, queuedTx.Err)
|
||||
return queuedTx.Hash, queuedTx.Err
|
||||
case <-time.After(status.DefaultTxSendCompletionTimeout * time.Second):
|
||||
return common.Hash{}, errors.New("transaction sending timed out")
|
||||
backend.NotifyOnQueuedTxReturn(queuedTx.Id, status.ErrQueuedTxTimedOut)
|
||||
return common.Hash{}, status.ErrQueuedTxTimedOut
|
||||
}
|
||||
|
||||
return queuedTx.Hash, nil
|
||||
|
|
|
@ -3,6 +3,7 @@ package ethapi
|
|||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/accounts"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/les/status"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
|
@ -46,6 +47,18 @@ func GetStatusBackend() *StatusBackend {
|
|||
return statusBackend
|
||||
}
|
||||
|
||||
func (b *StatusBackend) NotifyOnQueuedTxReturn(id status.QueuedTxId, err error) {
|
||||
if b == nil {
|
||||
return
|
||||
}
|
||||
|
||||
b.txQueue.NotifyOnQueuedTxReturn(id, err)
|
||||
}
|
||||
|
||||
func (b *StatusBackend) SetTransactionReturnHandler(fn status.EnqueuedTxReturnHandler) {
|
||||
b.txQueue.SetTxReturnHandler(fn)
|
||||
}
|
||||
|
||||
func (b *StatusBackend) SetTransactionQueueHandler(fn status.EnqueuedTxHandler) {
|
||||
b.txQueue.SetEnqueueHandler(fn)
|
||||
}
|
||||
|
@ -79,6 +92,14 @@ func (b *StatusBackend) CompleteQueuedTransaction(id status.QueuedTxId, passphra
|
|||
}
|
||||
|
||||
hash, err := b.txapi.CompleteQueuedTransaction(context.Background(), SendTxArgs(queuedTx.Args), passphrase)
|
||||
|
||||
// on password error, notify the app, and keep tx in queue (so that CompleteQueuedTransaction() can be resent)
|
||||
if err == accounts.ErrDecrypt {
|
||||
b.NotifyOnQueuedTxReturn(id, err)
|
||||
return hash, err // SendTransaction is still blocked
|
||||
}
|
||||
|
||||
// allow SendTransaction to return
|
||||
queuedTx.Hash = hash
|
||||
queuedTx.Err = err
|
||||
queuedTx.Done <- struct{}{} // sendTransaction() waits on this, notify so that it can return
|
||||
|
|
|
@ -2,6 +2,9 @@ package status
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/accounts"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"golang.org/x/net/context"
|
||||
|
@ -15,16 +18,21 @@ const (
|
|||
|
||||
var (
|
||||
ErrQueuedTxIdNotFound = errors.New("transaction hash not found")
|
||||
ErrQueuedTxTimedOut = errors.New("transaction sending timed out")
|
||||
)
|
||||
|
||||
// TxQueue is capped container that holds pending transactions
|
||||
type TxQueue struct {
|
||||
transactions map[QueuedTxId]*QueuedTx
|
||||
mu sync.RWMutex // to guard trasactions map
|
||||
evictableIds chan QueuedTxId
|
||||
enqueueTicker chan struct{}
|
||||
|
||||
// when items are enqueued notify handlers
|
||||
// when items are enqueued notify subscriber
|
||||
txEnqueueHandler EnqueuedTxHandler
|
||||
|
||||
// when tx is returned (either successfully or with error) notify subscriber
|
||||
txReturnHandler EnqueuedTxReturnHandler
|
||||
}
|
||||
|
||||
// QueuedTx holds enough information to complete the queued transaction.
|
||||
|
@ -39,9 +47,12 @@ type QueuedTx struct {
|
|||
|
||||
type QueuedTxId string
|
||||
|
||||
// QueuedTxHandler is a function that receives queued/pending transactions, when they get queued
|
||||
// EnqueuedTxHandler is a function that receives queued/pending transactions, when they get queued
|
||||
type EnqueuedTxHandler func(QueuedTx)
|
||||
|
||||
// EnqueuedTxReturnHandler is a function that receives response when tx is complete (both on success and error)
|
||||
type EnqueuedTxReturnHandler func(queuedTx QueuedTx, err error)
|
||||
|
||||
// SendTxArgs represents the arguments to submbit a new transaction into the transaction pool.
|
||||
type SendTxArgs struct {
|
||||
From common.Address `json:"from"`
|
||||
|
@ -68,7 +79,8 @@ func NewTransactionQueue() *TxQueue {
|
|||
func (q *TxQueue) evictionLoop() {
|
||||
for range q.enqueueTicker {
|
||||
if len(q.transactions) >= (DefaultTxQueueCap - 1) { // eviction is required to accommodate another/last item
|
||||
delete(q.transactions, <-q.evictableIds)
|
||||
q.Remove(<-q.evictableIds)
|
||||
q.enqueueTicker <- struct{}{} // in case we pulled already removed item
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -81,7 +93,9 @@ func (q *TxQueue) Enqueue(tx *QueuedTx) error {
|
|||
q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item
|
||||
q.evictableIds <- tx.Id // this will block when we hit DefaultTxQueueCap
|
||||
|
||||
q.mu.Lock()
|
||||
q.transactions[tx.Id] = tx
|
||||
q.mu.Unlock()
|
||||
|
||||
// notify handler
|
||||
q.txEnqueueHandler(*tx)
|
||||
|
@ -90,19 +104,34 @@ func (q *TxQueue) Enqueue(tx *QueuedTx) error {
|
|||
}
|
||||
|
||||
func (q *TxQueue) Get(id QueuedTxId) (*QueuedTx, error) {
|
||||
q.mu.RLock()
|
||||
defer q.mu.RUnlock()
|
||||
|
||||
if tx, ok := q.transactions[id]; ok {
|
||||
delete(q.transactions, id)
|
||||
return tx, nil
|
||||
}
|
||||
|
||||
return nil, ErrQueuedTxIdNotFound
|
||||
}
|
||||
|
||||
func (q *TxQueue) Remove(id QueuedTxId) {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
delete(q.transactions, id)
|
||||
}
|
||||
|
||||
func (q *TxQueue) Count() int {
|
||||
q.mu.RLock()
|
||||
defer q.mu.RUnlock()
|
||||
|
||||
return len(q.transactions)
|
||||
}
|
||||
|
||||
func (q *TxQueue) Has(id QueuedTxId) bool {
|
||||
q.mu.RLock()
|
||||
defer q.mu.RUnlock()
|
||||
|
||||
_, ok := q.transactions[id]
|
||||
|
||||
return ok
|
||||
|
@ -111,3 +140,38 @@ func (q *TxQueue) Has(id QueuedTxId) bool {
|
|||
func (q *TxQueue) SetEnqueueHandler(fn EnqueuedTxHandler) {
|
||||
q.txEnqueueHandler = fn
|
||||
}
|
||||
|
||||
func (q *TxQueue) SetTxReturnHandler(fn EnqueuedTxReturnHandler) {
|
||||
q.txReturnHandler = fn
|
||||
}
|
||||
|
||||
func (q *TxQueue) NotifyOnQueuedTxReturn(id QueuedTxId, err error) {
|
||||
if q == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// on success, remove item from the queue and stop propagating
|
||||
if err == nil {
|
||||
q.Remove(id)
|
||||
return
|
||||
}
|
||||
|
||||
// error occurred, send upward notification
|
||||
if q.txReturnHandler == nil { // discard, until handler is provided
|
||||
return
|
||||
}
|
||||
|
||||
// discard, if transaction is not found
|
||||
tx, _ := q.Get(id)
|
||||
if tx == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// remove from queue on any error (except for password related one) and propagate
|
||||
if err != accounts.ErrDecrypt {
|
||||
q.Remove(id)
|
||||
}
|
||||
|
||||
// notify handler
|
||||
q.txReturnHandler(*tx, err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue