Implemented sendAsync for js commands with a callback (#321)

Geth js commands coming through jail with a callback will now be executed truly asynchronously blocking jail only when an actual interaction with VM is performed.

Technically, it registers a new handler jeth.sendAsync which executes functions with callbacks asynchronously.

Changes include:

1. Send and SendAsync now use cell.VM instead of otto.Otto providing proper locking.
2. Unmarshalling in ExecuionPolicy.ExecuteWithClient is now done into var result interface{} instead of var result json.RawMessage because test case 0 of TestJailWhisper failed providing byte codes instead of 5.0.
3. Due to the asynchronous nature of web3 calls new weird timeouts in tests have been introduced. They may fail sometimes but I gave up trying to implement a more reliable and readable solution.
This commit is contained in:
Ivan Tomilov 2017-09-18 15:13:32 +03:00 committed by GitHub
parent 79f744954c
commit 5f19c9cd0a
8 changed files with 200 additions and 147 deletions

View File

@ -47,7 +47,7 @@ statusgo-ios-simulator-mainnet: xgo
@echo "iOS framework cross compilation done (mainnet)."
ci: mock
build/env.sh go test -timeout 10m -v ./geth/api/...
build/env.sh go test -timeout 40m -v ./geth/api/...
build/env.sh go test -timeout 40m -v ./geth/common
build/env.sh go test -timeout 40m -v ./geth/jail
build/env.sh go test -timeout 40m -v ./geth/node

View File

@ -264,6 +264,9 @@ func (s *BackendTestSuite) TestContractDeployment() {
s.FailNow("test timed out")
}
// Wait until callback is fired and `responseValue` is set. Hacky but simple.
time.Sleep(2 * time.Second)
responseValue, err := cell.Get("responseValue")
require.NoError(err)
@ -723,14 +726,6 @@ func (s *BackendTestSuite) TestJailVMPersistence() {
event := envelope.Event.(map[string]interface{})
s.T().Logf("Transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string))
time.Sleep(1 * time.Second)
//if err := geth.DiscardTransaction(event["id"].(string)); err != nil {
// t.Errorf("cannot discard: %v", err)
// progress <- "tx discarded"
// return
//}
//var txHash common.Hash
txID := event["id"].(string)
txHash, err := s.backend.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password)
@ -767,6 +762,10 @@ func (s *BackendTestSuite) TestJailVMPersistence() {
s.FailNow("some tests failed to finish in time")
}
// Wait till eth_sendTransaction callbacks have been executed.
// FIXME(tiabc): more reliable means of testing that.
time.Sleep(5 * time.Second)
// Validate total.
cell, err := jailInstance.Cell(testChatID)
require.NoError(err)

View File

@ -2,12 +2,11 @@ package jail
import (
"context"
"encoding/json"
gethcommon "github.com/ethereum/go-ethereum/common"
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/robertkrimen/otto"
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/jail/internal/vm"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/rpc"
)
@ -32,26 +31,19 @@ func NewExecutionPolicy(
}
// Execute handles the execution of a RPC request and routes appropriately to either a local or remote ethereum node.
func (ep *ExecutionPolicy) Execute(req common.RPCCall, call otto.FunctionCall) (*otto.Object, error) {
func (ep *ExecutionPolicy) Execute(req common.RPCCall, vm *vm.VM) (map[string]interface{}, error) {
if params.SendTransactionMethodName == req.Method {
return ep.executeSendTransaction(req, call)
return ep.executeSendTransaction(vm, req)
}
client := ep.nodeManager.RPCClient()
return ep.executeWithClient(client, req, call)
return ep.executeWithClient(client, vm, req)
}
// executeRemoteSendTransaction defines a function to execute RPC method eth_sendTransaction over the upstream server.
func (ep *ExecutionPolicy) executeSendTransaction(req common.RPCCall, call otto.FunctionCall) (*otto.Object, error) {
res, err := call.Otto.Object(`({"jsonrpc":"2.0"})`)
if err != nil {
return nil, err
}
res.Set("id", req.ID)
messageID, err := preProcessRequest(call.Otto, req)
func (ep *ExecutionPolicy) executeSendTransaction(vm *vm.VM, req common.RPCCall) (map[string]interface{}, error) {
messageID, err := preProcessRequest(vm)
if err != nil {
return nil, err
}
@ -71,46 +63,48 @@ func (ep *ExecutionPolicy) executeSendTransaction(req common.RPCCall, call otto.
}
// invoke post processing
postProcessRequest(call.Otto, req, messageID)
postProcessRequest(vm, req, messageID)
res := map[string]interface{}{
"jsonrpc": "2.0",
"id": req.ID,
// @TODO(adam): which one is actually used?
res.Set("result", tx.Hash.Hex())
res.Set("hash", tx.Hash.Hex())
"result": tx.Hash.Hex(),
"hash": tx.Hash.Hex(),
}
return res, nil
}
func (ep *ExecutionPolicy) executeWithClient(client *rpc.Client, req common.RPCCall, call otto.FunctionCall) (*otto.Object, error) {
JSON, err := call.Otto.Object("JSON")
if err != nil {
return nil, err
func (ep *ExecutionPolicy) executeWithClient(client *rpc.Client, vm *vm.VM, req common.RPCCall) (map[string]interface{}, error) {
// Arbitrary JSON-RPC response.
var result interface{}
resp := map[string]interface{}{
"jsonrpc": "2.0",
"id": req.ID,
}
var result json.RawMessage
resp, _ := call.Otto.Object(`({"jsonrpc":"2.0"})`)
resp.Set("id", req.ID)
// do extra request pre processing (persist message id)
// within function semaphore will be acquired and released,
// so that no more than one client (per cell) can enter
messageID, err := preProcessRequest(call.Otto, req)
messageID, err := preProcessRequest(vm)
if err != nil {
return nil, common.StopRPCCallError{Err: err}
}
if client == nil {
resp = newErrorResponse(call.Otto, -32603, "RPC client is not available. Node is stopped?", &req.ID).Object()
resp = newErrorResponse("RPC client is not available. Node is stopped?", &req.ID)
} else {
err = client.Call(&result, req.Method, req.Params...)
if err != nil {
if err2, ok := err.(gethrpc.Error); ok {
resp.Set("error", map[string]interface{}{
resp["error"] = map[string]interface{}{
"code": err2.ErrorCode(),
"message": err2.Error(),
})
}
} else {
resp = newErrorResponse(call.Otto, -32603, err.Error(), &req.ID).Object()
resp = newErrorResponse(err.Error(), &req.ID)
}
}
}
@ -118,31 +112,26 @@ func (ep *ExecutionPolicy) executeWithClient(client *rpc.Client, req common.RPCC
if result == nil {
// Special case null because it is decoded as an empty
// raw message for some reason.
resp.Set("result", otto.NullValue())
resp["result"] = ""
} else {
resultVal, callErr := JSON.Call("parse", string(result))
if callErr != nil {
resp = newErrorResponse(call.Otto, -32603, callErr.Error(), &req.ID).Object()
} else {
resp.Set("result", resultVal)
}
resp["result"] = result
}
// do extra request post processing (setting back tx context)
postProcessRequest(call.Otto, req, messageID)
postProcessRequest(vm, req, messageID)
return resp, nil
}
// preProcessRequest pre-processes a given RPC call to a given Otto VM
func preProcessRequest(vm *otto.Otto, req common.RPCCall) (string, error) {
messageID := currentMessageID(vm.Context())
func preProcessRequest(vm *vm.VM) (string, error) {
messageID := currentMessageID(vm)
return messageID, nil
}
// postProcessRequest post-processes a given RPC call to a given Otto VM
func postProcessRequest(vm *otto.Otto, req common.RPCCall, messageID string) {
func postProcessRequest(vm *vm.VM, req common.RPCCall, messageID string) {
if len(messageID) > 0 {
vm.Call("addContext", nil, messageID, common.MessageIDKey, messageID) // nolint: errcheck
}
@ -154,18 +143,12 @@ func postProcessRequest(vm *otto.Otto, req common.RPCCall, messageID string) {
}
// currentMessageID looks for `status.message_id` variable in current JS context
func currentMessageID(ctx otto.Context) string {
if statusObj, ok := ctx.Symbols["status"]; ok {
messageID, err := statusObj.Object().Get("message_id")
func currentMessageID(vm *vm.VM) string {
msgID, err := vm.Run("status.message_id")
if err != nil {
return ""
}
if messageID, err := messageID.ToString(); err == nil {
return messageID
}
}
return ""
return msgID.String()
}
func sendTxArgsFromRPCCall(req common.RPCCall) common.SendTxArgs {

View File

@ -36,17 +36,17 @@ func registerHandlers(jail *Jail, cell common.JailCell, chatID string) error {
}
// register send handler
if err = registerHandler("send", makeSendHandler(jail)); err != nil {
if err = registerHandler("send", makeSendHandler(jail, cell)); err != nil {
return err
}
// register sendAsync handler
if err = registerHandler("sendAsync", makeSendHandler(jail)); err != nil {
if err = registerHandler("sendAsync", makeAsyncSendHandler(jail, cell)); err != nil {
return err
}
// register isConnected handler
if err = registerHandler("isConnected", makeJethIsConnectedHandler(jail)); err != nil {
if err = registerHandler("isConnected", makeJethIsConnectedHandler(jail, cell)); err != nil {
return err
}
@ -66,23 +66,54 @@ func registerHandlers(jail *Jail, cell common.JailCell, chatID string) error {
return nil
}
// makeAsyncSendHandler returns jeth.sendAsync() handler.
func makeAsyncSendHandler(jail *Jail, cellInt common.JailCell) func(call otto.FunctionCall) otto.Value {
// FIXME(tiabc): Get rid of this.
cell := cellInt.(*Cell)
return func(call otto.FunctionCall) otto.Value {
go func() {
response := jail.Send(call, cell.VM)
if fn := call.Argument(1); fn.Class() == "Function" {
cell.Lock()
fn.Call(otto.NullValue(), otto.NullValue(), response)
cell.Unlock()
}
}()
return otto.UndefinedValue()
}
}
// makeSendHandler returns jeth.send() and jeth.sendAsync() handler
func makeSendHandler(jail *Jail) func(call otto.FunctionCall) (response otto.Value) {
return jail.Send
func makeSendHandler(jail *Jail, cellInt common.JailCell) func(call otto.FunctionCall) otto.Value {
// FIXME(tiabc): Get rid of this.
cell := cellInt.(*Cell)
return func(call otto.FunctionCall) otto.Value {
// Send calls are guaranteed to be only invoked from web3 after calling the appropriate
// method of jail.Cell and the cell is locked during that call. In order to allow jail.Send
// to perform any operations on cell.VM and not hang, we need to unlock the mutex and return
// it to the previous state afterwards so that the caller didn't panic doing cell.Unlock().
cell.Unlock()
defer cell.Lock()
return jail.Send(call, cell.VM)
}
}
// makeJethIsConnectedHandler returns jeth.isConnected() handler
func makeJethIsConnectedHandler(jail *Jail) func(call otto.FunctionCall) (response otto.Value) {
func makeJethIsConnectedHandler(jail *Jail, cellInt common.JailCell) func(call otto.FunctionCall) (response otto.Value) {
// FIXME(tiabc): Get rid of this.
cell := cellInt.(*Cell)
return func(call otto.FunctionCall) otto.Value {
client := jail.nodeManager.RPCClient()
var netListeningResult bool
if err := client.Call(&netListeningResult, "net_listening"); err != nil {
return newErrorResponse(call.Otto, -32603, err.Error(), nil)
return newErrorResponseOtto(cell.VM, err.Error(), nil)
}
if !netListeningResult {
return newErrorResponse(call.Otto, -32603, node.ErrNoRunningNode.Error(), nil)
return newErrorResponseOtto(cell.VM, node.ErrNoRunningNode.Error(), nil)
}
return newResultResponse(call.Otto, true)

View File

@ -8,6 +8,7 @@ import (
"github.com/robertkrimen/otto"
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/jail/internal/vm"
"github.com/status-im/status-go/geth/log"
"github.com/status-im/status-go/static"
)
@ -38,6 +39,9 @@ type Jail struct {
func New(
nodeManager common.NodeManager, accountManager common.AccountManager, txQueueManager common.TxQueueManager,
) *Jail {
if nodeManager == nil || accountManager == nil || txQueueManager == nil {
panic("Jail is missing mandatory dependencies")
}
return &Jail{
nodeManager: nodeManager,
accountManager: accountManager,
@ -148,13 +152,12 @@ func (jail *Jail) Call(chatID, this, args string) string {
}
// Send will serialize the first argument, send it to the node and returns the response.
// IMPORTANT: Don't use `call.Otto` in this function unless you want to run into race conditions. Use `vm` instead.
// nolint: errcheck, unparam
func (jail *Jail) Send(call otto.FunctionCall) (response otto.Value) {
// Remarshal the request into a Go value.
JSON, _ := call.Otto.Object("JSON")
reqVal, err := JSON.Call("stringify", call.Argument(0))
func (jail *Jail) Send(call otto.FunctionCall, vm *vm.VM) otto.Value {
reqVal, err := vm.Call("JSON.stringify", nil, call.Argument(0))
if err != nil {
throwJSException(err.Error())
throwJSException(err)
}
var (
@ -165,61 +168,70 @@ func (jail *Jail) Send(call otto.FunctionCall) (response otto.Value) {
if rawReq[0] == '[' {
batch = true
json.Unmarshal(rawReq, &reqs)
err = json.Unmarshal(rawReq, &reqs)
} else {
batch = false
reqs = make([]common.RPCCall, 1)
json.Unmarshal(rawReq, &reqs[0])
err = json.Unmarshal(rawReq, &reqs[0])
}
if err != nil {
throwJSException(fmt.Errorf("can't unmarshal %v (batch=%v): %s", string(rawReq), batch, err))
}
resps, _ := call.Otto.Object("new Array()")
resps, err := vm.Call("new Array", nil)
if err != nil {
throwJSException(fmt.Errorf("can't create Array: %s", err))
}
// Execute the requests.
for _, req := range reqs {
log.Info("execute request", "method", req.Method)
res, err := jail.policy.Execute(req, call)
res, err := jail.policy.Execute(req, vm)
if err != nil {
log.Info("request errored", "error", err.Error())
switch err.(type) {
case common.StopRPCCallError:
return newErrorResponse(call.Otto, -32603, err.Error(), nil)
return newErrorResponseOtto(vm, err.Error(), nil)
default:
res = newErrorResponse(call.Otto, -32603, err.Error(), &req.ID).Object()
res = newErrorResponse(err.Error(), &req.ID)
}
}
resps.Call("push", res)
_, err = resps.Object().Call("push", res)
if err != nil {
throwJSException(fmt.Errorf("can't push result: %s", err))
}
}
// Return the responses either to the callback (if supplied)
// or directly as the return value.
if batch {
response = resps.Value()
} else {
response, _ = resps.Get("0")
return resps
}
v, err := resps.Object().Get("0")
if err != nil {
throwJSException(err)
}
return v
}
if fn := call.Argument(1); fn.Class() == "Function" {
fn.Call(otto.NullValue(), otto.NullValue(), response)
return otto.UndefinedValue()
}
return response
}
func newErrorResponse(vm *otto.Otto, code int, msg string, id interface{}) otto.Value {
func newErrorResponse(msg string, id interface{}) map[string]interface{} {
// Bundle the error into a JSON RPC call response
m := map[string]interface{}{
return map[string]interface{}{
"jsonrpc": "2.0",
"id": id,
"error": map[string]interface{}{
"code": code,
"code": -32603, // Internal JSON-RPC Error, see http://www.jsonrpc.org/specification#error_object
"message": msg,
},
}
res, _ := json.Marshal(m)
val, _ := vm.Run("(" + string(res) + ")")
return val
}
func newErrorResponseOtto(vm *vm.VM, msg string, id interface{}) otto.Value {
// TODO(tiabc): Handle errors.
errResp, _ := json.Marshal(newErrorResponse(msg, id))
errRespVal, _ := vm.Run("(" + string(errResp) + ")")
return errRespVal
}
func newResultResponse(vm *otto.Otto, result interface{}) otto.Value {
@ -231,10 +243,10 @@ func newResultResponse(vm *otto.Otto, result interface{}) otto.Value {
// throwJSException panics on an otto.Value. The Otto VM will recover from the
// Go panic and throw msg as a JavaScript error.
func throwJSException(msg interface{}) otto.Value {
val, err := otto.ToValue(msg)
func throwJSException(msg error) otto.Value {
val, err := otto.ToValue(msg.Error())
if err != nil {
log.Error(fmt.Sprintf("Failed to serialize JavaScript exception %v: %v", msg, err))
log.Error(fmt.Sprintf("Failed to serialize JavaScript exception %v: %v", msg.Error(), err))
}
panic(val)
}

View File

@ -3,6 +3,7 @@ package jail_test
import (
"encoding/json"
"errors"
"sync"
"testing"
"time"
@ -18,7 +19,10 @@ const (
testChatID = "testChat"
)
var baseStatusJSCode = string(static.MustAsset("testdata/jail/status.js"))
var (
baseStatusJSCode = string(static.MustAsset("testdata/jail/status.js"))
txJSCode = string(static.MustAsset("testdata/jail/tx-send/tx-send.js"))
)
func TestJailTestSuite(t *testing.T) {
suite.Run(t, new(JailTestSuite))
@ -38,7 +42,9 @@ func (s *JailTestSuite) SetupTest() {
accountManager := node.NewAccountManager(nodeManager)
require.NotNil(accountManager)
jail := jail.New(nodeManager, accountManager, nil)
txQueueManager := node.NewTxQueueManager(nodeManager, accountManager)
jail := jail.New(nodeManager, accountManager, txQueueManager)
require.NotNil(jail)
s.jail = jail
@ -119,6 +125,38 @@ func (s *JailTestSuite) TestFunctionCall() {
require.Equal(expectedResponse, response)
}
// TestJailRPCAsyncSend was written to catch race conditions with a weird error message
// starting from `ReferenceError` as if otto vm were losing symbols.
func (s *JailTestSuite) TestJailRPCAsyncSend() {
require := s.Require()
// load Status JS and add test command to it
s.jail.BaseJS(baseStatusJSCode)
s.jail.Parse(testChatID, txJSCode)
cell, err := s.jail.Cell(testChatID)
require.NoError(err)
require.NotNil(cell)
// internally (since we replaced `web3.send` with `jail.Send`)
// all requests to web3 are forwarded to `jail.Send`
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err = cell.Run(`_status_catalog.commands.sendAsync({
"from": "` + TestConfig.Account1.Address + `",
"to": "` + TestConfig.Account2.Address + `",
"value": "0.000001"
})`)
require.NoError(err, "Request failed to process")
}()
}
wg.Wait()
}
func (s *JailTestSuite) TestJailRPCSend() {
require := s.Require()

File diff suppressed because one or more lines are too long

View File

@ -54,6 +54,18 @@ function call(pathStr, paramsStr) {
return JSON.stringify(res);
}
function sendAsyncTransaction(params) {
var data = {
from: params.from,
to: params.to,
value: web3.toWei(params.value, "ether")
};
web3.eth.sendTransaction(data, function(){
// A specified callback is enough to make this function async.
});
}
function sendTransaction(params) {
var data = {
from: params.from,
@ -72,9 +84,10 @@ function sendTransaction(params) {
var hash = web3.eth.sendTransaction(data);
return {"transaction-hash": hash};
}
};
_status_catalog.commands['send'] = sendTransaction;
_status_catalog.commands['sendAsync'] = sendAsyncTransaction;
_status_catalog.commands['getBalance'] = function (params) {
var balance = web3.eth.getBalance(params.address);
balance = web3.fromWei(balance, "ether");