From e8ef42eb06b0b9fa116bac9e7bda710388cda162 Mon Sep 17 00:00:00 2001 From: Victor Farazdagi Date: Thu, 13 Oct 2016 21:02:48 +0300 Subject: [PATCH] Send transaction: fix jailed requests - VM Cell access is isolated (each request runs w/i copy of the original VM) - SendTransaction is intercepted and executed w/o RPC call --- geth/node_test.go | 4 +- geth/txqueue.go | 181 +++++++++++-------------------------------- geth/txqueue_test.go | 4 +- geth/utils.go | 12 +-- jail/jail.go | 33 ++++++-- jail/jail_test.go | 61 +++++++++------ 6 files changed, 113 insertions(+), 182 deletions(-) diff --git a/geth/node_test.go b/geth/node_test.go index 6ddcc5c58..459c20a36 100644 --- a/geth/node_test.go +++ b/geth/node_test.go @@ -28,7 +28,7 @@ func TestMain(m *testing.M) { } // make sure you panic if node start signal is not received signalRecieved := make(chan struct{}, 1) - abortPanic := make(chan bool, 1) + abortPanic := make(chan struct{}, 1) if syncRequired { geth.PanicAfter(geth.TestNodeSyncSeconds*time.Second, abortPanic, "TestNodeSetup") } else { @@ -48,7 +48,7 @@ func TestMain(m *testing.M) { } <-signalRecieved // block and wait for either panic or successful signal - abortPanic <- true + abortPanic <- struct{}{} os.Exit(m.Run()) } diff --git a/geth/txqueue.go b/geth/txqueue.go index 11ad414dc..0913906a7 100644 --- a/geth/txqueue.go +++ b/geth/txqueue.go @@ -8,15 +8,14 @@ extern bool StatusServiceSignalEvent( const char *jsonEvent ); import "C" import ( - "bytes" "context" "encoding/json" - "fmt" + "math/big" + "strconv" - "github.com/cnf/structhash" - "github.com/eapache/go-resiliency/semaphore" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/les/status" + "github.com/ethereum/go-ethereum/rpc" "github.com/robertkrimen/otto" ) @@ -24,27 +23,15 @@ const ( EventTransactionQueued = "transaction.queued" SendTransactionRequest = "eth_sendTransaction" MessageIdKey = "message_id" - CellTicketKey = "cell_ticket" ) func onSendTransactionRequest(queuedTx status.QueuedTx) { - requestCtx := context.Background() - requestQueue, err := GetNodeManager().JailedRequestQueue() - if err == nil { - requestCtx = requestQueue.PopQueuedTxContext(&queuedTx) - } - - // request context obtained (if exists), safe to release the ticket - if ticket := cellTicketFromContext(requestCtx); ticket != nil { - ticket.Release() - } - event := GethEvent{ Type: EventTransactionQueued, Event: SendTransactionEvent{ Id: string(queuedTx.Id), Args: queuedTx.Args, - MessageId: messageIdFromContext(requestCtx), + MessageId: messageIdFromContext(queuedTx.Context), }, } @@ -74,63 +61,19 @@ func messageIdFromContext(ctx context.Context) string { return "" } -func cellTicketFromContext(ctx context.Context) *semaphore.Semaphore { - if ctx == nil { - return nil - } - if sem, ok := ctx.Value(CellTicketKey).(*semaphore.Semaphore); ok { - return sem - } - - return nil -} - -type JailedRequest struct { - method string - ctx context.Context - vm *otto.Otto -} - -type JailedRequestQueue struct { - requests map[string]*JailedRequest -} +type JailedRequestQueue struct{} func NewJailedRequestsQueue() *JailedRequestQueue { - return &JailedRequestQueue{ - requests: make(map[string]*JailedRequest), - } + return &JailedRequestQueue{} } -func (q *JailedRequestQueue) PreProcessRequest(ticket *semaphore.Semaphore, vm *otto.Otto, req RPCCall) error { - // serialize access - err := ticket.Acquire() - if err != nil { - return err - } - +func (q *JailedRequestQueue) PreProcessRequest(vm *otto.Otto, req RPCCall) (string, error) { messageId := currentMessageId(vm.Context()) - // save request context for reuse (by request handlers, such as queued transaction signal sender) - ctx := context.Background() - ctx = context.WithValue(ctx, "method", req.Method) - if len(messageId) > 0 { - ctx = context.WithValue(ctx, MessageIdKey, messageId) - } - - // onSendTransactionRequest() will use context to obtain and release ticket - if req.Method == SendTransactionRequest { - ctx = context.WithValue(ctx, CellTicketKey, ticket) - } else { - ticket.Release() - } - q.saveRequestContext(vm, ctx, req) - - return nil + return messageId, nil } -func (q *JailedRequestQueue) PostProcessRequest(vm *otto.Otto, req RPCCall) { - // set message id (if present in context) - messageId := currentMessageId(vm.Context()) +func (q *JailedRequestQueue) PostProcessRequest(vm *otto.Otto, req RPCCall, messageId string) { if len(messageId) > 0 { vm.Call("addContext", nil, messageId, MessageIdKey, messageId) } @@ -141,41 +84,32 @@ func (q *JailedRequestQueue) PostProcessRequest(vm *otto.Otto, req RPCCall) { } } -func (q *JailedRequestQueue) saveRequestContext(vm *otto.Otto, ctx context.Context, req RPCCall) { - hash := hashFromRPCCall(req) +func (q *JailedRequestQueue) ProcessSendTransactionRequest(vm *otto.Otto, req RPCCall) (common.Hash, error) { + // obtain status backend from LES service + lightEthereum, err := GetNodeManager().LightEthereumService() + if err != nil { + return common.Hash{}, err + } + backend := lightEthereum.StatusBackend - if len(hash) == 0 { // no need to persist empty hash - return + messageId, err := q.PreProcessRequest(vm, req) + if err != nil { + return common.Hash{}, err + } + // onSendTransactionRequest() will use context to obtain and release ticket + ctx := context.Background() + ctx = context.WithValue(ctx, MessageIdKey, messageId) + + // this call blocks, up until Complete Transaction is called + txHash, err := backend.SendTransaction(ctx, sendTxArgsFromRPCCall(req)) + if err != nil { + return common.Hash{}, err } - q.requests[hash] = &JailedRequest{ - method: req.Method, - ctx: ctx, - vm: vm, - } -} + // invoke post processing + q.PostProcessRequest(vm, req, messageId) -func (q *JailedRequestQueue) GetQueuedTxContext(queuedTx *status.QueuedTx) context.Context { - hash := hashFromQueuedTx(queuedTx) - - req, ok := q.requests[hash] - if ok { - return req.ctx - } - - return context.Background() -} - -func (q *JailedRequestQueue) PopQueuedTxContext(queuedTx *status.QueuedTx) context.Context { - hash := hashFromQueuedTx(queuedTx) - - req, ok := q.requests[hash] - if ok { - delete(q.requests, hash) - return req.ctx - } - - return context.Background() + return txHash, nil } // currentMessageId looks for `status.message_id` variable in current JS context @@ -193,22 +127,14 @@ func currentMessageId(ctx otto.Context) string { return "" } -type HashableSendRequest struct { - method string - from string - to string - value string - data string -} - -func hashFromRPCCall(req RPCCall) string { +func sendTxArgsFromRPCCall(req RPCCall) status.SendTxArgs { if req.Method != SendTransactionRequest { // no need to persist extra state for other requests - return "" + return status.SendTxArgs{} } params, ok := req.Params[0].(map[string]interface{}) if !ok { - return "" + return status.SendTxArgs{} } from, ok := params["from"].(string) @@ -221,9 +147,13 @@ func hashFromRPCCall(req RPCCall) string { to = "" } - value, ok := params["value"].(string) + param, ok := params["value"].(string) if !ok { - value = "" + param = "0x0" + } + value, err := strconv.ParseInt(param, 0, 64) + if err != nil { + return status.SendTxArgs{} } data, ok := params["data"].(string) @@ -231,30 +161,11 @@ func hashFromRPCCall(req RPCCall) string { data = "" } - s := HashableSendRequest{ - method: req.Method, - from: from, - to: to, - value: value, - data: data, + toAddress := common.HexToAddress(to) + return status.SendTxArgs{ + From: common.HexToAddress(from), + To: &toAddress, + Value: rpc.NewHexNumber(big.NewInt(value)), + Data: data, } - - return fmt.Sprintf("%x", structhash.Sha1(s, 1)) -} - -func hashFromQueuedTx(queuedTx *status.QueuedTx) string { - value, err := queuedTx.Args.Value.MarshalJSON() - if err != nil { - return "" - } - - s := HashableSendRequest{ - method: SendTransactionRequest, - from: queuedTx.Args.From.Hex(), - to: queuedTx.Args.To.Hex(), - value: string(bytes.Replace(value, []byte(`"`), []byte(""), 2)), - data: queuedTx.Args.Data, - } - - return fmt.Sprintf("%x", structhash.Sha1(s, 1)) } diff --git a/geth/txqueue_test.go b/geth/txqueue_test.go index c7979c695..0de1e5160 100644 --- a/geth/txqueue_test.go +++ b/geth/txqueue_test.go @@ -45,7 +45,7 @@ func TestQueuedTransactions(t *testing.T) { backend := lightEthereum.StatusBackend // make sure you panic if transaction complete doesn't return - completeQueuedTransaction := make(chan bool, 1) + completeQueuedTransaction := make(chan struct{}, 1) geth.PanicAfter(20*time.Second, completeQueuedTransaction, "TestQueuedTransactions") // replace transaction notification handler @@ -67,7 +67,7 @@ func TestQueuedTransactions(t *testing.T) { } glog.V(logger.Info).Infof("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex()) - completeQueuedTransaction <- true // so that timeout is aborted + completeQueuedTransaction <- struct{}{} // so that timeout is aborted } }) diff --git a/geth/utils.go b/geth/utils.go index daf8496f7..77d416f3c 100644 --- a/geth/utils.go +++ b/geth/utils.go @@ -167,20 +167,12 @@ func PreprocessDataDir(dataDir string) (string, error) { } // PanicAfter throws panic() after waitSeconds, unless abort channel receives notification -func PanicAfter(waitSeconds time.Duration, abort chan bool, desc string) { - // panic if function takes too long - timeout := make(chan bool, 1) - - go func() { - time.Sleep(waitSeconds) - timeout <- true - }() - +func PanicAfter(waitSeconds time.Duration, abort chan struct{}, desc string) { go func() { select { case <-abort: return - case <-timeout: + case <-time.After(waitSeconds): panic("whatever you were doing takes toooo long: " + desc) } }() diff --git a/jail/jail.go b/jail/jail.go index 1e47f570f..64174e1c5 100644 --- a/jail/jail.go +++ b/jail/jail.go @@ -24,6 +24,7 @@ var ( ) type Jail struct { + sync.RWMutex client *rpc.ClientRestartWrapper // lazy inited on the first call to jail.ClientRestartWrapper() cells map[string]*JailedRuntime // jail supports running many isolated instances of jailed runtime statusJS string @@ -73,6 +74,9 @@ func (jail *Jail) Parse(chatId string, js string) string { return printError(ErrInvalidJail.Error()) } + jail.Lock() + defer jail.Unlock() + jail.cells[chatId] = NewJailedRuntime(chatId) vm := jail.cells[chatId].vm @@ -109,12 +113,16 @@ func (jail *Jail) Call(chatId string, path string, args string) string { return printError(err.Error()) } + jail.RLock() cell, ok := jail.cells[chatId] if !ok { + jail.RUnlock() return printError(fmt.Sprintf("Cell[%s] doesn't exist.", chatId)) } + jail.RUnlock() - res, err := cell.vm.Call("call", nil, path, args) + vm := cell.vm.Copy() // isolate VM to allow concurrent access + res, err := vm.Call("call", nil, path, args) return printResult(res.String(), err) } @@ -124,6 +132,9 @@ func (jail *Jail) GetVM(chatId string) (*otto.Otto, error) { return nil, ErrInvalidJail } + jail.RLock() + defer jail.RUnlock() + cell, ok := jail.cells[chatId] if !ok { return nil, fmt.Errorf("Cell[%s] doesn't exist.", chatId) @@ -134,11 +145,6 @@ func (jail *Jail) GetVM(chatId string) (*otto.Otto, error) { // Send will serialize the first argument, send it to the node and returns the response. func (jail *Jail) Send(chatId string, call otto.FunctionCall) (response otto.Value) { - cell, ok := jail.cells[chatId] - if !ok { - throwJSException(fmt.Errorf("Cell[%s] doesn't exist.", chatId)) - } - clientFactory, err := jail.ClientRestartWrapper() if err != nil { return newErrorResponse(call, -32603, err.Error(), nil) @@ -176,10 +182,21 @@ func (jail *Jail) Send(chatId string, call otto.FunctionCall) (response otto.Val resp.Set("id", req.Id) var result json.RawMessage + // execute directly w/o RPC call to node + if req.Method == geth.SendTransactionRequest { + txHash, err := requestQueue.ProcessSendTransactionRequest(call.Otto, req) + resp.Set("result", txHash.Hex()) + if err != nil { + resp = newErrorResponse(call, -32603, err.Error(), &req.Id).Object() + } + resps.Call("push", resp) + continue + } + // do extra request pre processing (persist message id) // within function semaphore will be acquired and released, // so that no more than one client (per cell) can enter - err := requestQueue.PreProcessRequest(cell.sem, call.Otto, req) + messageId, err := requestQueue.PreProcessRequest(call.Otto, req) if err != nil { return newErrorResponse(call, -32603, err.Error(), nil) } @@ -218,7 +235,7 @@ func (jail *Jail) Send(chatId string, call otto.FunctionCall) (response otto.Val resps.Call("push", resp) // do extra request post processing (setting back tx context) - requestQueue.PostProcessRequest(call.Otto, req) + requestQueue.PostProcessRequest(call.Otto, req, messageId) } // Return the responses either to the callback (if supplied) diff --git a/jail/jail_test.go b/jail/jail_test.go index f29291b52..6510863a8 100644 --- a/jail/jail_test.go +++ b/jail/jail_test.go @@ -214,10 +214,11 @@ func TestJailSendQueuedTransaction(t *testing.T) { "value": "0.000001" }` - transactionCompletedSuccessfully := make(chan bool) + txCompletedSuccessfully := make(chan struct{}) + txCompletedCounter := make(chan struct{}) + txHashes := make(chan common.Hash) // replace transaction notification handler - var txHash = common.Hash{} requireMessageId := false geth.SetDefaultNodeNotificationHandler(func(jsonEvent string) { var envelope geth.GethEvent @@ -246,34 +247,38 @@ func TestJailSendQueuedTransaction(t *testing.T) { t.Logf("Transaction queued (will be completed in 5 secs): {id: %s}\n", event["id"].(string)) time.Sleep(5 * time.Second) + var txHash common.Hash if txHash, err = geth.CompleteTransaction(event["id"].(string), TEST_ADDRESS_PASSWORD); err != nil { t.Errorf("cannot complete queued transation[%v]: %v", event["id"], err) - return + } else { + t.Logf("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex()) } - t.Logf("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex()) - transactionCompletedSuccessfully <- true // so that timeout is aborted + txCompletedSuccessfully <- struct{}{} // so that timeout is aborted + txHashes <- txHash + txCompletedCounter <- struct{}{} } }) - type cmd struct { + type testCommand struct { command string params string expectedResponse string } - - tests := []struct { + type testCase struct { name string file string requireMessageId bool - commands []cmd - }{ + commands []testCommand + } + + tests := []testCase{ { // no context or message id name: "Case 1: no message id or context in inited JS", file: "no-message-id-or-context.js", requireMessageId: false, - commands: []cmd{ + commands: []testCommand{ { `["commands", "send"]`, txParams, @@ -291,7 +296,7 @@ func TestJailSendQueuedTransaction(t *testing.T) { name: "Case 2: context is present in inited JS (but no message id is there)", file: "context-no-message-id.js", requireMessageId: false, - commands: []cmd{ + commands: []testCommand{ { `["commands", "send"]`, txParams, @@ -309,7 +314,7 @@ func TestJailSendQueuedTransaction(t *testing.T) { name: "Case 3: message id is present, context is not present", file: "message-id-no-context.js", requireMessageId: true, - commands: []cmd{ + commands: []testCommand{ { `["commands", "send"]`, txParams, @@ -327,7 +332,7 @@ func TestJailSendQueuedTransaction(t *testing.T) { name: "Case 4: both message id and context are present", file: "tx-send.js", requireMessageId: true, - commands: []cmd{ + commands: []testCommand{ { `["commands", "send"]`, txParams, @@ -336,29 +341,35 @@ func TestJailSendQueuedTransaction(t *testing.T) { { `["commands", "getBalance"]`, `{"address": "` + TEST_ADDRESS + `"}`, - `{"result": {"context":{"message_id":"foobar"},"result":{"balance":42}}}`, // message id in context! + `{"result": {"context":{"message_id":"42"},"result":{"balance":42}}}`, // message id in context, but default one is used! }, }, }, } - //var jailInstance *jail.Jail for _, test := range tests { jailInstance := jail.Init(geth.LoadFromFile(TESTDATA_TX_SEND_JS + test.file)) - geth.PanicAfter(20*time.Second, transactionCompletedSuccessfully, test.name) + geth.PanicAfter(60*time.Second, txCompletedSuccessfully, test.name) jailInstance.Parse(CHAT_ID_SEND, ``) requireMessageId = test.requireMessageId - for _, cmd := range test.commands { - t.Logf("%s: %s", test.name, cmd.command) - response := jailInstance.Call(CHAT_ID_SEND, cmd.command, cmd.params) - expectedResponse := strings.Replace(cmd.expectedResponse, "TX_HASH", txHash.Hex(), 1) - if response != expectedResponse { - t.Errorf("expected response is not returned: expected %s, got %s", expectedResponse, response) - return - } + for _, command := range test.commands { + go func(jail *jail.Jail, test testCase, command testCommand) { + t.Logf("->%s: %s", test.name, command.command) + response := jail.Call(CHAT_ID_SEND, command.command, command.params) + var txHash common.Hash + if command.command == `["commands", "send"]` { + txHash = <-txHashes + } + expectedResponse := strings.Replace(command.expectedResponse, "TX_HASH", txHash.Hex(), 1) + if response != expectedResponse { + t.Errorf("expected response is not returned: expected %s, got %s", expectedResponse, response) + return + } + }(jailInstance, test, command) } + <-txCompletedCounter } }