jail: more robust cell ticket acquire/release

This commit is contained in:
Victor Farazdagi 2016-10-12 20:51:25 +03:00
parent 745c3a46c1
commit 6abbca9935
5 changed files with 81 additions and 31 deletions

View File

@ -8,12 +8,13 @@ extern bool StatusServiceSignalEvent( const char *jsonEvent );
import "C"
import (
"bytes"
"context"
"encoding/json"
"fmt"
"bytes"
"github.com/cnf/structhash"
"github.com/eapache/go-resiliency/semaphore"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/les/status"
"github.com/robertkrimen/otto"
@ -23,6 +24,7 @@ const (
EventTransactionQueued = "transaction.queued"
SendTransactionRequest = "eth_sendTransaction"
MessageIdKey = "message_id"
CellTicketKey = "cell_ticket"
)
func onSendTransactionRequest(queuedTx status.QueuedTx) {
@ -32,12 +34,17 @@ func onSendTransactionRequest(queuedTx status.QueuedTx) {
requestCtx = requestQueue.PopQueuedTxContext(&queuedTx)
}
// request context obtained (if exists), safe to release the ticket
if ticket := cellTicketFromContext(requestCtx); ticket != nil {
ticket.Release()
}
event := GethEvent{
Type: EventTransactionQueued,
Event: SendTransactionEvent{
Id: string(queuedTx.Id),
Args: queuedTx.Args,
MessageId: fromContext(requestCtx, MessageIdKey),
MessageId: messageIdFromContext(requestCtx),
},
}
@ -56,17 +63,28 @@ func CompleteTransaction(id, password string) (common.Hash, error) {
return backend.CompleteQueuedTransaction(status.QueuedTxId(id), password)
}
func fromContext(ctx context.Context, key string) string {
func messageIdFromContext(ctx context.Context) string {
if ctx == nil {
return ""
}
if messageId, ok := ctx.Value(key).(string); ok {
if messageId, ok := ctx.Value(MessageIdKey).(string); ok {
return messageId
}
return ""
}
func cellTicketFromContext(ctx context.Context) *semaphore.Semaphore {
if ctx == nil {
return nil
}
if sem, ok := ctx.Value(CellTicketKey).(*semaphore.Semaphore); ok {
return sem
}
return nil
}
type JailedRequest struct {
method string
ctx context.Context
@ -83,7 +101,13 @@ func NewJailedRequestsQueue() *JailedRequestQueue {
}
}
func (q *JailedRequestQueue) PreProcessRequest(vm *otto.Otto, req RPCCall) {
func (q *JailedRequestQueue) PreProcessRequest(ticket *semaphore.Semaphore, vm *otto.Otto, req RPCCall) error {
// serialize access
err := ticket.Acquire()
if err != nil {
return err
}
messageId := currentMessageId(vm.Context())
// save request context for reuse (by request handlers, such as queued transaction signal sender)
@ -92,19 +116,28 @@ func (q *JailedRequestQueue) PreProcessRequest(vm *otto.Otto, req RPCCall) {
if len(messageId) > 0 {
ctx = context.WithValue(ctx, MessageIdKey, messageId)
}
// onSendTransactionRequest() will use context to obtain and release ticket
if req.Method == SendTransactionRequest {
ctx = context.WithValue(ctx, CellTicketKey, ticket)
} else {
ticket.Release()
}
q.saveRequestContext(vm, ctx, req)
return nil
}
func (q *JailedRequestQueue) PostProcessRequest(vm *otto.Otto, req RPCCall) {
// set message id (if present in context)
messageId := currentMessageId(vm.Context())
if len(messageId) > 0 {
vm.Call("addContext", nil, MessageIdKey, messageId)
vm.Call("addContext", nil, messageId, MessageIdKey, messageId)
}
// set extra markers for queued transaction requests
if req.Method == SendTransactionRequest {
vm.Call("addContext", nil, SendTransactionRequest, true)
vm.Call("addContext", nil, messageId, SendTransactionRequest, true)
}
}
@ -219,7 +252,7 @@ func hashFromQueuedTx(queuedTx *status.QueuedTx) string {
method: SendTransactionRequest,
from: queuedTx.Args.From.Hex(),
to: queuedTx.Args.To.Hex(),
value: string(bytes.Replace(value, []byte(`"`),[]byte("") , 2)),
value: string(bytes.Replace(value, []byte(`"`), []byte(""), 2)),
data: queuedTx.Args.Data,
}

View File

@ -80,9 +80,13 @@ func (jail *Jail) Parse(chatId string, js string) string {
_, err := vm.Run(initJjs)
vm.Set("jeth", struct{}{})
sendHandler := func(call otto.FunctionCall) (response otto.Value) {
return jail.Send(chatId, call)
}
jethObj, _ := vm.Get("jeth")
jethObj.Object().Set("send", jail.Send)
jethObj.Object().Set("sendAsync", jail.Send)
jethObj.Object().Set("send", sendHandler)
jethObj.Object().Set("sendAsync", sendHandler)
jjs := Web3_JS + `
var Web3 = require('web3');
@ -110,10 +114,6 @@ func (jail *Jail) Call(chatId string, path string, args string) string {
return printError(fmt.Sprintf("Cell[%s] doesn't exist.", chatId))
}
// serialize requests to VM
cell.sem.Acquire()
defer cell.sem.Release()
res, err := cell.vm.Call("call", nil, path, args)
return printResult(res.String(), err)
@ -133,7 +133,12 @@ func (jail *Jail) GetVM(chatId string) (*otto.Otto, error) {
}
// Send will serialize the first argument, send it to the node and returns the response.
func (jail *Jail) Send(call otto.FunctionCall) (response otto.Value) {
func (jail *Jail) Send(chatId string, call otto.FunctionCall) (response otto.Value) {
cell, ok := jail.cells[chatId]
if !ok {
throwJSException(fmt.Errorf("Cell[%s] doesn't exist.", chatId))
}
clientFactory, err := jail.ClientRestartWrapper()
if err != nil {
return newErrorResponse(call, -32603, err.Error(), nil)
@ -171,9 +176,13 @@ func (jail *Jail) Send(call otto.FunctionCall) (response otto.Value) {
resp.Set("id", req.Id)
var result json.RawMessage
// do extra request pre and post processing (message id persisting, setting tx context)
requestQueue.PreProcessRequest(call.Otto, req)
defer requestQueue.PostProcessRequest(call.Otto, req)
// do extra request pre processing (persist message id)
// within function semaphore will be acquired and released,
// so that no more than one client (per cell) can enter
err := requestQueue.PreProcessRequest(cell.sem, call.Otto, req)
if err != nil {
return newErrorResponse(call, -32603, err.Error(), nil)
}
client := clientFactory.Client()
errc := make(chan error, 1)
@ -207,6 +216,9 @@ func (jail *Jail) Send(call otto.FunctionCall) (response otto.Value) {
resp = newErrorResponse(call, -32603, err.Error(), &req.Id).Object()
}
resps.Call("push", resp)
// do extra request post processing (setting back tx context)
requestQueue.PostProcessRequest(call.Otto, req)
}
// Return the responses either to the callback (if supplied)

View File

@ -4,7 +4,8 @@ var _status_catalog = {
};
var context = {};
function addContext(key, value) {
function addContext(ns, key, value) {
// we ignore ns here (as ns is message id, and in this UC we do not have one)
context[key] = value;
}

View File

@ -10,8 +10,11 @@ var _status_catalog = {
};
var context = {};
function addContext(key, value) { // this function is expected to be present, as status-go uses it to set context
context[key] = value;
function addContext(ns, key, value) { // this function is expected to be present, as status-go uses it to set context
if (!(ns in context)) {
context[ns] = {}
}
context[ns][key] = value;
}
function call(pathStr, paramsStr) {
@ -19,6 +22,9 @@ function call(pathStr, paramsStr) {
path = JSON.parse(pathStr),
fn, res;
// Since we allow next request to proceed *immediately* after jail obtains message id
// we should be careful overwritting global context variable.
// We probably should limit/scope to context[message_id] = {}
context = {};
fn = path.reduce(function(catalog, name) {
@ -36,9 +42,13 @@ function call(pathStr, paramsStr) {
callResult = fn(params);
res = {
result: callResult,
// so context could contain {eth_transactionSend: true}
// additionally, context gets `message_id` as well
context: context
// So, context could contain {eth_transactionSend: true}
// additionally, context gets `message_id` as well.
// You can scope returned context by returning context[message_id],
// however since serialization guard will be released immediately after message id
// is obtained, you need to be careful if you use global message id (it
// works below, in test, it will not work as expected in highly concurrent environment)
context: context[status.message_id]
};
return JSON.stringify(res);

View File

@ -997,16 +997,10 @@ func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args Sen
s.txQueue <- queuedTx
// now wait up until transaction is complete (via call to CompleteQueuedTransaction) or timeout occurs
timeout := make(chan struct{}, 1)
go func() {
time.Sleep(status.DefaultTxSendCompletionTimeout * time.Second)
timeout <- struct{}{}
}()
select {
case <-queuedTx.Done:
return queuedTx.Hash, queuedTx.Err
case <-timeout:
case <-time.After(status.DefaultTxSendCompletionTimeout * time.Second):
return common.Hash{}, errors.New("transaction sending timed out")
}