diff --git a/geth/txqueue.go b/geth/txqueue.go index 8cd5d0292..11ad414dc 100644 --- a/geth/txqueue.go +++ b/geth/txqueue.go @@ -8,12 +8,13 @@ extern bool StatusServiceSignalEvent( const char *jsonEvent ); import "C" import ( + "bytes" "context" "encoding/json" "fmt" - "bytes" "github.com/cnf/structhash" + "github.com/eapache/go-resiliency/semaphore" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/les/status" "github.com/robertkrimen/otto" @@ -23,6 +24,7 @@ const ( EventTransactionQueued = "transaction.queued" SendTransactionRequest = "eth_sendTransaction" MessageIdKey = "message_id" + CellTicketKey = "cell_ticket" ) func onSendTransactionRequest(queuedTx status.QueuedTx) { @@ -32,12 +34,17 @@ func onSendTransactionRequest(queuedTx status.QueuedTx) { requestCtx = requestQueue.PopQueuedTxContext(&queuedTx) } + // request context obtained (if exists), safe to release the ticket + if ticket := cellTicketFromContext(requestCtx); ticket != nil { + ticket.Release() + } + event := GethEvent{ Type: EventTransactionQueued, Event: SendTransactionEvent{ Id: string(queuedTx.Id), Args: queuedTx.Args, - MessageId: fromContext(requestCtx, MessageIdKey), + MessageId: messageIdFromContext(requestCtx), }, } @@ -56,17 +63,28 @@ func CompleteTransaction(id, password string) (common.Hash, error) { return backend.CompleteQueuedTransaction(status.QueuedTxId(id), password) } -func fromContext(ctx context.Context, key string) string { +func messageIdFromContext(ctx context.Context) string { if ctx == nil { return "" } - if messageId, ok := ctx.Value(key).(string); ok { + if messageId, ok := ctx.Value(MessageIdKey).(string); ok { return messageId } return "" } +func cellTicketFromContext(ctx context.Context) *semaphore.Semaphore { + if ctx == nil { + return nil + } + if sem, ok := ctx.Value(CellTicketKey).(*semaphore.Semaphore); ok { + return sem + } + + return nil +} + type JailedRequest struct { method string ctx context.Context @@ -83,7 +101,13 @@ func NewJailedRequestsQueue() *JailedRequestQueue { } } -func (q *JailedRequestQueue) PreProcessRequest(vm *otto.Otto, req RPCCall) { +func (q *JailedRequestQueue) PreProcessRequest(ticket *semaphore.Semaphore, vm *otto.Otto, req RPCCall) error { + // serialize access + err := ticket.Acquire() + if err != nil { + return err + } + messageId := currentMessageId(vm.Context()) // save request context for reuse (by request handlers, such as queued transaction signal sender) @@ -92,19 +116,28 @@ func (q *JailedRequestQueue) PreProcessRequest(vm *otto.Otto, req RPCCall) { if len(messageId) > 0 { ctx = context.WithValue(ctx, MessageIdKey, messageId) } + + // onSendTransactionRequest() will use context to obtain and release ticket + if req.Method == SendTransactionRequest { + ctx = context.WithValue(ctx, CellTicketKey, ticket) + } else { + ticket.Release() + } q.saveRequestContext(vm, ctx, req) + + return nil } func (q *JailedRequestQueue) PostProcessRequest(vm *otto.Otto, req RPCCall) { // set message id (if present in context) messageId := currentMessageId(vm.Context()) if len(messageId) > 0 { - vm.Call("addContext", nil, MessageIdKey, messageId) + vm.Call("addContext", nil, messageId, MessageIdKey, messageId) } // set extra markers for queued transaction requests if req.Method == SendTransactionRequest { - vm.Call("addContext", nil, SendTransactionRequest, true) + vm.Call("addContext", nil, messageId, SendTransactionRequest, true) } } @@ -219,7 +252,7 @@ func hashFromQueuedTx(queuedTx *status.QueuedTx) string { method: SendTransactionRequest, from: queuedTx.Args.From.Hex(), to: queuedTx.Args.To.Hex(), - value: string(bytes.Replace(value, []byte(`"`),[]byte("") , 2)), + value: string(bytes.Replace(value, []byte(`"`), []byte(""), 2)), data: queuedTx.Args.Data, } diff --git a/jail/jail.go b/jail/jail.go index 3cb2f5eb1..1e47f570f 100644 --- a/jail/jail.go +++ b/jail/jail.go @@ -80,9 +80,13 @@ func (jail *Jail) Parse(chatId string, js string) string { _, err := vm.Run(initJjs) vm.Set("jeth", struct{}{}) + sendHandler := func(call otto.FunctionCall) (response otto.Value) { + return jail.Send(chatId, call) + } + jethObj, _ := vm.Get("jeth") - jethObj.Object().Set("send", jail.Send) - jethObj.Object().Set("sendAsync", jail.Send) + jethObj.Object().Set("send", sendHandler) + jethObj.Object().Set("sendAsync", sendHandler) jjs := Web3_JS + ` var Web3 = require('web3'); @@ -110,10 +114,6 @@ func (jail *Jail) Call(chatId string, path string, args string) string { return printError(fmt.Sprintf("Cell[%s] doesn't exist.", chatId)) } - // serialize requests to VM - cell.sem.Acquire() - defer cell.sem.Release() - res, err := cell.vm.Call("call", nil, path, args) return printResult(res.String(), err) @@ -133,7 +133,12 @@ func (jail *Jail) GetVM(chatId string) (*otto.Otto, error) { } // Send will serialize the first argument, send it to the node and returns the response. -func (jail *Jail) Send(call otto.FunctionCall) (response otto.Value) { +func (jail *Jail) Send(chatId string, call otto.FunctionCall) (response otto.Value) { + cell, ok := jail.cells[chatId] + if !ok { + throwJSException(fmt.Errorf("Cell[%s] doesn't exist.", chatId)) + } + clientFactory, err := jail.ClientRestartWrapper() if err != nil { return newErrorResponse(call, -32603, err.Error(), nil) @@ -171,9 +176,13 @@ func (jail *Jail) Send(call otto.FunctionCall) (response otto.Value) { resp.Set("id", req.Id) var result json.RawMessage - // do extra request pre and post processing (message id persisting, setting tx context) - requestQueue.PreProcessRequest(call.Otto, req) - defer requestQueue.PostProcessRequest(call.Otto, req) + // do extra request pre processing (persist message id) + // within function semaphore will be acquired and released, + // so that no more than one client (per cell) can enter + err := requestQueue.PreProcessRequest(cell.sem, call.Otto, req) + if err != nil { + return newErrorResponse(call, -32603, err.Error(), nil) + } client := clientFactory.Client() errc := make(chan error, 1) @@ -207,6 +216,9 @@ func (jail *Jail) Send(call otto.FunctionCall) (response otto.Value) { resp = newErrorResponse(call, -32603, err.Error(), &req.Id).Object() } resps.Call("push", resp) + + // do extra request post processing (setting back tx context) + requestQueue.PostProcessRequest(call.Otto, req) } // Return the responses either to the callback (if supplied) diff --git a/jail/testdata/tx-send/context-no-message-id.js b/jail/testdata/tx-send/context-no-message-id.js index 1b78a3eb9..86027e93a 100644 --- a/jail/testdata/tx-send/context-no-message-id.js +++ b/jail/testdata/tx-send/context-no-message-id.js @@ -4,7 +4,8 @@ var _status_catalog = { }; var context = {}; -function addContext(key, value) { +function addContext(ns, key, value) { + // we ignore ns here (as ns is message id, and in this UC we do not have one) context[key] = value; } diff --git a/jail/testdata/tx-send/tx-send.js b/jail/testdata/tx-send/tx-send.js index 833066c41..8cc7a9614 100644 --- a/jail/testdata/tx-send/tx-send.js +++ b/jail/testdata/tx-send/tx-send.js @@ -10,8 +10,11 @@ var _status_catalog = { }; var context = {}; -function addContext(key, value) { // this function is expected to be present, as status-go uses it to set context - context[key] = value; +function addContext(ns, key, value) { // this function is expected to be present, as status-go uses it to set context + if (!(ns in context)) { + context[ns] = {} + } + context[ns][key] = value; } function call(pathStr, paramsStr) { @@ -19,6 +22,9 @@ function call(pathStr, paramsStr) { path = JSON.parse(pathStr), fn, res; + // Since we allow next request to proceed *immediately* after jail obtains message id + // we should be careful overwritting global context variable. + // We probably should limit/scope to context[message_id] = {} context = {}; fn = path.reduce(function(catalog, name) { @@ -36,9 +42,13 @@ function call(pathStr, paramsStr) { callResult = fn(params); res = { result: callResult, - // so context could contain {eth_transactionSend: true} - // additionally, context gets `message_id` as well - context: context + // So, context could contain {eth_transactionSend: true} + // additionally, context gets `message_id` as well. + // You can scope returned context by returning context[message_id], + // however since serialization guard will be released immediately after message id + // is obtained, you need to be careful if you use global message id (it + // works below, in test, it will not work as expected in highly concurrent environment) + context: context[status.message_id] }; return JSON.stringify(res); diff --git a/vendor/github.com/ethereum/go-ethereum/internal/ethapi/api.go b/vendor/github.com/ethereum/go-ethereum/internal/ethapi/api.go index 861e79951..21c279d80 100644 --- a/vendor/github.com/ethereum/go-ethereum/internal/ethapi/api.go +++ b/vendor/github.com/ethereum/go-ethereum/internal/ethapi/api.go @@ -997,16 +997,10 @@ func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args Sen s.txQueue <- queuedTx // now wait up until transaction is complete (via call to CompleteQueuedTransaction) or timeout occurs - timeout := make(chan struct{}, 1) - go func() { - time.Sleep(status.DefaultTxSendCompletionTimeout * time.Second) - timeout <- struct{}{} - }() - select { case <-queuedTx.Done: return queuedTx.Hash, queuedTx.Err - case <-timeout: + case <-time.After(status.DefaultTxSendCompletionTimeout * time.Second): return common.Hash{}, errors.New("transaction sending timed out") }