implements DiscardTransaction(), required for #32

This commit is contained in:
Victor Farazdagi 2016-10-31 01:35:10 +03:00
parent 28669c49c1
commit c29dedf84a
7 changed files with 245 additions and 28 deletions

View File

@ -158,6 +158,25 @@ func CompleteTransaction(id, password *C.char) *C.char {
return C.CString(string(outBytes)) return C.CString(string(outBytes))
} }
//export DiscardTransaction
func DiscardTransaction(id *C.char) *C.char {
err := geth.DiscardTransaction(C.GoString(id))
errString := ""
if err != nil {
fmt.Fprintln(os.Stderr, err)
errString = err.Error()
}
out := geth.DiscardTransactionResult{
Id: C.GoString(id),
Error: errString,
}
outBytes, _ := json.Marshal(&out)
return C.CString(string(outBytes))
}
//export StartNode //export StartNode
func StartNode(datadir *C.char) *C.char { func StartNode(datadir *C.char) *C.char {
// This starts a geth node with the given datadir // This starts a geth node with the given datadir

View File

@ -31,6 +31,7 @@ const (
SendTransactionDefaultErrorCode = "1" SendTransactionDefaultErrorCode = "1"
SendTransactionPasswordErrorCode = "2" SendTransactionPasswordErrorCode = "2"
SendTransactionTimeoutErrorCode = "3" SendTransactionTimeoutErrorCode = "3"
SendTransactionDiscardedErrorCode = "4"
) )
func onSendTransactionRequest(queuedTx status.QueuedTx) { func onSendTransactionRequest(queuedTx status.QueuedTx) {
@ -47,11 +48,16 @@ func onSendTransactionRequest(queuedTx status.QueuedTx) {
C.StatusServiceSignalEvent(C.CString(string(body))) C.StatusServiceSignalEvent(C.CString(string(body)))
} }
func onSendTransactionReturn(queuedTx status.QueuedTx, err error) { func onSendTransactionReturn(queuedTx *status.QueuedTx, err error) {
if err == nil { if err == nil {
return return
} }
// discard notifications with empty tx
if queuedTx == nil {
return
}
// error occurred, signal up to application // error occurred, signal up to application
event := GethEvent{ event := GethEvent{
Type: EventTransactionFailed, Type: EventTransactionFailed,
@ -73,15 +79,16 @@ func sendTransactionErrorCode(err error) string {
return SendTransactionNoErrorCode return SendTransactionNoErrorCode
} }
if err == accounts.ErrDecrypt { switch err {
case accounts.ErrDecrypt:
return SendTransactionPasswordErrorCode return SendTransactionPasswordErrorCode
} case status.ErrQueuedTxTimedOut:
if err == status.ErrQueuedTxTimedOut {
return SendTransactionTimeoutErrorCode return SendTransactionTimeoutErrorCode
} case status.ErrQueuedTxDiscarded:
return SendTransactionDiscardedErrorCode
default:
return SendTransactionDefaultErrorCode return SendTransactionDefaultErrorCode
}
} }
func CompleteTransaction(id, password string) (common.Hash, error) { func CompleteTransaction(id, password string) (common.Hash, error) {
@ -95,6 +102,17 @@ func CompleteTransaction(id, password string) (common.Hash, error) {
return backend.CompleteQueuedTransaction(status.QueuedTxId(id), password) return backend.CompleteQueuedTransaction(status.QueuedTxId(id), password)
} }
func DiscardTransaction(id string) error {
lightEthereum, err := GetNodeManager().LightEthereumService()
if err != nil {
return err
}
backend := lightEthereum.StatusBackend
return backend.DiscardQueuedTransaction(status.QueuedTxId(id))
}
func messageIdFromContext(ctx context.Context) string { func messageIdFromContext(ctx context.Context) string {
if ctx == nil { if ctx == nil {
return "" return ""

View File

@ -257,6 +257,147 @@ func TestDoubleCompleteQueuedTransactions(t *testing.T) {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
} }
func TestDiscardQueuedTransactions(t *testing.T) {
err := geth.PrepareTestNode()
if err != nil {
t.Error(err)
return
}
accountManager, err := geth.GetNodeManager().AccountManager()
if err != nil {
t.Errorf(err.Error())
return
}
// create an account
address, _, _, err := geth.CreateAccount(newAccountPassword)
if err != nil {
t.Errorf("could not create account: %v", err)
return
}
// obtain reference to status backend
lightEthereum, err := geth.GetNodeManager().LightEthereumService()
if err != nil {
t.Errorf("Test failed: LES service is not running: %v", err)
return
}
backend := lightEthereum.StatusBackend
// reset queue
backend.TransactionQueue().Reset()
// make sure you panic if transaction complete doesn't return
completeQueuedTransaction := make(chan struct{}, 1)
geth.PanicAfter(20*time.Second, completeQueuedTransaction, "TestDiscardQueuedTransactions")
// replace transaction notification handler
var txId string
txFailedEventCalled := false
geth.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
var envelope geth.GethEvent
if err := json.Unmarshal([]byte(jsonEvent), &envelope); err != nil {
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
return
}
if envelope.Type == geth.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txId = event["id"].(string)
t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txId)
if !backend.TransactionQueue().Has(status.QueuedTxId(txId)) {
t.Errorf("txqueue should still have test tx: %s", txId)
return
}
// discard
err := geth.DiscardTransaction(txId)
if err != nil {
t.Errorf("cannot discard tx: %v", err)
return
}
// try completing discarded transaction
_, err = geth.CompleteTransaction(txId, testAddressPassword)
if err.Error() != "transaction hash not found" {
t.Error("expects tx not found, but call to CompleteTransaction succeeded")
return
}
time.Sleep(1 * time.Second) // make sure that tx complete signal propagates
if backend.TransactionQueue().Has(status.QueuedTxId(txId)) {
t.Errorf("txqueue should not have test tx at this point (it should be discarded): %s", txId)
return
}
completeQueuedTransaction <- struct{}{} // so that timeout is aborted
}
if envelope.Type == geth.EventTransactionFailed {
event := envelope.Event.(map[string]interface{})
t.Logf("transaction return event received: {id: %s}\n", event["id"].(string))
receivedErrMessage := event["error_message"].(string)
expectedErrMessage := status.ErrQueuedTxDiscarded.Error()
if receivedErrMessage != expectedErrMessage {
t.Errorf("unexpected error message received: got %v", receivedErrMessage)
return
}
receivedErrCode := event["error_code"].(string)
if receivedErrCode != geth.SendTransactionDiscardedErrorCode {
t.Errorf("unexpected error code received: got %v", receivedErrCode)
return
}
txFailedEventCalled = true
}
})
// send from the same test account (which is guaranteed to have ether)
from, err := utils.MakeAddress(accountManager, testAddress)
if err != nil {
t.Errorf("could not retrieve account from address: %v", err)
return
}
to, err := utils.MakeAddress(accountManager, address)
if err != nil {
t.Errorf("could not retrieve account from address: %v", err)
return
}
// this call blocks, and should return when DiscardQueuedTransaction() is called
txHashCheck, err := backend.SendTransaction(nil, status.SendTxArgs{
From: from.Address,
To: &to.Address,
Value: rpc.NewHexNumber(big.NewInt(1000000000000)),
})
if err != status.ErrQueuedTxDiscarded {
t.Errorf("expeced error not thrown: %v", err)
return
}
if !reflect.DeepEqual(txHashCheck, common.Hash{}) {
t.Error("transaction returned hash, while it shouldn't")
return
}
if backend.TransactionQueue().Count() != 0 {
t.Error("tx queue must be empty at this point")
return
}
if !txFailedEventCalled {
t.Error("expected tx failure signal is not received")
return
}
t.Log("sleep extra time, to allow sync")
time.Sleep(5 * time.Second)
}
func TestNonExistentQueuedTransactions(t *testing.T) { func TestNonExistentQueuedTransactions(t *testing.T) {
err := geth.PrepareTestNode() err := geth.PrepareTestNode()
if err != nil { if err != nil {

View File

@ -53,6 +53,11 @@ type CompleteTransactionResult struct {
Error string `json:"error"` Error string `json:"error"`
} }
type DiscardTransactionResult struct {
Id string `json:"id"`
Error string `json:"error"`
}
type GethEvent struct { type GethEvent struct {
Type string `json:"type"` Type string `json:"type"`
Event interface{} `json:"event"` Event interface{} `json:"event"`

View File

@ -1028,19 +1028,26 @@ func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args Sen
Context: ctx, Context: ctx,
Args: status.SendTxArgs(args), Args: status.SendTxArgs(args),
Done: make(chan struct{}, 1), Done: make(chan struct{}, 1),
Discard: make(chan struct{}, 1),
} }
// send transaction to pending pool // send transaction to pending pool
s.txQueue <- queuedTx s.txQueue <- queuedTx
// now wait up until transaction is complete (via call to CompleteQueuedTransaction) or timeout occurs // now wait up until transaction is:
// - completed (via CompleteQueuedTransaction),
// - discarded (via DiscardQueuedTransaction)
// - or times out
backend := GetStatusBackend() backend := GetStatusBackend()
select { select {
case <-queuedTx.Done: case <-queuedTx.Done:
backend.NotifyOnQueuedTxReturn(queuedTx.Id, queuedTx.Err) backend.NotifyOnQueuedTxReturn(queuedTx, queuedTx.Err)
return queuedTx.Hash, queuedTx.Err
case <-queuedTx.Discard:
backend.NotifyOnQueuedTxReturn(queuedTx, status.ErrQueuedTxDiscarded)
return queuedTx.Hash, queuedTx.Err return queuedTx.Hash, queuedTx.Err
case <-time.After(status.DefaultTxSendCompletionTimeout * time.Second): case <-time.After(status.DefaultTxSendCompletionTimeout * time.Second):
backend.NotifyOnQueuedTxReturn(queuedTx.Id, status.ErrQueuedTxTimedOut) backend.NotifyOnQueuedTxReturn(queuedTx, status.ErrQueuedTxTimedOut)
return common.Hash{}, status.ErrQueuedTxTimedOut return common.Hash{}, status.ErrQueuedTxTimedOut
} }

View File

@ -47,12 +47,12 @@ func GetStatusBackend() *StatusBackend {
return statusBackend return statusBackend
} }
func (b *StatusBackend) NotifyOnQueuedTxReturn(id status.QueuedTxId, err error) { func (b *StatusBackend) NotifyOnQueuedTxReturn(queuedTx *status.QueuedTx, err error) {
if b == nil { if b == nil {
return return
} }
b.txQueue.NotifyOnQueuedTxReturn(id, err) b.txQueue.NotifyOnQueuedTxReturn(queuedTx, err)
} }
func (b *StatusBackend) SetTransactionReturnHandler(fn status.EnqueuedTxReturnHandler) { func (b *StatusBackend) SetTransactionReturnHandler(fn status.EnqueuedTxReturnHandler) {
@ -95,7 +95,7 @@ func (b *StatusBackend) CompleteQueuedTransaction(id status.QueuedTxId, passphra
// on password error, notify the app, and keep tx in queue (so that CompleteQueuedTransaction() can be resent) // on password error, notify the app, and keep tx in queue (so that CompleteQueuedTransaction() can be resent)
if err == accounts.ErrDecrypt { if err == accounts.ErrDecrypt {
b.NotifyOnQueuedTxReturn(id, err) b.NotifyOnQueuedTxReturn(queuedTx, err)
return hash, err // SendTransaction is still blocked return hash, err // SendTransaction is still blocked
} }
@ -107,6 +107,23 @@ func (b *StatusBackend) CompleteQueuedTransaction(id status.QueuedTxId, passphra
return hash, err return hash, err
} }
// DiscardQueuedTransaction discards queued transaction forcing SendTransaction to return
func (b *StatusBackend) DiscardQueuedTransaction(id status.QueuedTxId) error {
queuedTx, err := b.txQueue.Get(id)
if err != nil {
return err
}
// remove from queue, before notifying SendTransaction
b.TransactionQueue().Remove(queuedTx.Id)
// allow SendTransaction to return
queuedTx.Err = status.ErrQueuedTxDiscarded
queuedTx.Discard <- struct{}{} // sendTransaction() waits on this, notify so that it can return
return nil
}
func (b *StatusBackend) transactionQueueForwardingLoop() { func (b *StatusBackend) transactionQueueForwardingLoop() {
txQueue, err := b.txapi.GetTransactionQueue() txQueue, err := b.txapi.GetTransactionQueue()
if err != nil { if err != nil {

View File

@ -19,6 +19,7 @@ const (
var ( var (
ErrQueuedTxIdNotFound = errors.New("transaction hash not found") ErrQueuedTxIdNotFound = errors.New("transaction hash not found")
ErrQueuedTxTimedOut = errors.New("transaction sending timed out") ErrQueuedTxTimedOut = errors.New("transaction sending timed out")
ErrQueuedTxDiscarded = errors.New("transaction has been discarded")
) )
// TxQueue is capped container that holds pending transactions // TxQueue is capped container that holds pending transactions
@ -42,6 +43,7 @@ type QueuedTx struct {
Context context.Context Context context.Context
Args SendTxArgs Args SendTxArgs
Done chan struct{} Done chan struct{}
Discard chan struct{}
Err error Err error
} }
@ -51,7 +53,7 @@ type QueuedTxId string
type EnqueuedTxHandler func(QueuedTx) type EnqueuedTxHandler func(QueuedTx)
// EnqueuedTxReturnHandler is a function that receives response when tx is complete (both on success and error) // EnqueuedTxReturnHandler is a function that receives response when tx is complete (both on success and error)
type EnqueuedTxReturnHandler func(queuedTx QueuedTx, err error) type EnqueuedTxReturnHandler func(queuedTx *QueuedTx, err error)
// SendTxArgs represents the arguments to submbit a new transaction into the transaction pool. // SendTxArgs represents the arguments to submbit a new transaction into the transaction pool.
type SendTxArgs struct { type SendTxArgs struct {
@ -85,6 +87,15 @@ func (q *TxQueue) evictionLoop() {
} }
} }
// Reset is to be used in tests only, as it simply creates new transaction map, w/o any cleanup of the previous one
func (q *TxQueue) Reset() {
q.mu.Lock()
defer q.mu.Unlock()
q.transactions = make(map[QueuedTxId]*QueuedTx)
q.evictableIds = make(chan QueuedTxId, DefaultTxQueueCap)
}
func (q *TxQueue) Enqueue(tx *QueuedTx) error { func (q *TxQueue) Enqueue(tx *QueuedTx) error {
if q.txEnqueueHandler == nil { //discard, until handler is provided if q.txEnqueueHandler == nil { //discard, until handler is provided
return nil return nil
@ -145,14 +156,19 @@ func (q *TxQueue) SetTxReturnHandler(fn EnqueuedTxReturnHandler) {
q.txReturnHandler = fn q.txReturnHandler = fn
} }
func (q *TxQueue) NotifyOnQueuedTxReturn(id QueuedTxId, err error) { func (q *TxQueue) NotifyOnQueuedTxReturn(queuedTx *QueuedTx, err error) {
if q == nil { if q == nil {
return return
} }
// discard, if transaction is not found
if queuedTx == nil {
return
}
// on success, remove item from the queue and stop propagating // on success, remove item from the queue and stop propagating
if err == nil { if err == nil {
q.Remove(id) q.Remove(queuedTx.Id)
return return
} }
@ -161,17 +177,11 @@ func (q *TxQueue) NotifyOnQueuedTxReturn(id QueuedTxId, err error) {
return return
} }
// discard, if transaction is not found
tx, _ := q.Get(id)
if tx == nil {
return
}
// remove from queue on any error (except for password related one) and propagate // remove from queue on any error (except for password related one) and propagate
if err != accounts.ErrDecrypt { if err != accounts.ErrDecrypt {
q.Remove(id) q.Remove(queuedTx.Id)
} }
// notify handler // notify handler
q.txReturnHandler(*tx, err) q.txReturnHandler(queuedTx, err)
} }