Send transaction: fix jailed requests

- VM Cell access is isolated (each request runs w/i copy of the original VM)
- SendTransaction is intercepted and executed w/o RPC call
This commit is contained in:
Victor Farazdagi 2016-10-13 21:02:48 +03:00
parent 6abbca9935
commit e8ef42eb06
6 changed files with 113 additions and 182 deletions

View File

@ -28,7 +28,7 @@ func TestMain(m *testing.M) {
}
// make sure you panic if node start signal is not received
signalRecieved := make(chan struct{}, 1)
abortPanic := make(chan bool, 1)
abortPanic := make(chan struct{}, 1)
if syncRequired {
geth.PanicAfter(geth.TestNodeSyncSeconds*time.Second, abortPanic, "TestNodeSetup")
} else {
@ -48,7 +48,7 @@ func TestMain(m *testing.M) {
}
<-signalRecieved // block and wait for either panic or successful signal
abortPanic <- true
abortPanic <- struct{}{}
os.Exit(m.Run())
}

View File

@ -8,15 +8,14 @@ extern bool StatusServiceSignalEvent( const char *jsonEvent );
import "C"
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math/big"
"strconv"
"github.com/cnf/structhash"
"github.com/eapache/go-resiliency/semaphore"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/les/status"
"github.com/ethereum/go-ethereum/rpc"
"github.com/robertkrimen/otto"
)
@ -24,27 +23,15 @@ const (
EventTransactionQueued = "transaction.queued"
SendTransactionRequest = "eth_sendTransaction"
MessageIdKey = "message_id"
CellTicketKey = "cell_ticket"
)
func onSendTransactionRequest(queuedTx status.QueuedTx) {
requestCtx := context.Background()
requestQueue, err := GetNodeManager().JailedRequestQueue()
if err == nil {
requestCtx = requestQueue.PopQueuedTxContext(&queuedTx)
}
// request context obtained (if exists), safe to release the ticket
if ticket := cellTicketFromContext(requestCtx); ticket != nil {
ticket.Release()
}
event := GethEvent{
Type: EventTransactionQueued,
Event: SendTransactionEvent{
Id: string(queuedTx.Id),
Args: queuedTx.Args,
MessageId: messageIdFromContext(requestCtx),
MessageId: messageIdFromContext(queuedTx.Context),
},
}
@ -74,63 +61,19 @@ func messageIdFromContext(ctx context.Context) string {
return ""
}
func cellTicketFromContext(ctx context.Context) *semaphore.Semaphore {
if ctx == nil {
return nil
}
if sem, ok := ctx.Value(CellTicketKey).(*semaphore.Semaphore); ok {
return sem
}
return nil
}
type JailedRequest struct {
method string
ctx context.Context
vm *otto.Otto
}
type JailedRequestQueue struct {
requests map[string]*JailedRequest
}
type JailedRequestQueue struct{}
func NewJailedRequestsQueue() *JailedRequestQueue {
return &JailedRequestQueue{
requests: make(map[string]*JailedRequest),
}
return &JailedRequestQueue{}
}
func (q *JailedRequestQueue) PreProcessRequest(ticket *semaphore.Semaphore, vm *otto.Otto, req RPCCall) error {
// serialize access
err := ticket.Acquire()
if err != nil {
return err
}
func (q *JailedRequestQueue) PreProcessRequest(vm *otto.Otto, req RPCCall) (string, error) {
messageId := currentMessageId(vm.Context())
// save request context for reuse (by request handlers, such as queued transaction signal sender)
ctx := context.Background()
ctx = context.WithValue(ctx, "method", req.Method)
if len(messageId) > 0 {
ctx = context.WithValue(ctx, MessageIdKey, messageId)
}
// onSendTransactionRequest() will use context to obtain and release ticket
if req.Method == SendTransactionRequest {
ctx = context.WithValue(ctx, CellTicketKey, ticket)
} else {
ticket.Release()
}
q.saveRequestContext(vm, ctx, req)
return nil
return messageId, nil
}
func (q *JailedRequestQueue) PostProcessRequest(vm *otto.Otto, req RPCCall) {
// set message id (if present in context)
messageId := currentMessageId(vm.Context())
func (q *JailedRequestQueue) PostProcessRequest(vm *otto.Otto, req RPCCall, messageId string) {
if len(messageId) > 0 {
vm.Call("addContext", nil, messageId, MessageIdKey, messageId)
}
@ -141,41 +84,32 @@ func (q *JailedRequestQueue) PostProcessRequest(vm *otto.Otto, req RPCCall) {
}
}
func (q *JailedRequestQueue) saveRequestContext(vm *otto.Otto, ctx context.Context, req RPCCall) {
hash := hashFromRPCCall(req)
func (q *JailedRequestQueue) ProcessSendTransactionRequest(vm *otto.Otto, req RPCCall) (common.Hash, error) {
// obtain status backend from LES service
lightEthereum, err := GetNodeManager().LightEthereumService()
if err != nil {
return common.Hash{}, err
}
backend := lightEthereum.StatusBackend
if len(hash) == 0 { // no need to persist empty hash
return
messageId, err := q.PreProcessRequest(vm, req)
if err != nil {
return common.Hash{}, err
}
// onSendTransactionRequest() will use context to obtain and release ticket
ctx := context.Background()
ctx = context.WithValue(ctx, MessageIdKey, messageId)
// this call blocks, up until Complete Transaction is called
txHash, err := backend.SendTransaction(ctx, sendTxArgsFromRPCCall(req))
if err != nil {
return common.Hash{}, err
}
q.requests[hash] = &JailedRequest{
method: req.Method,
ctx: ctx,
vm: vm,
}
}
// invoke post processing
q.PostProcessRequest(vm, req, messageId)
func (q *JailedRequestQueue) GetQueuedTxContext(queuedTx *status.QueuedTx) context.Context {
hash := hashFromQueuedTx(queuedTx)
req, ok := q.requests[hash]
if ok {
return req.ctx
}
return context.Background()
}
func (q *JailedRequestQueue) PopQueuedTxContext(queuedTx *status.QueuedTx) context.Context {
hash := hashFromQueuedTx(queuedTx)
req, ok := q.requests[hash]
if ok {
delete(q.requests, hash)
return req.ctx
}
return context.Background()
return txHash, nil
}
// currentMessageId looks for `status.message_id` variable in current JS context
@ -193,22 +127,14 @@ func currentMessageId(ctx otto.Context) string {
return ""
}
type HashableSendRequest struct {
method string
from string
to string
value string
data string
}
func hashFromRPCCall(req RPCCall) string {
func sendTxArgsFromRPCCall(req RPCCall) status.SendTxArgs {
if req.Method != SendTransactionRequest { // no need to persist extra state for other requests
return ""
return status.SendTxArgs{}
}
params, ok := req.Params[0].(map[string]interface{})
if !ok {
return ""
return status.SendTxArgs{}
}
from, ok := params["from"].(string)
@ -221,9 +147,13 @@ func hashFromRPCCall(req RPCCall) string {
to = ""
}
value, ok := params["value"].(string)
param, ok := params["value"].(string)
if !ok {
value = ""
param = "0x0"
}
value, err := strconv.ParseInt(param, 0, 64)
if err != nil {
return status.SendTxArgs{}
}
data, ok := params["data"].(string)
@ -231,30 +161,11 @@ func hashFromRPCCall(req RPCCall) string {
data = ""
}
s := HashableSendRequest{
method: req.Method,
from: from,
to: to,
value: value,
data: data,
toAddress := common.HexToAddress(to)
return status.SendTxArgs{
From: common.HexToAddress(from),
To: &toAddress,
Value: rpc.NewHexNumber(big.NewInt(value)),
Data: data,
}
return fmt.Sprintf("%x", structhash.Sha1(s, 1))
}
func hashFromQueuedTx(queuedTx *status.QueuedTx) string {
value, err := queuedTx.Args.Value.MarshalJSON()
if err != nil {
return ""
}
s := HashableSendRequest{
method: SendTransactionRequest,
from: queuedTx.Args.From.Hex(),
to: queuedTx.Args.To.Hex(),
value: string(bytes.Replace(value, []byte(`"`), []byte(""), 2)),
data: queuedTx.Args.Data,
}
return fmt.Sprintf("%x", structhash.Sha1(s, 1))
}

View File

@ -45,7 +45,7 @@ func TestQueuedTransactions(t *testing.T) {
backend := lightEthereum.StatusBackend
// make sure you panic if transaction complete doesn't return
completeQueuedTransaction := make(chan bool, 1)
completeQueuedTransaction := make(chan struct{}, 1)
geth.PanicAfter(20*time.Second, completeQueuedTransaction, "TestQueuedTransactions")
// replace transaction notification handler
@ -67,7 +67,7 @@ func TestQueuedTransactions(t *testing.T) {
}
glog.V(logger.Info).Infof("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex())
completeQueuedTransaction <- true // so that timeout is aborted
completeQueuedTransaction <- struct{}{} // so that timeout is aborted
}
})

View File

@ -167,20 +167,12 @@ func PreprocessDataDir(dataDir string) (string, error) {
}
// PanicAfter throws panic() after waitSeconds, unless abort channel receives notification
func PanicAfter(waitSeconds time.Duration, abort chan bool, desc string) {
// panic if function takes too long
timeout := make(chan bool, 1)
go func() {
time.Sleep(waitSeconds)
timeout <- true
}()
func PanicAfter(waitSeconds time.Duration, abort chan struct{}, desc string) {
go func() {
select {
case <-abort:
return
case <-timeout:
case <-time.After(waitSeconds):
panic("whatever you were doing takes toooo long: " + desc)
}
}()

View File

@ -24,6 +24,7 @@ var (
)
type Jail struct {
sync.RWMutex
client *rpc.ClientRestartWrapper // lazy inited on the first call to jail.ClientRestartWrapper()
cells map[string]*JailedRuntime // jail supports running many isolated instances of jailed runtime
statusJS string
@ -73,6 +74,9 @@ func (jail *Jail) Parse(chatId string, js string) string {
return printError(ErrInvalidJail.Error())
}
jail.Lock()
defer jail.Unlock()
jail.cells[chatId] = NewJailedRuntime(chatId)
vm := jail.cells[chatId].vm
@ -109,12 +113,16 @@ func (jail *Jail) Call(chatId string, path string, args string) string {
return printError(err.Error())
}
jail.RLock()
cell, ok := jail.cells[chatId]
if !ok {
jail.RUnlock()
return printError(fmt.Sprintf("Cell[%s] doesn't exist.", chatId))
}
jail.RUnlock()
res, err := cell.vm.Call("call", nil, path, args)
vm := cell.vm.Copy() // isolate VM to allow concurrent access
res, err := vm.Call("call", nil, path, args)
return printResult(res.String(), err)
}
@ -124,6 +132,9 @@ func (jail *Jail) GetVM(chatId string) (*otto.Otto, error) {
return nil, ErrInvalidJail
}
jail.RLock()
defer jail.RUnlock()
cell, ok := jail.cells[chatId]
if !ok {
return nil, fmt.Errorf("Cell[%s] doesn't exist.", chatId)
@ -134,11 +145,6 @@ func (jail *Jail) GetVM(chatId string) (*otto.Otto, error) {
// Send will serialize the first argument, send it to the node and returns the response.
func (jail *Jail) Send(chatId string, call otto.FunctionCall) (response otto.Value) {
cell, ok := jail.cells[chatId]
if !ok {
throwJSException(fmt.Errorf("Cell[%s] doesn't exist.", chatId))
}
clientFactory, err := jail.ClientRestartWrapper()
if err != nil {
return newErrorResponse(call, -32603, err.Error(), nil)
@ -176,10 +182,21 @@ func (jail *Jail) Send(chatId string, call otto.FunctionCall) (response otto.Val
resp.Set("id", req.Id)
var result json.RawMessage
// execute directly w/o RPC call to node
if req.Method == geth.SendTransactionRequest {
txHash, err := requestQueue.ProcessSendTransactionRequest(call.Otto, req)
resp.Set("result", txHash.Hex())
if err != nil {
resp = newErrorResponse(call, -32603, err.Error(), &req.Id).Object()
}
resps.Call("push", resp)
continue
}
// do extra request pre processing (persist message id)
// within function semaphore will be acquired and released,
// so that no more than one client (per cell) can enter
err := requestQueue.PreProcessRequest(cell.sem, call.Otto, req)
messageId, err := requestQueue.PreProcessRequest(call.Otto, req)
if err != nil {
return newErrorResponse(call, -32603, err.Error(), nil)
}
@ -218,7 +235,7 @@ func (jail *Jail) Send(chatId string, call otto.FunctionCall) (response otto.Val
resps.Call("push", resp)
// do extra request post processing (setting back tx context)
requestQueue.PostProcessRequest(call.Otto, req)
requestQueue.PostProcessRequest(call.Otto, req, messageId)
}
// Return the responses either to the callback (if supplied)

View File

@ -214,10 +214,11 @@ func TestJailSendQueuedTransaction(t *testing.T) {
"value": "0.000001"
}`
transactionCompletedSuccessfully := make(chan bool)
txCompletedSuccessfully := make(chan struct{})
txCompletedCounter := make(chan struct{})
txHashes := make(chan common.Hash)
// replace transaction notification handler
var txHash = common.Hash{}
requireMessageId := false
geth.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
var envelope geth.GethEvent
@ -246,34 +247,38 @@ func TestJailSendQueuedTransaction(t *testing.T) {
t.Logf("Transaction queued (will be completed in 5 secs): {id: %s}\n", event["id"].(string))
time.Sleep(5 * time.Second)
var txHash common.Hash
if txHash, err = geth.CompleteTransaction(event["id"].(string), TEST_ADDRESS_PASSWORD); err != nil {
t.Errorf("cannot complete queued transation[%v]: %v", event["id"], err)
return
} else {
t.Logf("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex())
}
t.Logf("Transaction complete: https://testnet.etherscan.io/tx/%s", txHash.Hex())
transactionCompletedSuccessfully <- true // so that timeout is aborted
txCompletedSuccessfully <- struct{}{} // so that timeout is aborted
txHashes <- txHash
txCompletedCounter <- struct{}{}
}
})
type cmd struct {
type testCommand struct {
command string
params string
expectedResponse string
}
tests := []struct {
type testCase struct {
name string
file string
requireMessageId bool
commands []cmd
}{
commands []testCommand
}
tests := []testCase{
{
// no context or message id
name: "Case 1: no message id or context in inited JS",
file: "no-message-id-or-context.js",
requireMessageId: false,
commands: []cmd{
commands: []testCommand{
{
`["commands", "send"]`,
txParams,
@ -291,7 +296,7 @@ func TestJailSendQueuedTransaction(t *testing.T) {
name: "Case 2: context is present in inited JS (but no message id is there)",
file: "context-no-message-id.js",
requireMessageId: false,
commands: []cmd{
commands: []testCommand{
{
`["commands", "send"]`,
txParams,
@ -309,7 +314,7 @@ func TestJailSendQueuedTransaction(t *testing.T) {
name: "Case 3: message id is present, context is not present",
file: "message-id-no-context.js",
requireMessageId: true,
commands: []cmd{
commands: []testCommand{
{
`["commands", "send"]`,
txParams,
@ -327,7 +332,7 @@ func TestJailSendQueuedTransaction(t *testing.T) {
name: "Case 4: both message id and context are present",
file: "tx-send.js",
requireMessageId: true,
commands: []cmd{
commands: []testCommand{
{
`["commands", "send"]`,
txParams,
@ -336,29 +341,35 @@ func TestJailSendQueuedTransaction(t *testing.T) {
{
`["commands", "getBalance"]`,
`{"address": "` + TEST_ADDRESS + `"}`,
`{"result": {"context":{"message_id":"foobar"},"result":{"balance":42}}}`, // message id in context!
`{"result": {"context":{"message_id":"42"},"result":{"balance":42}}}`, // message id in context, but default one is used!
},
},
},
}
//var jailInstance *jail.Jail
for _, test := range tests {
jailInstance := jail.Init(geth.LoadFromFile(TESTDATA_TX_SEND_JS + test.file))
geth.PanicAfter(20*time.Second, transactionCompletedSuccessfully, test.name)
geth.PanicAfter(60*time.Second, txCompletedSuccessfully, test.name)
jailInstance.Parse(CHAT_ID_SEND, ``)
requireMessageId = test.requireMessageId
for _, cmd := range test.commands {
t.Logf("%s: %s", test.name, cmd.command)
response := jailInstance.Call(CHAT_ID_SEND, cmd.command, cmd.params)
expectedResponse := strings.Replace(cmd.expectedResponse, "TX_HASH", txHash.Hex(), 1)
if response != expectedResponse {
t.Errorf("expected response is not returned: expected %s, got %s", expectedResponse, response)
return
}
for _, command := range test.commands {
go func(jail *jail.Jail, test testCase, command testCommand) {
t.Logf("->%s: %s", test.name, command.command)
response := jail.Call(CHAT_ID_SEND, command.command, command.params)
var txHash common.Hash
if command.command == `["commands", "send"]` {
txHash = <-txHashes
}
expectedResponse := strings.Replace(command.expectedResponse, "TX_HASH", txHash.Hex(), 1)
if response != expectedResponse {
t.Errorf("expected response is not returned: expected %s, got %s", expectedResponse, response)
return
}
}(jailInstance, test, command)
}
<-txCompletedCounter
}
}