Refactor and cleanup Jail (#368)

Refactor and clean up Jail package:

 Removes account.Manager and txqueue.Manager from Jail as they are not used anymore
 Removes messageID related code from Jail.Send
 Simplifies Jail.Send to be a wrapper around RPC client's CallRaw
 Renames jail_cell* to cell*
 Related cleanups
This commit is contained in:
Ivan Daniluk 2017-10-06 18:52:26 +02:00 committed by Ivan Tomilov
parent 8623b52873
commit 3540972f0e
14 changed files with 309 additions and 411 deletions

View File

@ -33,11 +33,12 @@ func NewStatusBackend() *StatusBackend {
nodeManager := node.NewNodeManager() nodeManager := node.NewNodeManager()
accountManager := account.NewManager(nodeManager) accountManager := account.NewManager(nodeManager)
txQueueManager := txqueue.NewManager(nodeManager, accountManager) txQueueManager := txqueue.NewManager(nodeManager, accountManager)
jailManager := jail.New(nodeManager)
return &StatusBackend{ return &StatusBackend{
nodeManager: nodeManager, nodeManager: nodeManager,
accountManager: accountManager, accountManager: accountManager,
jailManager: jail.New(nodeManager, accountManager, txQueueManager), jailManager: jailManager,
txQueueManager: txQueueManager, txQueueManager: txQueueManager,
} }
} }
@ -123,6 +124,7 @@ func (m *StatusBackend) StopNode() (<-chan struct{}, error) {
} }
m.txQueueManager.Stop() m.txQueueManager.Stop()
m.jailManager.Stop()
backendStopped := make(chan struct{}, 1) backendStopped := make(chan struct{}, 1)
go func() { go func() {

View File

@ -13,7 +13,6 @@ import (
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5" whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
"github.com/status-im/status-go/geth/common" "github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/log"
"github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal" "github.com/status-im/status-go/geth/signal"
. "github.com/status-im/status-go/geth/testing" . "github.com/status-im/status-go/geth/testing"
@ -35,166 +34,6 @@ var (
baseStatusJSCode = string(static.MustAsset("testdata/jail/status.js")) baseStatusJSCode = string(static.MustAsset("testdata/jail/status.js"))
) )
func (s *BackendTestSuite) TestJailSendQueuedTransaction() {
require := s.Require()
s.StartTestBackend(params.RopstenNetworkID)
defer s.StopTestBackend()
time.Sleep(TestConfig.Node.SyncSeconds * time.Second) // allow to sync
// log into account from which transactions will be sent
require.NoError(s.backend.AccountManager().SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password))
txParams := `{
"from": "` + TestConfig.Account1.Address + `",
"to": "` + TestConfig.Account2.Address + `",
"value": "0.000001"
}`
txHashes := make(chan gethcommon.Hash, 1)
// replace transaction notification handler
requireMessageId := false
signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
var envelope signal.Envelope
err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
if envelope.Type == txqueue.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
messageId, ok := event["message_id"].(string)
s.True(ok, "Message id is required, but not found")
if requireMessageId {
require.NotEmpty(messageId, "Message id is required, but not provided")
} else {
require.Empty(messageId, "Message id is not required, but provided")
}
txID := event["id"].(string)
txHash, err := s.backend.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password)
require.NoError(err, "cannot complete queued transaction[%v]", event["id"])
log.Info("Transaction complete", "URL", "https://ropsten.etherscan.io/tx/%s"+txHash.Hex())
txHashes <- txHash
}
})
type testCommand struct {
command string
params string
expectedResponse string
}
type testCase struct {
name string
file string
requireMessageId bool
commands []testCommand
}
tests := []testCase{
{
// no context or message id
name: "Case 1: no message id or context in inited JS",
file: "no-message-id-or-context.js",
requireMessageId: false,
commands: []testCommand{
{
`["commands", "send"]`,
txParams,
`{"result": {"transaction-hash":"TX_HASH"}}`,
},
{
`["commands", "getBalance"]`,
`{"address": "` + TestConfig.Account1.Address + `"}`,
`{"result": {"balance":42}}`,
},
},
},
{
// context is present in inited JS (but no message id is there)
name: "Case 2: context is present in inited JS (but no message id is there)",
file: "context-no-message-id.js",
requireMessageId: false,
commands: []testCommand{
{
`["commands", "send"]`,
txParams,
`{"result": {"context":{"` + params.SendTransactionMethodName + `":true},"result":{"transaction-hash":"TX_HASH"}}}`,
},
{
`["commands", "getBalance"]`,
`{"address": "` + TestConfig.Account1.Address + `"}`,
`{"result": {"context":{},"result":{"balance":42}}}`, // note empty (but present) context!
},
},
},
{
// message id is present in inited JS, but no context is there
name: "Case 3: message id is present, context is not present",
file: "message-id-no-context.js",
requireMessageId: true,
commands: []testCommand{
{
`["commands", "send"]`,
txParams,
`{"result": {"transaction-hash":"TX_HASH"}}`,
},
{
`["commands", "getBalance"]`,
`{"address": "` + TestConfig.Account1.Address + `"}`,
`{"result": {"balance":42}}`, // note empty context!
},
},
},
{
// both message id and context are present in inited JS (this UC is what we normally expect to see)
name: "Case 4: both message id and context are present",
file: "tx-send.js",
requireMessageId: true,
commands: []testCommand{
{
`["commands", "getBalance"]`,
`{"address": "` + TestConfig.Account1.Address + `"}`,
`{"result": {"context":{"message_id":"42"},"result":{"balance":42}}}`, // message id in context, but default one is used!
},
{
`["commands", "send"]`,
txParams,
`{"result": {"context":{"eth_sendTransaction":true,"message_id":"foobar"},"result":{"transaction-hash":"TX_HASH"}}}`,
},
},
},
}
jailInstance := s.backend.JailManager()
for _, test := range tests {
jailInstance.BaseJS(string(static.MustAsset(txSendFolder + test.file)))
jailInstance.Parse(testChatID, ``)
// used by notification handler
requireMessageId = test.requireMessageId
for _, command := range test.commands {
s.T().Logf("%s: %s", test.name, command.command)
response := jailInstance.Call(testChatID, command.command, command.params)
var txHash gethcommon.Hash
if command.command == `["commands", "send"]` {
select {
case txHash = <-txHashes:
case <-time.After(time.Minute):
s.FailNow("test timed out: %s", test.name)
}
}
expectedResponse := strings.Replace(command.expectedResponse, "TX_HASH", txHash.Hex(), 1)
require.Equal(expectedResponse, response)
}
}
}
func (s *BackendTestSuite) TestContractDeployment() { func (s *BackendTestSuite) TestContractDeployment() {
require := s.Require() require := s.Require()
@ -241,6 +80,7 @@ func (s *BackendTestSuite) TestContractDeployment() {
_, err = cell.Run(` _, err = cell.Run(`
var responseValue = null; var responseValue = null;
var errorValue = null;
var testContract = web3.eth.contract([{"constant":true,"inputs":[{"name":"a","type":"int256"}],"name":"double","outputs":[{"name":"","type":"int256"}],"payable":false,"type":"function"}]); var testContract = web3.eth.contract([{"constant":true,"inputs":[{"name":"a","type":"int256"}],"name":"double","outputs":[{"name":"","type":"int256"}],"payable":false,"type":"function"}]);
var test = testContract.new( var test = testContract.new(
{ {
@ -249,12 +89,14 @@ func (s *BackendTestSuite) TestContractDeployment() {
gas: '` + strconv.Itoa(params.DefaultGas) + `' gas: '` + strconv.Itoa(params.DefaultGas) + `'
}, function (e, contract) { }, function (e, contract) {
// NOTE: The callback will fire twice! // NOTE: The callback will fire twice!
if (e) {
errorValue = e;
return
}
// Once the contract has the transactionHash property set and once its deployed on an address. // Once the contract has the transactionHash property set and once its deployed on an address.
if (!e) {
if (!contract.address) { if (!contract.address) {
responseValue = contract.transactionHash; responseValue = contract.transactionHash;
} }
}
}) })
`) `)
require.NoError(err) require.NoError(err)
@ -268,6 +110,10 @@ func (s *BackendTestSuite) TestContractDeployment() {
// Wait until callback is fired and `responseValue` is set. Hacky but simple. // Wait until callback is fired and `responseValue` is set. Hacky but simple.
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
errorValue, err := cell.Get("errorValue")
require.NoError(err)
require.Equal("null", errorValue.String())
responseValue, err := cell.Get("responseValue") responseValue, err := cell.Get("responseValue")
require.NoError(err) require.NoError(err)
@ -276,7 +122,6 @@ func (s *BackendTestSuite) TestContractDeployment() {
expectedResponse := txHash.Hex() expectedResponse := txHash.Hex()
require.Equal(expectedResponse, response) require.Equal(expectedResponse, response)
s.T().Logf("estimation complete: %s", response)
} }
func (s *BackendTestSuite) TestJailWhisper() { func (s *BackendTestSuite) TestJailWhisper() {

View File

@ -256,6 +256,8 @@ type JailCell interface {
Run(interface{}) (otto.Value, error) Run(interface{}) (otto.Value, error)
// Call an arbitrary JS function by name and args. // Call an arbitrary JS function by name and args.
Call(item string, this interface{}, args ...interface{}) (otto.Value, error) Call(item string, this interface{}, args ...interface{}) (otto.Value, error)
// Stop stops background execution of cell.
Stop()
} }
// JailManager defines methods for managing jailed environments // JailManager defines methods for managing jailed environments
@ -275,6 +277,9 @@ type JailManager interface {
// BaseJS allows to setup initial JavaScript to be loaded on each jail.Parse() // BaseJS allows to setup initial JavaScript to be loaded on each jail.Parse()
BaseJS(js string) BaseJS(js string)
// Stop stops all background activity of jail
Stop()
} }
// APIResponse generic response from API // APIResponse generic response from API

View File

@ -587,6 +587,24 @@ func (mr *MockTxQueueManagerMockRecorder) SetTransactionReturnHandler(fn interfa
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTransactionReturnHandler", reflect.TypeOf((*MockTxQueueManager)(nil).SetTransactionReturnHandler), fn) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTransactionReturnHandler", reflect.TypeOf((*MockTxQueueManager)(nil).SetTransactionReturnHandler), fn)
} }
// SendTransactionRPCHandler mocks base method
func (m *MockTxQueueManager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) {
varargs := []interface{}{ctx}
for _, a := range args {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "SendTransactionRPCHandler", varargs...)
ret0, _ := ret[0].(interface{})
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SendTransactionRPCHandler indicates an expected call of SendTransactionRPCHandler
func (mr *MockTxQueueManagerMockRecorder) SendTransactionRPCHandler(ctx interface{}, args ...interface{}) *gomock.Call {
varargs := append([]interface{}{ctx}, args...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendTransactionRPCHandler", reflect.TypeOf((*MockTxQueueManager)(nil).SendTransactionRPCHandler), varargs...)
}
// TransactionReturnHandler mocks base method // TransactionReturnHandler mocks base method
func (m *MockTxQueueManager) TransactionReturnHandler() func(*QueuedTx, error) { func (m *MockTxQueueManager) TransactionReturnHandler() func(*QueuedTx, error) {
ret := m.ctrl.Call(m, "TransactionReturnHandler") ret := m.ctrl.Call(m, "TransactionReturnHandler")
@ -727,6 +745,16 @@ func (mr *MockJailCellMockRecorder) Call(item, this interface{}, args ...interfa
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Call", reflect.TypeOf((*MockJailCell)(nil).Call), varargs...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Call", reflect.TypeOf((*MockJailCell)(nil).Call), varargs...)
} }
// Stop mocks base method
func (m *MockJailCell) Stop() {
m.ctrl.Call(m, "Stop")
}
// Stop indicates an expected call of Stop
func (mr *MockJailCellMockRecorder) Stop() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockJailCell)(nil).Stop))
}
// MockJailManager is a mock of JailManager interface // MockJailManager is a mock of JailManager interface
type MockJailManager struct { type MockJailManager struct {
ctrl *gomock.Controller ctrl *gomock.Controller
@ -809,3 +837,13 @@ func (m *MockJailManager) BaseJS(js string) {
func (mr *MockJailManagerMockRecorder) BaseJS(js interface{}) *gomock.Call { func (mr *MockJailManagerMockRecorder) BaseJS(js interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BaseJS", reflect.TypeOf((*MockJailManager)(nil).BaseJS), js) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BaseJS", reflect.TypeOf((*MockJailManager)(nil).BaseJS), js)
} }
// Stop mocks base method
func (m *MockJailManager) Stop() {
m.ctrl.Call(m, "Stop")
}
// Stop indicates an expected call of Stop
func (mr *MockJailManagerMockRecorder) Stop() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockJailManager)(nil).Stop))
}

View File

@ -1,17 +1,22 @@
package jail package jail
import ( import (
"context"
"github.com/robertkrimen/otto" "github.com/robertkrimen/otto"
"github.com/status-im/status-go/geth/jail/internal/fetch" "github.com/status-im/status-go/geth/jail/internal/fetch"
"github.com/status-im/status-go/geth/jail/internal/loop" "github.com/status-im/status-go/geth/jail/internal/loop"
"github.com/status-im/status-go/geth/jail/internal/loop/looptask"
"github.com/status-im/status-go/geth/jail/internal/timers" "github.com/status-im/status-go/geth/jail/internal/timers"
"github.com/status-im/status-go/geth/jail/internal/vm" "github.com/status-im/status-go/geth/jail/internal/vm"
) )
// Cell represents a single jail cell, which is basically a JavaScript VM. // Cell represents a single jail cell, which is basically a JavaScript VM.
type Cell struct { type Cell struct {
id string
*vm.VM *vm.VM
id string
cancel context.CancelFunc
lo *loop.Loop
} }
// newCell encapsulates what we need to create a new jailCell from the // newCell encapsulates what we need to create a new jailCell from the
@ -23,13 +28,16 @@ func newCell(id string, ottoVM *otto.Otto) (*Cell, error) {
registerVMHandlers(cellVM, lo) registerVMHandlers(cellVM, lo)
// start loop in a goroutine ctx, cancel := context.WithCancel(context.Background())
// Cell is currently immortal, so the loop
go lo.Run() // start event loop in background
go lo.Run(ctx)
return &Cell{ return &Cell{
id: id,
VM: cellVM, VM: cellVM,
id: id,
cancel: cancel,
lo: lo,
}, nil }, nil
} }
@ -48,3 +56,19 @@ func registerVMHandlers(v *vm.VM, lo *loop.Loop) error {
return nil return nil
} }
// Stop halts event loop associated with cell.
func (c *Cell) Stop() {
c.cancel()
}
// CallAsync puts otto's function with given args into
// event queue loop and schedules for immediate execution.
// Intended to be used by any cell user that want's to run
// async call, like callback.
func (c *Cell) CallAsync(fn otto.Value, args ...interface{}) {
task := looptask.NewCallTask(fn, args...)
c.lo.Add(task)
// TODO(divan): review API of `loop` package, it's contrintuitive
go c.lo.Ready(task)
}

View File

@ -14,6 +14,7 @@ func (s *JailTestSuite) TestJailTimeoutFailure() {
cell, err := s.jail.NewCell(testChatID) cell, err := s.jail.NewCell(testChatID)
require.NoError(err) require.NoError(err)
require.NotNil(cell) require.NotNil(cell)
defer cell.Stop()
// Attempt to run a timeout string against a Cell. // Attempt to run a timeout string against a Cell.
_, err = cell.Run(` _, err = cell.Run(`
@ -44,6 +45,7 @@ func (s *JailTestSuite) TestJailTimeout() {
cell, err := s.jail.NewCell(testChatID) cell, err := s.jail.NewCell(testChatID)
require.NoError(err) require.NoError(err)
require.NotNil(cell) require.NotNil(cell)
defer cell.Stop()
// Attempt to run a timeout string against a Cell. // Attempt to run a timeout string against a Cell.
_, err = cell.Run(` _, err = cell.Run(`
@ -78,6 +80,7 @@ func (s *JailTestSuite) TestJailLoopInCall() {
cell, err := s.jail.Cell(testChatID) cell, err := s.jail.Cell(testChatID)
require.NoError(err) require.NoError(err)
require.NotNil(cell) require.NotNil(cell)
defer cell.Stop()
items := make(chan string) items := make(chan string)
@ -116,6 +119,7 @@ func (s *JailTestSuite) TestJailLoopRace() {
cell, err := s.jail.NewCell(testChatID) cell, err := s.jail.NewCell(testChatID)
require.NoError(err) require.NoError(err)
require.NotNil(cell) require.NotNil(cell)
defer cell.Stop()
items := make(chan struct{}) items := make(chan struct{})
@ -161,6 +165,7 @@ func (s *JailTestSuite) TestJailFetchPromise() {
cell, err := s.jail.NewCell(testChatID) cell, err := s.jail.NewCell(testChatID)
require.NoError(err) require.NoError(err)
require.NotNil(cell) require.NotNil(cell)
defer cell.Stop()
dataCh := make(chan otto.Value, 1) dataCh := make(chan otto.Value, 1)
errCh := make(chan otto.Value, 1) errCh := make(chan otto.Value, 1)
@ -197,6 +202,7 @@ func (s *JailTestSuite) TestJailFetchCatch() {
cell, err := s.jail.NewCell(testChatID) cell, err := s.jail.NewCell(testChatID)
require.NoError(err) require.NoError(err)
require.NotNil(cell) require.NotNil(cell)
defer cell.Stop()
dataCh := make(chan otto.Value, 1) dataCh := make(chan otto.Value, 1)
errCh := make(chan otto.Value, 1) errCh := make(chan otto.Value, 1)
@ -245,6 +251,7 @@ func (s *JailTestSuite) TestJailFetchRace() {
cell, err := s.jail.NewCell(testChatID) cell, err := s.jail.NewCell(testChatID)
require.NoError(err) require.NoError(err)
require.NotNil(cell) require.NotNil(cell)
defer cell.Stop()
dataCh := make(chan otto.Value, 1) dataCh := make(chan otto.Value, 1)
errCh := make(chan otto.Value, 1) errCh := make(chan otto.Value, 1)
@ -292,3 +299,54 @@ func (s *JailTestSuite) TestJailFetchRace() {
} }
} }
} }
// TestJailLoopCancel tests that cell.Stop() really cancels event
// loop and pending tasks.
func (s *JailTestSuite) TestJailLoopCancel() {
require := s.Require()
// load Status JS and add test command to it
s.jail.BaseJS(baseStatusJSCode)
s.jail.Parse(testChatID, ``)
cell, err := s.jail.Cell(testChatID)
require.NoError(err)
require.NotNil(cell)
var count int
err = cell.Set("__captureResponse", func(val string) otto.Value {
count++
return otto.UndefinedValue()
})
require.NoError(err)
_, err = cell.Run(`
function callRunner(val, delay){
return setTimeout(function(){
__captureResponse(val);
}, delay);
}
`)
require.NoError(err)
// Run 5 timeout tasks to be executed in: 1, 2, 3, 4 and 5 secs
for i := 1; i <= 5; i++ {
_, err = cell.Call("callRunner", nil, "value", i*1000)
require.NoError(err)
}
// Wait 1.5 second (so only one task executed) so far
// and stop the cell (event loop should die)
time.Sleep(1500 * time.Millisecond)
cell.Stop()
// check that only 1 task has increased counter
require.Equal(1, count)
// wait 2 seconds more (so at least two more tasks would
// have been executed if event loop is still running)
<-time.After(2 * time.Second)
// check that counter hasn't increased
require.Equal(1, count)
}

View File

@ -9,7 +9,7 @@ import (
"github.com/robertkrimen/otto" "github.com/robertkrimen/otto"
"github.com/status-im/status-go/geth/jail/console" "github.com/status-im/status-go/geth/jail/console"
"github.com/status-im/status-go/geth/node" "github.com/status-im/status-go/geth/signal"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
) )
@ -58,8 +58,7 @@ func (s *ConsoleTestSuite) TestObjectLogging() {
var customWriter bytes.Buffer var customWriter bytes.Buffer
node.SetDefaultNodeNotificationHandler(func(event string) { signal.SetDefaultNodeNotificationHandler(func(event string) {
var eventReceived struct { var eventReceived struct {
Type string `json:"type"` Type string `json:"type"`
Event []struct { Event []struct {

View File

@ -1,113 +0,0 @@
package jail
import (
"context"
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/jail/internal/vm"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/rpc"
)
// ExecutionPolicy provides a central container for the executions of RPCCall requests for both
// remote/upstream processing and internal node processing.
type ExecutionPolicy struct {
nodeManager common.NodeManager
accountManager common.AccountManager
txQueueManager common.TxQueueManager
}
// NewExecutionPolicy returns a new instance of ExecutionPolicy.
func NewExecutionPolicy(
nodeManager common.NodeManager, accountManager common.AccountManager, txQueueManager common.TxQueueManager,
) *ExecutionPolicy {
return &ExecutionPolicy{
nodeManager: nodeManager,
accountManager: accountManager,
txQueueManager: txQueueManager,
}
}
// Execute handles the execution of a RPC request and routes appropriately to either a local or remote ethereum node.
func (ep *ExecutionPolicy) Execute(req common.RPCCall, vm *vm.VM) (map[string]interface{}, error) {
client := ep.nodeManager.RPCClient()
return ep.executeWithClient(client, vm, req)
}
func (ep *ExecutionPolicy) executeWithClient(client *rpc.Client, vm *vm.VM, req common.RPCCall) (map[string]interface{}, error) {
// Arbitrary JSON-RPC response.
var result interface{}
resp := map[string]interface{}{
"jsonrpc": "2.0",
"id": req.ID,
}
// do extra request pre processing (persist message id)
// within function semaphore will be acquired and released,
// so that no more than one client (per cell) can enter
messageID, err := preProcessRequest(vm)
if err != nil {
return nil, common.StopRPCCallError{Err: err}
}
if client == nil {
resp = newErrorResponse("RPC client is not available. Node is stopped?", &req.ID)
} else {
// TODO(adam): check if context is used
ctx := context.WithValue(context.Background(), common.MessageIDKey, messageID)
err = client.CallContext(ctx, &result, req.Method, req.Params...)
if err != nil {
if err2, ok := err.(gethrpc.Error); ok {
resp["error"] = map[string]interface{}{
"code": err2.ErrorCode(),
"message": err2.Error(),
}
} else {
resp = newErrorResponse(err.Error(), &req.ID)
}
}
}
if result == nil {
// Special case null because it is decoded as an empty
// raw message for some reason.
resp["result"] = ""
} else {
resp["result"] = result
}
// do extra request post processing (setting back tx context)
postProcessRequest(vm, req, messageID)
return resp, nil
}
// preProcessRequest pre-processes a given RPC call to a given Otto VM
func preProcessRequest(vm *vm.VM) (string, error) {
messageID := currentMessageID(vm)
return messageID, nil
}
// postProcessRequest post-processes a given RPC call to a given Otto VM
func postProcessRequest(vm *vm.VM, req common.RPCCall, messageID string) {
if len(messageID) > 0 {
vm.Call("addContext", nil, messageID, common.MessageIDKey, messageID) // nolint: errcheck
}
// set extra markers for queued transaction requests
if req.Method == params.SendTransactionMethodName {
vm.Call("addContext", nil, messageID, params.SendTransactionMethodName, true) // nolint: errcheck
}
}
// currentMessageID looks for `status.message_id` variable in current JS context
func currentMessageID(vm *vm.VM) string {
msgID, err := vm.Run("status.message_id")
if err != nil {
return ""
}
return msgID.String()
}

View File

@ -73,31 +73,22 @@ func makeAsyncSendHandler(jail *Jail, cellInt common.JailCell) func(call otto.Fu
cell := cellInt.(*Cell) cell := cellInt.(*Cell)
return func(call otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value {
go func() { go func() {
response := jail.Send(call, cell.VM) response := jail.Send(call)
if fn := call.Argument(1); fn.Class() == "Function" { // run callback asyncronously with args (error, response)
cell.Lock() callback := call.Argument(1)
fn.Call(otto.NullValue(), otto.NullValue(), response) err := otto.NullValue()
cell.Unlock() cell.CallAsync(callback, err, response)
}
}() }()
return otto.UndefinedValue() return otto.UndefinedValue()
} }
} }
// makeSendHandler returns jeth.send() and jeth.sendAsync() handler // makeSendHandler returns jeth.send() and jeth.sendAsync() handler
// TODO(tiabc): get rid of an extra parameter.
func makeSendHandler(jail *Jail, cellInt common.JailCell) func(call otto.FunctionCall) otto.Value { func makeSendHandler(jail *Jail, cellInt common.JailCell) func(call otto.FunctionCall) otto.Value {
// FIXME(tiabc): Get rid of this.
cell := cellInt.(*Cell)
return func(call otto.FunctionCall) otto.Value { return func(call otto.FunctionCall) otto.Value {
// Send calls are guaranteed to be only invoked from web3 after calling the appropriate return jail.Send(call)
// method of jail.Cell and the cell is locked during that call. In order to allow jail.Send
// to perform any operations on cell.VM and not hang, we need to unlock the mutex and return
// it to the previous state afterwards so that the caller didn't panic doing cell.Unlock().
cell.Unlock()
defer cell.Lock()
return jail.Send(call, cell.VM)
} }
} }

View File

@ -1,6 +1,7 @@
package loop package loop
import ( import (
"context"
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -44,7 +45,6 @@ type Loop struct {
lock sync.RWMutex lock sync.RWMutex
tasks map[int64]Task tasks map[int64]Task
ready chan Task ready chan Task
closed bool
} }
// New creates a new Loop with an unbuffered ready queue on a specific VM. // New creates a new Loop with an unbuffered ready queue on a specific VM.
@ -98,10 +98,6 @@ func (l *Loop) removeByID(id int64) {
// Ready signals to the loop that a task is ready to be finalised. This might // Ready signals to the loop that a task is ready to be finalised. This might
// block if the "ready channel" in the loop is at capacity. // block if the "ready channel" in the loop is at capacity.
func (l *Loop) Ready(t Task) { func (l *Loop) Ready(t Task) {
if l.closed {
return
}
l.ready <- t l.ready <- t
} }
@ -135,8 +131,10 @@ func (l *Loop) processTask(t Task) error {
// Run handles the task scheduling and finalisation. // Run handles the task scheduling and finalisation.
// It runs infinitely waiting for new tasks. // It runs infinitely waiting for new tasks.
func (l *Loop) Run() error { func (l *Loop) Run(ctx context.Context) error {
for t := range l.ready { for {
select {
case t := <-l.ready:
if t == nil { if t == nil {
continue continue
} }
@ -149,6 +147,9 @@ func (l *Loop) Run() error {
// should keep running. // should keep running.
continue continue
} }
case <-ctx.Done():
return context.Canceled
}
} }
return nil return nil
} }

View File

@ -114,7 +114,7 @@ func (c CallTask) Cancel() {}
// Execute calls the associated function (not necessarily in the given vm), // Execute calls the associated function (not necessarily in the given vm),
// pushing the resultant return value and error (or nil) into the associated // pushing the resultant return value and error (or nil) into the associated
// channels. If the call results in an error, it will return that error. // channels. If the call results in an error, it will return that error.
func (c CallTask) Execute(vm *otto.Otto, l *loop.Loop) error { func (c CallTask) Execute(vm *vm.VM, l *loop.Loop) error {
v, err := c.Function.Call(otto.NullValue(), c.Args...) v, err := c.Function.Call(otto.NullValue(), c.Args...)
c.Value <- v c.Value <- v
c.Error <- err c.Error <- err

View File

@ -69,3 +69,11 @@ func (vm *VM) CompileWithSourceMap(filename string, src, sm interface{}) (*otto.
return vm.vm.CompileWithSourceMap(filename, src, sm) return vm.vm.CompileWithSourceMap(filename, src, sm)
} }
// ToValue will convert an interface{} value to a value digestible by otto/JavaScript.
func (vm *VM) ToValue(value interface{}) (otto.Value, error) {
vm.Lock()
defer vm.Unlock()
return vm.vm.ToValue(value)
}

View File

@ -13,41 +13,35 @@ import (
"github.com/status-im/status-go/static" "github.com/status-im/status-go/static"
) )
// FIXME(tiabc): Get rid of this global variable. Move it to a constructor or initialization.
var web3JSCode = static.MustAsset("scripts/web3.js")
// errors
var ( var (
// FIXME(tiabc): Get rid of this global variable. Move it to a constructor or initialization.
web3JSCode = static.MustAsset("scripts/web3.js")
ErrInvalidJail = errors.New("jail environment is not properly initialized") ErrInvalidJail = errors.New("jail environment is not properly initialized")
) )
// Jail represents jailed environment inside of which we hold multiple cells. // Jail represents jailed environment inside of which we hold multiple cells.
// Each cell is a separate JavaScript VM. // Each cell is a separate JavaScript VM.
type Jail struct { type Jail struct {
// FIXME(tiabc): This mutex handles cells field access and must be renamed appropriately: cellsMutex
sync.RWMutex
nodeManager common.NodeManager nodeManager common.NodeManager
accountManager common.AccountManager
txQueueManager common.TxQueueManager
policy *ExecutionPolicy
cells map[string]*Cell // jail supports running many isolated instances of jailed runtime
baseJSCode string // JavaScript used to initialize all new cells with baseJSCode string // JavaScript used to initialize all new cells with
cellsMx sync.RWMutex
cells map[string]*Cell // jail supports running many isolated instances of jailed runtime
vm *vm.VM // vm for internal otto related tasks (see Send method)
} }
// New returns new Jail environment with the associated NodeManager and // New returns new Jail environment with the associated NodeManager.
// AccountManager. // It's caller responsibility to call jail.Stop() when jail is not needed.
func New( func New(nodeManager common.NodeManager) *Jail {
nodeManager common.NodeManager, accountManager common.AccountManager, txQueueManager common.TxQueueManager, if nodeManager == nil {
) *Jail {
if nodeManager == nil || accountManager == nil || txQueueManager == nil {
panic("Jail is missing mandatory dependencies") panic("Jail is missing mandatory dependencies")
} }
return &Jail{ return &Jail{
nodeManager: nodeManager, nodeManager: nodeManager,
accountManager: accountManager,
txQueueManager: txQueueManager,
cells: make(map[string]*Cell), cells: make(map[string]*Cell),
policy: NewExecutionPolicy(nodeManager, accountManager, txQueueManager), vm: vm.New(otto.New()),
} }
} }
@ -69,17 +63,28 @@ func (jail *Jail) NewCell(chatID string) (common.JailCell, error) {
return nil, err return nil, err
} }
jail.Lock() jail.cellsMx.Lock()
jail.cells[chatID] = cell jail.cells[chatID] = cell
jail.Unlock() jail.cellsMx.Unlock()
return cell, nil return cell, nil
} }
// Stop stops jail and all assosiacted cells.
func (jail *Jail) Stop() {
jail.cellsMx.Lock()
defer jail.cellsMx.Unlock()
for _, cell := range jail.cells {
cell.Stop()
}
jail.cells = nil
}
// Cell returns the existing instance of Cell. // Cell returns the existing instance of Cell.
func (jail *Jail) Cell(chatID string) (common.JailCell, error) { func (jail *Jail) Cell(chatID string) (common.JailCell, error) {
jail.RLock() jail.cellsMx.RLock()
defer jail.RUnlock() defer jail.cellsMx.RUnlock()
cell, ok := jail.cells[chatID] cell, ok := jail.cells[chatID]
if !ok { if !ok {
@ -151,68 +156,38 @@ func (jail *Jail) Call(chatID, this, args string) string {
return makeResult(res.String(), err) return makeResult(res.String(), err)
} }
// Send will serialize the first argument, send it to the node and returns the response. // Send is a wrapper for executing RPC calls from within Otto VM.
// IMPORTANT: Don't use `call.Otto` in this function unless you want to run into race conditions. Use `vm` instead. // It uses own jail's VM instance instead of cell's one to
// increase safety of cell's vm usage.
// TODO(divan): investigate if it's possible to do conversions
// withouth involving otto code at all.
// nolint: errcheck, unparam // nolint: errcheck, unparam
func (jail *Jail) Send(call otto.FunctionCall, vm *vm.VM) otto.Value { func (jail *Jail) Send(call otto.FunctionCall) otto.Value {
reqVal, err := vm.Call("JSON.stringify", nil, call.Argument(0)) request, err := jail.vm.Call("JSON.stringify", nil, call.Argument(0))
if err != nil { if err != nil {
throwJSException(err) throwJSException(err)
} }
var ( rpc := jail.nodeManager.RPCClient()
rawReq = []byte(reqVal.String()) // TODO(divan): remove this check as soon as jail cells have
reqs []common.RPCCall // proper cancellation mechanism implemented.
batch bool if rpc == nil {
) throwJSException(fmt.Errorf("Error getting RPC client. Node stopped?"))
if rawReq[0] == '[' {
batch = true
err = json.Unmarshal(rawReq, &reqs)
} else {
batch = false
reqs = make([]common.RPCCall, 1)
err = json.Unmarshal(rawReq, &reqs[0])
} }
response := rpc.CallRaw(request.String())
// unmarshal response to pass to otto
var resp interface{}
err = json.Unmarshal([]byte(response), &resp)
if err != nil { if err != nil {
throwJSException(fmt.Errorf("can't unmarshal %v (batch=%v): %s", string(rawReq), batch, err)) throwJSException(fmt.Errorf("Error unmarshalling result: %s", err))
}
respValue, err := jail.vm.ToValue(resp)
if err != nil {
throwJSException(fmt.Errorf("Error converting result to Otto's value: %s", err))
} }
resps, err := vm.Call("new Array", nil) return respValue
if err != nil {
throwJSException(fmt.Errorf("can't create Array: %s", err))
}
// Execute the requests.
for _, req := range reqs {
log.Info("execute request", "method", req.Method)
res, err := jail.policy.Execute(req, vm)
if err != nil {
log.Info("request errored", "error", err.Error())
switch err.(type) {
case common.StopRPCCallError:
return newErrorResponseOtto(vm, err.Error(), nil)
default:
res = newErrorResponse(err.Error(), &req.ID)
}
}
_, err = resps.Object().Call("push", res)
if err != nil {
throwJSException(fmt.Errorf("can't push result: %s", err))
}
}
// Return the responses either to the callback (if supplied)
// or directly as the return value.
if batch {
return resps
}
v, err := resps.Object().Get("0")
if err != nil {
throwJSException(err)
}
return v
} }
func newErrorResponse(msg string, id interface{}) map[string]interface{} { func newErrorResponse(msg string, id interface{}) map[string]interface{} {

View File

@ -3,6 +3,8 @@ package jail_test
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"strings"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -13,7 +15,6 @@ import (
"github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal" "github.com/status-im/status-go/geth/signal"
. "github.com/status-im/status-go/geth/testing" . "github.com/status-im/status-go/geth/testing"
"github.com/status-im/status-go/geth/txqueue"
"github.com/status-im/status-go/static" "github.com/status-im/status-go/static"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
) )
@ -45,15 +46,17 @@ func (s *JailTestSuite) SetupTest() {
accountManager := account.NewManager(nodeManager) accountManager := account.NewManager(nodeManager)
require.NotNil(accountManager) require.NotNil(accountManager)
txQueueManager := txqueue.NewManager(nodeManager, accountManager) jail := jail.New(nodeManager)
jail := jail.New(nodeManager, accountManager, txQueueManager)
require.NotNil(jail) require.NotNil(jail)
s.jail = jail s.jail = jail
s.NodeManager = nodeManager s.NodeManager = nodeManager
} }
func (s *JailTestSuite) TearDownTest() {
s.jail.Stop()
}
func (s *JailTestSuite) TestInit() { func (s *JailTestSuite) TestInit() {
require := s.Require() require := s.Require()
@ -133,6 +136,9 @@ func (s *JailTestSuite) TestFunctionCall() {
func (s *JailTestSuite) TestJailRPCAsyncSend() { func (s *JailTestSuite) TestJailRPCAsyncSend() {
require := s.Require() require := s.Require()
s.StartTestNode(params.RopstenNetworkID)
defer s.StopTestNode()
// load Status JS and add test command to it // load Status JS and add test command to it
s.jail.BaseJS(baseStatusJSCode) s.jail.BaseJS(baseStatusJSCode)
s.jail.Parse(testChatID, txJSCode) s.jail.Parse(testChatID, txJSCode)
@ -158,6 +164,12 @@ func (s *JailTestSuite) TestJailRPCAsyncSend() {
}() }()
} }
wg.Wait() wg.Wait()
// TODO(divan): revisit this test. sendAsync now returns immediately,
// and we need no way here to halt jail loop, which executes actual
// transaction send in background. For now, just wait a couple of secs
// to let tests pass.
time.Sleep(2 * time.Second)
} }
func (s *JailTestSuite) TestJailRPCSend() { func (s *JailTestSuite) TestJailRPCSend() {
@ -189,7 +201,6 @@ func (s *JailTestSuite) TestJailRPCSend() {
balance, err := value.ToFloat() balance, err := value.ToFloat()
require.NoError(err) require.NoError(err)
s.T().Logf("Balance of %.2f ETH found on '%s' account", balance, TestConfig.Account1.Address)
require.False(balance < 100, "wrong balance (there should be lots of test Ether on that account)") require.False(balance < 100, "wrong balance (there should be lots of test Ether on that account)")
} }
@ -278,3 +289,57 @@ func (s *JailTestSuite) TestEventSignal() {
expectedResponse := `{"jsonrpc":"2.0","result":true}` expectedResponse := `{"jsonrpc":"2.0","result":true}`
require.Equal(expectedResponse, response) require.Equal(expectedResponse, response)
} }
// TestCallResponseOrder tests for problem in
// https://github.com/status-im/status-go/issues/372
func (s *JailTestSuite) TestCallResponseOrder() {
require := s.Require()
s.StartTestNode(params.RopstenNetworkID)
defer s.StopTestNode()
statusJS := baseStatusJSCode + `;
_status_catalog.commands["testCommand"] = function (params) {
return params.val * params.val;
};
_status_catalog.commands["calculateGasPrice"] = function (n) {
var gasMultiplicator = Math.pow(1.4, n).toFixed(3);
var price = 211000000000;
try {
price = web3.eth.gasPrice;
} catch (err) {}
return price * gasMultiplicator;
};
`
s.jail.Parse(testChatID, statusJS)
N := 1000
errCh := make(chan error, N)
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(2)
go func(i int) {
defer wg.Done()
res := s.jail.Call(testChatID, `["commands", "testCommand"]`, fmt.Sprintf(`{"val": %d}`, i))
if !strings.Contains(res, fmt.Sprintf("result\": %d", i*i)) {
errCh <- fmt.Errorf("result should be '%d', got %s", i*i, res)
}
}(i)
go func(i int) {
defer wg.Done()
res := s.jail.Call(testChatID, `["commands", "calculateGasPrice"]`, fmt.Sprintf(`%d`, i))
if strings.Contains(res, "error") {
errCh <- fmt.Errorf("result should not contain 'error', got %s", res)
}
}(i)
}
wg.Wait()
close(errCh)
for e := range errCh {
require.NoError(e)
}
}