tx send: allow to re-send + error signaling. Fixes #49, #51

This commit is contained in:
Victor Farazdagi 2016-10-26 05:51:33 +03:00
parent 8e855a9ee3
commit 6e18cbb5d4
8 changed files with 407 additions and 22 deletions

View File

@ -3,5 +3,5 @@
"enode://3c0ab251e18c979a3358644f49bcadf264186847aeee4fe80de60da0d196ce879c913bc1c9b5e750847c0d7e63a707a1cf5153c4f242da0d058e42fdf985ef62@198.199.105.122:30303",
"enode://fc3065bb80bfced98a01441718e2b70a0353f023b9da3d57beb8f96a827402d23702b3a461e1c1b6c7a208cb09cc0aea9b7c42bf953bb8f732529c198b158db4@95.85.40.211:30303",
"enode://5ffa3a39f95614d881e07d24e265865218c45fe73b3a5f5d05868190e385cbf60d03ac8beaa4c31b7ee84a0ec947f22c969e2dd1783041a4d7381f7774c74526@188.166.229.119:30303",
"enode://3b020a1fd6ab980a5670975e8a7361af1732fa3fa1819b751a94b6a4265e8c52b02c608c0de1347784b834b298280b018bcf6547f47bbba63612cba0e4707ec1@139.59.212.114:30303"
"enode://3c0ab251e18c979a3358644f49bcadf264186847aeee4fe80de60da0d196ce879c913bc1c9b5e750847c0d7e63a707a1cf5153c4f242da0d058e42fdf985ef62@139.59.212.114:30303"
]

View File

@ -165,6 +165,7 @@ func (m *NodeManager) RunNode() {
// setup handlers
m.lightEthereum.StatusBackend.SetTransactionQueueHandler(onSendTransactionRequest)
m.lightEthereum.StatusBackend.SetAccountsFilterHandler(onAccountsListRequest)
m.lightEthereum.StatusBackend.SetTransactionReturnHandler(onSendTransactionReturn)
var err error
m.client, err = m.currentNode.Attach()

View File

@ -13,6 +13,7 @@ import (
"math/big"
"strconv"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/les/status"
"github.com/ethereum/go-ethereum/rpc"
@ -21,8 +22,15 @@ import (
const (
EventTransactionQueued = "transaction.queued"
EventTransactionFailed = "transaction.failed"
SendTransactionRequest = "eth_sendTransaction"
MessageIdKey = "message_id"
// tx error codes
SendTransactionNoErrorCode = "0"
SendTransactionDefaultErrorCode = "1"
SendTransactionPasswordErrorCode = "2"
SendTransactionTimeoutErrorCode = "3"
)
func onSendTransactionRequest(queuedTx status.QueuedTx) {
@ -39,6 +47,43 @@ func onSendTransactionRequest(queuedTx status.QueuedTx) {
C.StatusServiceSignalEvent(C.CString(string(body)))
}
func onSendTransactionReturn(queuedTx status.QueuedTx, err error) {
if err == nil {
return
}
// error occurred, signal up to application
event := GethEvent{
Type: EventTransactionFailed,
Event: ReturnSendTransactionEvent{
Id: string(queuedTx.Id),
Args: queuedTx.Args,
MessageId: messageIdFromContext(queuedTx.Context),
ErrorMessage: err.Error(),
ErrorCode: sendTransactionErrorCode(err),
},
}
body, _ := json.Marshal(&event)
C.StatusServiceSignalEvent(C.CString(string(body)))
}
func sendTransactionErrorCode(err error) string {
if err == nil {
return SendTransactionNoErrorCode
}
if err == accounts.ErrDecrypt {
return SendTransactionPasswordErrorCode
}
if err == status.ErrQueuedTxTimedOut {
return SendTransactionTimeoutErrorCode
}
return SendTransactionDefaultErrorCode
}
func CompleteTransaction(id, password string) (common.Hash, error) {
lightEthereum, err := GetNodeManager().LightEthereumService()
if err != nil {

View File

@ -10,8 +10,6 @@ import (
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/les/status"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/geth"
)
@ -36,7 +34,7 @@ func TestQueuedTransactions(t *testing.T) {
return
}
// test transaction queueing
// obtain reference to status backend
lightEthereum, err := geth.GetNodeManager().LightEthereumService()
if err != nil {
t.Errorf("Test failed: LES service is not running: %v", err)
@ -58,7 +56,7 @@ func TestQueuedTransactions(t *testing.T) {
}
if envelope.Type == geth.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
glog.V(logger.Info).Infof("Transaction queued (will be completed in 5 secs): {id: %s}\n", event["id"].(string))
t.Logf("transaction queued (will be completed in 5 secs): {id: %s}\n", event["id"].(string))
time.Sleep(5 * time.Second)
if txHash, err = geth.CompleteTransaction(event["id"].(string), testAddressPassword); err != nil {
@ -66,18 +64,12 @@ func TestQueuedTransactions(t *testing.T) {
return
}
glog.V(logger.Info).Infof("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex())
t.Logf("transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex())
completeQueuedTransaction <- struct{}{} // so that timeout is aborted
}
})
// try completing non-existing transaction
if _, err := geth.CompleteTransaction("some-bad-transaction-id", testAddressPassword); err == nil {
t.Error("error expected and not recieved")
return
}
// send normal transaction
// send from the same test account (which is guaranteed to have ether)
from, err := utils.MakeAddress(accountManager, testAddress)
if err != nil {
t.Errorf("could not retrieve account from address: %v", err)
@ -112,11 +104,254 @@ func TestQueuedTransactions(t *testing.T) {
return
}
// now test eviction queue
if backend.TransactionQueue().Count() != 0 {
t.Error("tx queue must be empty at this point")
return
}
}
func TestDoubleCompleteQueuedTransactions(t *testing.T) {
err := geth.PrepareTestNode()
if err != nil {
t.Error(err)
return
}
accountManager, err := geth.GetNodeManager().AccountManager()
if err != nil {
t.Errorf(err.Error())
return
}
// create an account
address, _, _, err := geth.CreateAccount(newAccountPassword)
if err != nil {
t.Errorf("could not create account: %v", err)
return
}
// obtain reference to status backend
lightEthereum, err := geth.GetNodeManager().LightEthereumService()
if err != nil {
t.Errorf("Test failed: LES service is not running: %v", err)
return
}
backend := lightEthereum.StatusBackend
// make sure you panic if transaction complete doesn't return
completeQueuedTransaction := make(chan struct{}, 1)
geth.PanicAfter(20*time.Second, completeQueuedTransaction, "TestQueuedTransactions")
// replace transaction notification handler
var txId string
txFailedEventCalled := false
txHash := common.Hash{}
geth.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
var envelope geth.GethEvent
if err := json.Unmarshal([]byte(jsonEvent), &envelope); err != nil {
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
return
}
if envelope.Type == geth.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txId = event["id"].(string)
t.Logf("transaction queued (will be failed and completed on the second call): {id: %s}\n", txId)
// try with wrong password
// make sure that tx is NOT removed from the queue (by re-trying with the correct password)
if _, err = geth.CompleteTransaction(txId, testAddressPassword+"wrong"); err == nil {
t.Error("expects wrong password error, but call succeeded")
return
}
time.Sleep(1 * time.Second) // make sure that tx complete signal propagates
if txCount := backend.TransactionQueue().Count(); txCount != 1 {
t.Errorf("txqueue cannot be empty, as tx has failed: expected = 1, got = %d", txCount)
return
}
// now try to complete transaction, but with the correct password
t.Log("allow 5 seconds before sedning the second CompleteTransaction")
time.Sleep(5 * time.Second)
if txHash, err = geth.CompleteTransaction(event["id"].(string), testAddressPassword); err != nil {
t.Errorf("cannot complete queued transation[%v]: %v", event["id"], err)
return
}
time.Sleep(1 * time.Second) // make sure that tx complete signal propagates
if txCount := backend.TransactionQueue().Count(); txCount != 0 {
t.Errorf("txqueue must be empty, as tx has completed: expected = 0, got = %d", txCount)
return
}
t.Logf("transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex())
completeQueuedTransaction <- struct{}{} // so that timeout is aborted
}
if envelope.Type == geth.EventTransactionFailed {
event := envelope.Event.(map[string]interface{})
t.Logf("transaction return event received: {id: %s}\n", event["id"].(string))
receivedErrMessage := event["error_message"].(string)
expectedErrMessage := "could not decrypt key with given passphrase"
if receivedErrMessage != expectedErrMessage {
t.Errorf("unexpected error message received: got %v", receivedErrMessage)
return
}
receivedErrCode := event["error_code"].(string)
if receivedErrCode != geth.SendTransactionPasswordErrorCode {
t.Errorf("unexpected error code received: got %v", receivedErrCode)
return
}
txFailedEventCalled = true
}
})
// send from the same test account (which is guaranteed to have ether)
from, err := utils.MakeAddress(accountManager, testAddress)
if err != nil {
t.Errorf("could not retrieve account from address: %v", err)
return
}
to, err := utils.MakeAddress(accountManager, address)
if err != nil {
t.Errorf("could not retrieve account from address: %v", err)
return
}
// this call blocks, and should return on *second* attempt to CompleteTransaction (w/ the correct password)
txHashCheck, err := backend.SendTransaction(nil, status.SendTxArgs{
From: from.Address,
To: &to.Address,
Value: rpc.NewHexNumber(big.NewInt(1000000000000)),
})
if err != nil {
t.Errorf("cannot send transaction: %v", err)
return
}
if !reflect.DeepEqual(txHash, txHashCheck) {
t.Errorf("tx hash returned from SendTransaction is invalid: expected %s, got %s", txHashCheck, txHash)
return
}
if reflect.DeepEqual(txHashCheck, common.Hash{}) {
t.Error("transaction was never queued or completed")
return
}
if backend.TransactionQueue().Count() != 0 {
t.Error("tx queue must be empty at this point")
return
}
if !txFailedEventCalled {
t.Error("expected tx failure signal is not received")
return
}
t.Log("sleep extra time, to allow sync")
time.Sleep(5 * time.Second)
}
func TestNonExistentQueuedTransactions(t *testing.T) {
err := geth.PrepareTestNode()
if err != nil {
t.Error(err)
return
}
// make sure you panic if transaction complete doesn't return
completeQueuedTransaction := make(chan struct{}, 1)
geth.PanicAfter(20*time.Second, completeQueuedTransaction, "TestQueuedTransactions")
// replace transaction notification handler
var txHash = common.Hash{}
geth.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
var envelope geth.GethEvent
if err := json.Unmarshal([]byte(jsonEvent), &envelope); err != nil {
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
return
}
if envelope.Type == geth.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
t.Logf("Transaction queued (will be completed in 5 secs): {id: %s}\n", event["id"].(string))
time.Sleep(5 * time.Second)
// next call is the very same one, but with the correct password
if txHash, err = geth.CompleteTransaction(event["id"].(string), testAddressPassword); err != nil {
t.Errorf("cannot complete queued transation[%v]: %v", event["id"], err)
return
}
t.Logf("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex())
completeQueuedTransaction <- struct{}{} // so that timeout is aborted
}
})
// try completing non-existing transaction
if _, err = geth.CompleteTransaction("some-bad-transaction-id", testAddressPassword); err == nil {
t.Error("error expected and not recieved")
return
}
if err != status.ErrQueuedTxIdNotFound {
t.Errorf("unexpected error recieved: expected '%s', got: '%s'", status.ErrQueuedTxIdNotFound.Error(), err.Error())
return
}
}
func TestEvictionOfQueuedTransactions(t *testing.T) {
err := geth.PrepareTestNode()
if err != nil {
t.Error(err)
return
}
// obtain reference to status backend
lightEthereum, err := geth.GetNodeManager().LightEthereumService()
if err != nil {
t.Errorf("Test failed: LES service is not running: %v", err)
return
}
backend := lightEthereum.StatusBackend
// make sure you panic if transaction complete doesn't return
completeQueuedTransaction := make(chan struct{}, 1)
geth.PanicAfter(20*time.Second, completeQueuedTransaction, "TestQueuedTransactions")
// replace transaction notification handler
var txHash = common.Hash{}
geth.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
var envelope geth.GethEvent
if err := json.Unmarshal([]byte(jsonEvent), &envelope); err != nil {
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
return
}
if envelope.Type == geth.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
t.Logf("Transaction queued (will be completed in 5 secs): {id: %s}\n", event["id"].(string))
time.Sleep(5 * time.Second)
// next call is the very same one, but with the correct password
if txHash, err = geth.CompleteTransaction(event["id"].(string), testAddressPassword); err != nil {
t.Errorf("cannot complete queued transation[%v]: %v", event["id"], err)
return
}
t.Logf("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex())
completeQueuedTransaction <- struct{}{} // so that timeout is aborted
}
})
txQueue := backend.TransactionQueue()
var i = 0
txIds := [status.DefaultTxQueueCap + 5 + 10]status.QueuedTxId{}
backend.SetTransactionQueueHandler(func(queuedTx status.QueuedTx) {
//glog.V(logger.Info).Infof("%d. Transaction queued (queue size: %d): {id: %v}\n", i, txQueue.Count(), queuedTx.Id)
t.Logf("%d. Transaction queued (queue size: %d): {id: %v}\n", i, txQueue.Count(), queuedTx.Id)
txIds[i] = queuedTx.Id
i++
})
@ -142,8 +377,17 @@ func TestQueuedTransactions(t *testing.T) {
}
time.Sleep(5 * time.Second)
if txQueue.Count() != status.DefaultTxQueueCap && txQueue.Count() != (status.DefaultTxQueueCap-1) {
if txQueue.Count() > status.DefaultTxQueueCap {
t.Errorf("transaction count should be %d (or %d): got %d", status.DefaultTxQueueCap, status.DefaultTxQueueCap-1, txQueue.Count())
return
}
for _, txId := range txIds {
txQueue.Remove(txId)
}
if txQueue.Count() != 0 {
t.Errorf("transaction count should be zero: %d", txQueue.Count())
return
}
}

View File

@ -40,6 +40,14 @@ type SendTransactionEvent struct {
MessageId string `json:"message_id"`
}
type ReturnSendTransactionEvent struct {
Id string `json:"id"`
Args status.SendTxArgs `json:"args"`
MessageId string `json:"message_id"`
ErrorMessage string `json:"error_message"`
ErrorCode string `json:"error_code"`
}
type CompleteTransactionResult struct {
Hash string `json:"hash"`
Error string `json:"error"`

View File

@ -20,7 +20,6 @@ import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math/big"
"strings"
@ -1035,11 +1034,14 @@ func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args Sen
s.txQueue <- queuedTx
// now wait up until transaction is complete (via call to CompleteQueuedTransaction) or timeout occurs
backend := GetStatusBackend()
select {
case <-queuedTx.Done:
backend.NotifyOnQueuedTxReturn(queuedTx.Id, queuedTx.Err)
return queuedTx.Hash, queuedTx.Err
case <-time.After(status.DefaultTxSendCompletionTimeout * time.Second):
return common.Hash{}, errors.New("transaction sending timed out")
backend.NotifyOnQueuedTxReturn(queuedTx.Id, status.ErrQueuedTxTimedOut)
return common.Hash{}, status.ErrQueuedTxTimedOut
}
return queuedTx.Hash, nil

View File

@ -3,6 +3,7 @@ package ethapi
import (
"sync"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/les/status"
"github.com/ethereum/go-ethereum/logger"
@ -46,6 +47,18 @@ func GetStatusBackend() *StatusBackend {
return statusBackend
}
func (b *StatusBackend) NotifyOnQueuedTxReturn(id status.QueuedTxId, err error) {
if b == nil {
return
}
b.txQueue.NotifyOnQueuedTxReturn(id, err)
}
func (b *StatusBackend) SetTransactionReturnHandler(fn status.EnqueuedTxReturnHandler) {
b.txQueue.SetTxReturnHandler(fn)
}
func (b *StatusBackend) SetTransactionQueueHandler(fn status.EnqueuedTxHandler) {
b.txQueue.SetEnqueueHandler(fn)
}
@ -79,6 +92,14 @@ func (b *StatusBackend) CompleteQueuedTransaction(id status.QueuedTxId, passphra
}
hash, err := b.txapi.CompleteQueuedTransaction(context.Background(), SendTxArgs(queuedTx.Args), passphrase)
// on password error, notify the app, and keep tx in queue (so that CompleteQueuedTransaction() can be resent)
if err == accounts.ErrDecrypt {
b.NotifyOnQueuedTxReturn(id, err)
return hash, err // SendTransaction is still blocked
}
// allow SendTransaction to return
queuedTx.Hash = hash
queuedTx.Err = err
queuedTx.Done <- struct{}{} // sendTransaction() waits on this, notify so that it can return

View File

@ -2,6 +2,9 @@ package status
import (
"errors"
"sync"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"golang.org/x/net/context"
@ -15,16 +18,21 @@ const (
var (
ErrQueuedTxIdNotFound = errors.New("transaction hash not found")
ErrQueuedTxTimedOut = errors.New("transaction sending timed out")
)
// TxQueue is capped container that holds pending transactions
type TxQueue struct {
transactions map[QueuedTxId]*QueuedTx
mu sync.RWMutex // to guard trasactions map
evictableIds chan QueuedTxId
enqueueTicker chan struct{}
// when items are enqueued notify handlers
// when items are enqueued notify subscriber
txEnqueueHandler EnqueuedTxHandler
// when tx is returned (either successfully or with error) notify subscriber
txReturnHandler EnqueuedTxReturnHandler
}
// QueuedTx holds enough information to complete the queued transaction.
@ -39,9 +47,12 @@ type QueuedTx struct {
type QueuedTxId string
// QueuedTxHandler is a function that receives queued/pending transactions, when they get queued
// EnqueuedTxHandler is a function that receives queued/pending transactions, when they get queued
type EnqueuedTxHandler func(QueuedTx)
// EnqueuedTxReturnHandler is a function that receives response when tx is complete (both on success and error)
type EnqueuedTxReturnHandler func(queuedTx QueuedTx, err error)
// SendTxArgs represents the arguments to submbit a new transaction into the transaction pool.
type SendTxArgs struct {
From common.Address `json:"from"`
@ -68,7 +79,8 @@ func NewTransactionQueue() *TxQueue {
func (q *TxQueue) evictionLoop() {
for range q.enqueueTicker {
if len(q.transactions) >= (DefaultTxQueueCap - 1) { // eviction is required to accommodate another/last item
delete(q.transactions, <-q.evictableIds)
q.Remove(<-q.evictableIds)
q.enqueueTicker <- struct{}{} // in case we pulled already removed item
}
}
}
@ -81,7 +93,9 @@ func (q *TxQueue) Enqueue(tx *QueuedTx) error {
q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item
q.evictableIds <- tx.Id // this will block when we hit DefaultTxQueueCap
q.mu.Lock()
q.transactions[tx.Id] = tx
q.mu.Unlock()
// notify handler
q.txEnqueueHandler(*tx)
@ -90,19 +104,34 @@ func (q *TxQueue) Enqueue(tx *QueuedTx) error {
}
func (q *TxQueue) Get(id QueuedTxId) (*QueuedTx, error) {
q.mu.RLock()
defer q.mu.RUnlock()
if tx, ok := q.transactions[id]; ok {
delete(q.transactions, id)
return tx, nil
}
return nil, ErrQueuedTxIdNotFound
}
func (q *TxQueue) Remove(id QueuedTxId) {
q.mu.Lock()
defer q.mu.Unlock()
delete(q.transactions, id)
}
func (q *TxQueue) Count() int {
q.mu.RLock()
defer q.mu.RUnlock()
return len(q.transactions)
}
func (q *TxQueue) Has(id QueuedTxId) bool {
q.mu.RLock()
defer q.mu.RUnlock()
_, ok := q.transactions[id]
return ok
@ -111,3 +140,38 @@ func (q *TxQueue) Has(id QueuedTxId) bool {
func (q *TxQueue) SetEnqueueHandler(fn EnqueuedTxHandler) {
q.txEnqueueHandler = fn
}
func (q *TxQueue) SetTxReturnHandler(fn EnqueuedTxReturnHandler) {
q.txReturnHandler = fn
}
func (q *TxQueue) NotifyOnQueuedTxReturn(id QueuedTxId, err error) {
if q == nil {
return
}
// on success, remove item from the queue and stop propagating
if err == nil {
q.Remove(id)
return
}
// error occurred, send upward notification
if q.txReturnHandler == nil { // discard, until handler is provided
return
}
// discard, if transaction is not found
tx, _ := q.Get(id)
if tx == nil {
return
}
// remove from queue on any error (except for password related one) and propagate
if err != accounts.ErrDecrypt {
q.Remove(id)
}
// notify handler
q.txReturnHandler(*tx, err)
}