Refactor and cleanup Jail (#368)
Refactor and clean up Jail package: Removes account.Manager and txqueue.Manager from Jail as they are not used anymore Removes messageID related code from Jail.Send Simplifies Jail.Send to be a wrapper around RPC client's CallRaw Renames jail_cell* to cell* Related cleanups
This commit is contained in:
parent
8623b52873
commit
3540972f0e
|
@ -33,11 +33,12 @@ func NewStatusBackend() *StatusBackend {
|
|||
nodeManager := node.NewNodeManager()
|
||||
accountManager := account.NewManager(nodeManager)
|
||||
txQueueManager := txqueue.NewManager(nodeManager, accountManager)
|
||||
jailManager := jail.New(nodeManager)
|
||||
|
||||
return &StatusBackend{
|
||||
nodeManager: nodeManager,
|
||||
accountManager: accountManager,
|
||||
jailManager: jail.New(nodeManager, accountManager, txQueueManager),
|
||||
jailManager: jailManager,
|
||||
txQueueManager: txQueueManager,
|
||||
}
|
||||
}
|
||||
|
@ -123,6 +124,7 @@ func (m *StatusBackend) StopNode() (<-chan struct{}, error) {
|
|||
}
|
||||
|
||||
m.txQueueManager.Stop()
|
||||
m.jailManager.Stop()
|
||||
|
||||
backendStopped := make(chan struct{}, 1)
|
||||
go func() {
|
||||
|
|
|
@ -13,7 +13,6 @@ import (
|
|||
"github.com/ethereum/go-ethereum/crypto"
|
||||
whisper "github.com/ethereum/go-ethereum/whisper/whisperv5"
|
||||
"github.com/status-im/status-go/geth/common"
|
||||
"github.com/status-im/status-go/geth/log"
|
||||
"github.com/status-im/status-go/geth/params"
|
||||
"github.com/status-im/status-go/geth/signal"
|
||||
. "github.com/status-im/status-go/geth/testing"
|
||||
|
@ -35,166 +34,6 @@ var (
|
|||
baseStatusJSCode = string(static.MustAsset("testdata/jail/status.js"))
|
||||
)
|
||||
|
||||
func (s *BackendTestSuite) TestJailSendQueuedTransaction() {
|
||||
require := s.Require()
|
||||
|
||||
s.StartTestBackend(params.RopstenNetworkID)
|
||||
defer s.StopTestBackend()
|
||||
|
||||
time.Sleep(TestConfig.Node.SyncSeconds * time.Second) // allow to sync
|
||||
|
||||
// log into account from which transactions will be sent
|
||||
require.NoError(s.backend.AccountManager().SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password))
|
||||
|
||||
txParams := `{
|
||||
"from": "` + TestConfig.Account1.Address + `",
|
||||
"to": "` + TestConfig.Account2.Address + `",
|
||||
"value": "0.000001"
|
||||
}`
|
||||
|
||||
txHashes := make(chan gethcommon.Hash, 1)
|
||||
|
||||
// replace transaction notification handler
|
||||
requireMessageId := false
|
||||
signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
|
||||
var envelope signal.Envelope
|
||||
err := json.Unmarshal([]byte(jsonEvent), &envelope)
|
||||
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
|
||||
|
||||
if envelope.Type == txqueue.EventTransactionQueued {
|
||||
event := envelope.Event.(map[string]interface{})
|
||||
messageId, ok := event["message_id"].(string)
|
||||
s.True(ok, "Message id is required, but not found")
|
||||
if requireMessageId {
|
||||
require.NotEmpty(messageId, "Message id is required, but not provided")
|
||||
} else {
|
||||
require.Empty(messageId, "Message id is not required, but provided")
|
||||
}
|
||||
|
||||
txID := event["id"].(string)
|
||||
txHash, err := s.backend.CompleteTransaction(common.QueuedTxID(txID), TestConfig.Account1.Password)
|
||||
require.NoError(err, "cannot complete queued transaction[%v]", event["id"])
|
||||
|
||||
log.Info("Transaction complete", "URL", "https://ropsten.etherscan.io/tx/%s"+txHash.Hex())
|
||||
|
||||
txHashes <- txHash
|
||||
}
|
||||
})
|
||||
|
||||
type testCommand struct {
|
||||
command string
|
||||
params string
|
||||
expectedResponse string
|
||||
}
|
||||
type testCase struct {
|
||||
name string
|
||||
file string
|
||||
requireMessageId bool
|
||||
commands []testCommand
|
||||
}
|
||||
|
||||
tests := []testCase{
|
||||
{
|
||||
// no context or message id
|
||||
name: "Case 1: no message id or context in inited JS",
|
||||
file: "no-message-id-or-context.js",
|
||||
requireMessageId: false,
|
||||
commands: []testCommand{
|
||||
{
|
||||
`["commands", "send"]`,
|
||||
txParams,
|
||||
`{"result": {"transaction-hash":"TX_HASH"}}`,
|
||||
},
|
||||
{
|
||||
`["commands", "getBalance"]`,
|
||||
`{"address": "` + TestConfig.Account1.Address + `"}`,
|
||||
`{"result": {"balance":42}}`,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// context is present in inited JS (but no message id is there)
|
||||
name: "Case 2: context is present in inited JS (but no message id is there)",
|
||||
file: "context-no-message-id.js",
|
||||
requireMessageId: false,
|
||||
commands: []testCommand{
|
||||
{
|
||||
`["commands", "send"]`,
|
||||
txParams,
|
||||
`{"result": {"context":{"` + params.SendTransactionMethodName + `":true},"result":{"transaction-hash":"TX_HASH"}}}`,
|
||||
},
|
||||
{
|
||||
`["commands", "getBalance"]`,
|
||||
`{"address": "` + TestConfig.Account1.Address + `"}`,
|
||||
`{"result": {"context":{},"result":{"balance":42}}}`, // note empty (but present) context!
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// message id is present in inited JS, but no context is there
|
||||
name: "Case 3: message id is present, context is not present",
|
||||
file: "message-id-no-context.js",
|
||||
requireMessageId: true,
|
||||
commands: []testCommand{
|
||||
{
|
||||
`["commands", "send"]`,
|
||||
txParams,
|
||||
`{"result": {"transaction-hash":"TX_HASH"}}`,
|
||||
},
|
||||
{
|
||||
`["commands", "getBalance"]`,
|
||||
`{"address": "` + TestConfig.Account1.Address + `"}`,
|
||||
`{"result": {"balance":42}}`, // note empty context!
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// both message id and context are present in inited JS (this UC is what we normally expect to see)
|
||||
name: "Case 4: both message id and context are present",
|
||||
file: "tx-send.js",
|
||||
requireMessageId: true,
|
||||
commands: []testCommand{
|
||||
{
|
||||
`["commands", "getBalance"]`,
|
||||
`{"address": "` + TestConfig.Account1.Address + `"}`,
|
||||
`{"result": {"context":{"message_id":"42"},"result":{"balance":42}}}`, // message id in context, but default one is used!
|
||||
},
|
||||
{
|
||||
`["commands", "send"]`,
|
||||
txParams,
|
||||
`{"result": {"context":{"eth_sendTransaction":true,"message_id":"foobar"},"result":{"transaction-hash":"TX_HASH"}}}`,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
jailInstance := s.backend.JailManager()
|
||||
for _, test := range tests {
|
||||
jailInstance.BaseJS(string(static.MustAsset(txSendFolder + test.file)))
|
||||
jailInstance.Parse(testChatID, ``)
|
||||
|
||||
// used by notification handler
|
||||
requireMessageId = test.requireMessageId
|
||||
|
||||
for _, command := range test.commands {
|
||||
s.T().Logf("%s: %s", test.name, command.command)
|
||||
response := jailInstance.Call(testChatID, command.command, command.params)
|
||||
|
||||
var txHash gethcommon.Hash
|
||||
if command.command == `["commands", "send"]` {
|
||||
select {
|
||||
case txHash = <-txHashes:
|
||||
case <-time.After(time.Minute):
|
||||
s.FailNow("test timed out: %s", test.name)
|
||||
}
|
||||
}
|
||||
|
||||
expectedResponse := strings.Replace(command.expectedResponse, "TX_HASH", txHash.Hex(), 1)
|
||||
require.Equal(expectedResponse, response)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BackendTestSuite) TestContractDeployment() {
|
||||
require := s.Require()
|
||||
|
||||
|
@ -241,6 +80,7 @@ func (s *BackendTestSuite) TestContractDeployment() {
|
|||
|
||||
_, err = cell.Run(`
|
||||
var responseValue = null;
|
||||
var errorValue = null;
|
||||
var testContract = web3.eth.contract([{"constant":true,"inputs":[{"name":"a","type":"int256"}],"name":"double","outputs":[{"name":"","type":"int256"}],"payable":false,"type":"function"}]);
|
||||
var test = testContract.new(
|
||||
{
|
||||
|
@ -249,11 +89,13 @@ func (s *BackendTestSuite) TestContractDeployment() {
|
|||
gas: '` + strconv.Itoa(params.DefaultGas) + `'
|
||||
}, function (e, contract) {
|
||||
// NOTE: The callback will fire twice!
|
||||
if (e) {
|
||||
errorValue = e;
|
||||
return
|
||||
}
|
||||
// Once the contract has the transactionHash property set and once its deployed on an address.
|
||||
if (!e) {
|
||||
if (!contract.address) {
|
||||
responseValue = contract.transactionHash;
|
||||
}
|
||||
if (!contract.address) {
|
||||
responseValue = contract.transactionHash;
|
||||
}
|
||||
})
|
||||
`)
|
||||
|
@ -268,6 +110,10 @@ func (s *BackendTestSuite) TestContractDeployment() {
|
|||
// Wait until callback is fired and `responseValue` is set. Hacky but simple.
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
errorValue, err := cell.Get("errorValue")
|
||||
require.NoError(err)
|
||||
require.Equal("null", errorValue.String())
|
||||
|
||||
responseValue, err := cell.Get("responseValue")
|
||||
require.NoError(err)
|
||||
|
||||
|
@ -276,7 +122,6 @@ func (s *BackendTestSuite) TestContractDeployment() {
|
|||
|
||||
expectedResponse := txHash.Hex()
|
||||
require.Equal(expectedResponse, response)
|
||||
s.T().Logf("estimation complete: %s", response)
|
||||
}
|
||||
|
||||
func (s *BackendTestSuite) TestJailWhisper() {
|
||||
|
|
|
@ -256,6 +256,8 @@ type JailCell interface {
|
|||
Run(interface{}) (otto.Value, error)
|
||||
// Call an arbitrary JS function by name and args.
|
||||
Call(item string, this interface{}, args ...interface{}) (otto.Value, error)
|
||||
// Stop stops background execution of cell.
|
||||
Stop()
|
||||
}
|
||||
|
||||
// JailManager defines methods for managing jailed environments
|
||||
|
@ -275,6 +277,9 @@ type JailManager interface {
|
|||
|
||||
// BaseJS allows to setup initial JavaScript to be loaded on each jail.Parse()
|
||||
BaseJS(js string)
|
||||
|
||||
// Stop stops all background activity of jail
|
||||
Stop()
|
||||
}
|
||||
|
||||
// APIResponse generic response from API
|
||||
|
|
|
@ -587,6 +587,24 @@ func (mr *MockTxQueueManagerMockRecorder) SetTransactionReturnHandler(fn interfa
|
|||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTransactionReturnHandler", reflect.TypeOf((*MockTxQueueManager)(nil).SetTransactionReturnHandler), fn)
|
||||
}
|
||||
|
||||
// SendTransactionRPCHandler mocks base method
|
||||
func (m *MockTxQueueManager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) {
|
||||
varargs := []interface{}{ctx}
|
||||
for _, a := range args {
|
||||
varargs = append(varargs, a)
|
||||
}
|
||||
ret := m.ctrl.Call(m, "SendTransactionRPCHandler", varargs...)
|
||||
ret0, _ := ret[0].(interface{})
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// SendTransactionRPCHandler indicates an expected call of SendTransactionRPCHandler
|
||||
func (mr *MockTxQueueManagerMockRecorder) SendTransactionRPCHandler(ctx interface{}, args ...interface{}) *gomock.Call {
|
||||
varargs := append([]interface{}{ctx}, args...)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendTransactionRPCHandler", reflect.TypeOf((*MockTxQueueManager)(nil).SendTransactionRPCHandler), varargs...)
|
||||
}
|
||||
|
||||
// TransactionReturnHandler mocks base method
|
||||
func (m *MockTxQueueManager) TransactionReturnHandler() func(*QueuedTx, error) {
|
||||
ret := m.ctrl.Call(m, "TransactionReturnHandler")
|
||||
|
@ -727,6 +745,16 @@ func (mr *MockJailCellMockRecorder) Call(item, this interface{}, args ...interfa
|
|||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Call", reflect.TypeOf((*MockJailCell)(nil).Call), varargs...)
|
||||
}
|
||||
|
||||
// Stop mocks base method
|
||||
func (m *MockJailCell) Stop() {
|
||||
m.ctrl.Call(m, "Stop")
|
||||
}
|
||||
|
||||
// Stop indicates an expected call of Stop
|
||||
func (mr *MockJailCellMockRecorder) Stop() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockJailCell)(nil).Stop))
|
||||
}
|
||||
|
||||
// MockJailManager is a mock of JailManager interface
|
||||
type MockJailManager struct {
|
||||
ctrl *gomock.Controller
|
||||
|
@ -809,3 +837,13 @@ func (m *MockJailManager) BaseJS(js string) {
|
|||
func (mr *MockJailManagerMockRecorder) BaseJS(js interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BaseJS", reflect.TypeOf((*MockJailManager)(nil).BaseJS), js)
|
||||
}
|
||||
|
||||
// Stop mocks base method
|
||||
func (m *MockJailManager) Stop() {
|
||||
m.ctrl.Call(m, "Stop")
|
||||
}
|
||||
|
||||
// Stop indicates an expected call of Stop
|
||||
func (mr *MockJailManagerMockRecorder) Stop() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockJailManager)(nil).Stop))
|
||||
}
|
||||
|
|
|
@ -1,17 +1,22 @@
|
|||
package jail
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/robertkrimen/otto"
|
||||
"github.com/status-im/status-go/geth/jail/internal/fetch"
|
||||
"github.com/status-im/status-go/geth/jail/internal/loop"
|
||||
"github.com/status-im/status-go/geth/jail/internal/loop/looptask"
|
||||
"github.com/status-im/status-go/geth/jail/internal/timers"
|
||||
"github.com/status-im/status-go/geth/jail/internal/vm"
|
||||
)
|
||||
|
||||
// Cell represents a single jail cell, which is basically a JavaScript VM.
|
||||
type Cell struct {
|
||||
id string
|
||||
*vm.VM
|
||||
id string
|
||||
cancel context.CancelFunc
|
||||
lo *loop.Loop
|
||||
}
|
||||
|
||||
// newCell encapsulates what we need to create a new jailCell from the
|
||||
|
@ -23,13 +28,16 @@ func newCell(id string, ottoVM *otto.Otto) (*Cell, error) {
|
|||
|
||||
registerVMHandlers(cellVM, lo)
|
||||
|
||||
// start loop in a goroutine
|
||||
// Cell is currently immortal, so the loop
|
||||
go lo.Run()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
// start event loop in background
|
||||
go lo.Run(ctx)
|
||||
|
||||
return &Cell{
|
||||
id: id,
|
||||
VM: cellVM,
|
||||
VM: cellVM,
|
||||
id: id,
|
||||
cancel: cancel,
|
||||
lo: lo,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -48,3 +56,19 @@ func registerVMHandlers(v *vm.VM, lo *loop.Loop) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop halts event loop associated with cell.
|
||||
func (c *Cell) Stop() {
|
||||
c.cancel()
|
||||
}
|
||||
|
||||
// CallAsync puts otto's function with given args into
|
||||
// event queue loop and schedules for immediate execution.
|
||||
// Intended to be used by any cell user that want's to run
|
||||
// async call, like callback.
|
||||
func (c *Cell) CallAsync(fn otto.Value, args ...interface{}) {
|
||||
task := looptask.NewCallTask(fn, args...)
|
||||
c.lo.Add(task)
|
||||
// TODO(divan): review API of `loop` package, it's contrintuitive
|
||||
go c.lo.Ready(task)
|
||||
}
|
|
@ -14,6 +14,7 @@ func (s *JailTestSuite) TestJailTimeoutFailure() {
|
|||
cell, err := s.jail.NewCell(testChatID)
|
||||
require.NoError(err)
|
||||
require.NotNil(cell)
|
||||
defer cell.Stop()
|
||||
|
||||
// Attempt to run a timeout string against a Cell.
|
||||
_, err = cell.Run(`
|
||||
|
@ -44,6 +45,7 @@ func (s *JailTestSuite) TestJailTimeout() {
|
|||
cell, err := s.jail.NewCell(testChatID)
|
||||
require.NoError(err)
|
||||
require.NotNil(cell)
|
||||
defer cell.Stop()
|
||||
|
||||
// Attempt to run a timeout string against a Cell.
|
||||
_, err = cell.Run(`
|
||||
|
@ -78,6 +80,7 @@ func (s *JailTestSuite) TestJailLoopInCall() {
|
|||
cell, err := s.jail.Cell(testChatID)
|
||||
require.NoError(err)
|
||||
require.NotNil(cell)
|
||||
defer cell.Stop()
|
||||
|
||||
items := make(chan string)
|
||||
|
||||
|
@ -116,6 +119,7 @@ func (s *JailTestSuite) TestJailLoopRace() {
|
|||
cell, err := s.jail.NewCell(testChatID)
|
||||
require.NoError(err)
|
||||
require.NotNil(cell)
|
||||
defer cell.Stop()
|
||||
|
||||
items := make(chan struct{})
|
||||
|
||||
|
@ -161,6 +165,7 @@ func (s *JailTestSuite) TestJailFetchPromise() {
|
|||
cell, err := s.jail.NewCell(testChatID)
|
||||
require.NoError(err)
|
||||
require.NotNil(cell)
|
||||
defer cell.Stop()
|
||||
|
||||
dataCh := make(chan otto.Value, 1)
|
||||
errCh := make(chan otto.Value, 1)
|
||||
|
@ -197,6 +202,7 @@ func (s *JailTestSuite) TestJailFetchCatch() {
|
|||
cell, err := s.jail.NewCell(testChatID)
|
||||
require.NoError(err)
|
||||
require.NotNil(cell)
|
||||
defer cell.Stop()
|
||||
|
||||
dataCh := make(chan otto.Value, 1)
|
||||
errCh := make(chan otto.Value, 1)
|
||||
|
@ -245,6 +251,7 @@ func (s *JailTestSuite) TestJailFetchRace() {
|
|||
cell, err := s.jail.NewCell(testChatID)
|
||||
require.NoError(err)
|
||||
require.NotNil(cell)
|
||||
defer cell.Stop()
|
||||
|
||||
dataCh := make(chan otto.Value, 1)
|
||||
errCh := make(chan otto.Value, 1)
|
||||
|
@ -292,3 +299,54 @@ func (s *JailTestSuite) TestJailFetchRace() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestJailLoopCancel tests that cell.Stop() really cancels event
|
||||
// loop and pending tasks.
|
||||
func (s *JailTestSuite) TestJailLoopCancel() {
|
||||
require := s.Require()
|
||||
|
||||
// load Status JS and add test command to it
|
||||
s.jail.BaseJS(baseStatusJSCode)
|
||||
s.jail.Parse(testChatID, ``)
|
||||
|
||||
cell, err := s.jail.Cell(testChatID)
|
||||
require.NoError(err)
|
||||
require.NotNil(cell)
|
||||
|
||||
var count int
|
||||
err = cell.Set("__captureResponse", func(val string) otto.Value {
|
||||
count++
|
||||
return otto.UndefinedValue()
|
||||
})
|
||||
require.NoError(err)
|
||||
|
||||
_, err = cell.Run(`
|
||||
function callRunner(val, delay){
|
||||
return setTimeout(function(){
|
||||
__captureResponse(val);
|
||||
}, delay);
|
||||
}
|
||||
`)
|
||||
require.NoError(err)
|
||||
|
||||
// Run 5 timeout tasks to be executed in: 1, 2, 3, 4 and 5 secs
|
||||
for i := 1; i <= 5; i++ {
|
||||
_, err = cell.Call("callRunner", nil, "value", i*1000)
|
||||
require.NoError(err)
|
||||
}
|
||||
|
||||
// Wait 1.5 second (so only one task executed) so far
|
||||
// and stop the cell (event loop should die)
|
||||
time.Sleep(1500 * time.Millisecond)
|
||||
cell.Stop()
|
||||
|
||||
// check that only 1 task has increased counter
|
||||
require.Equal(1, count)
|
||||
|
||||
// wait 2 seconds more (so at least two more tasks would
|
||||
// have been executed if event loop is still running)
|
||||
<-time.After(2 * time.Second)
|
||||
|
||||
// check that counter hasn't increased
|
||||
require.Equal(1, count)
|
||||
}
|
|
@ -9,7 +9,7 @@ import (
|
|||
|
||||
"github.com/robertkrimen/otto"
|
||||
"github.com/status-im/status-go/geth/jail/console"
|
||||
"github.com/status-im/status-go/geth/node"
|
||||
"github.com/status-im/status-go/geth/signal"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
|
@ -58,8 +58,7 @@ func (s *ConsoleTestSuite) TestObjectLogging() {
|
|||
|
||||
var customWriter bytes.Buffer
|
||||
|
||||
node.SetDefaultNodeNotificationHandler(func(event string) {
|
||||
|
||||
signal.SetDefaultNodeNotificationHandler(func(event string) {
|
||||
var eventReceived struct {
|
||||
Type string `json:"type"`
|
||||
Event []struct {
|
||||
|
|
|
@ -1,113 +0,0 @@
|
|||
package jail
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
gethrpc "github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/status-im/status-go/geth/common"
|
||||
"github.com/status-im/status-go/geth/jail/internal/vm"
|
||||
"github.com/status-im/status-go/geth/params"
|
||||
"github.com/status-im/status-go/geth/rpc"
|
||||
)
|
||||
|
||||
// ExecutionPolicy provides a central container for the executions of RPCCall requests for both
|
||||
// remote/upstream processing and internal node processing.
|
||||
type ExecutionPolicy struct {
|
||||
nodeManager common.NodeManager
|
||||
accountManager common.AccountManager
|
||||
txQueueManager common.TxQueueManager
|
||||
}
|
||||
|
||||
// NewExecutionPolicy returns a new instance of ExecutionPolicy.
|
||||
func NewExecutionPolicy(
|
||||
nodeManager common.NodeManager, accountManager common.AccountManager, txQueueManager common.TxQueueManager,
|
||||
) *ExecutionPolicy {
|
||||
return &ExecutionPolicy{
|
||||
nodeManager: nodeManager,
|
||||
accountManager: accountManager,
|
||||
txQueueManager: txQueueManager,
|
||||
}
|
||||
}
|
||||
|
||||
// Execute handles the execution of a RPC request and routes appropriately to either a local or remote ethereum node.
|
||||
func (ep *ExecutionPolicy) Execute(req common.RPCCall, vm *vm.VM) (map[string]interface{}, error) {
|
||||
client := ep.nodeManager.RPCClient()
|
||||
return ep.executeWithClient(client, vm, req)
|
||||
}
|
||||
|
||||
func (ep *ExecutionPolicy) executeWithClient(client *rpc.Client, vm *vm.VM, req common.RPCCall) (map[string]interface{}, error) {
|
||||
// Arbitrary JSON-RPC response.
|
||||
var result interface{}
|
||||
|
||||
resp := map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": req.ID,
|
||||
}
|
||||
|
||||
// do extra request pre processing (persist message id)
|
||||
// within function semaphore will be acquired and released,
|
||||
// so that no more than one client (per cell) can enter
|
||||
messageID, err := preProcessRequest(vm)
|
||||
if err != nil {
|
||||
return nil, common.StopRPCCallError{Err: err}
|
||||
}
|
||||
|
||||
if client == nil {
|
||||
resp = newErrorResponse("RPC client is not available. Node is stopped?", &req.ID)
|
||||
} else {
|
||||
// TODO(adam): check if context is used
|
||||
ctx := context.WithValue(context.Background(), common.MessageIDKey, messageID)
|
||||
err = client.CallContext(ctx, &result, req.Method, req.Params...)
|
||||
if err != nil {
|
||||
if err2, ok := err.(gethrpc.Error); ok {
|
||||
resp["error"] = map[string]interface{}{
|
||||
"code": err2.ErrorCode(),
|
||||
"message": err2.Error(),
|
||||
}
|
||||
} else {
|
||||
resp = newErrorResponse(err.Error(), &req.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if result == nil {
|
||||
// Special case null because it is decoded as an empty
|
||||
// raw message for some reason.
|
||||
resp["result"] = ""
|
||||
} else {
|
||||
resp["result"] = result
|
||||
}
|
||||
|
||||
// do extra request post processing (setting back tx context)
|
||||
postProcessRequest(vm, req, messageID)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// preProcessRequest pre-processes a given RPC call to a given Otto VM
|
||||
func preProcessRequest(vm *vm.VM) (string, error) {
|
||||
messageID := currentMessageID(vm)
|
||||
|
||||
return messageID, nil
|
||||
}
|
||||
|
||||
// postProcessRequest post-processes a given RPC call to a given Otto VM
|
||||
func postProcessRequest(vm *vm.VM, req common.RPCCall, messageID string) {
|
||||
if len(messageID) > 0 {
|
||||
vm.Call("addContext", nil, messageID, common.MessageIDKey, messageID) // nolint: errcheck
|
||||
}
|
||||
|
||||
// set extra markers for queued transaction requests
|
||||
if req.Method == params.SendTransactionMethodName {
|
||||
vm.Call("addContext", nil, messageID, params.SendTransactionMethodName, true) // nolint: errcheck
|
||||
}
|
||||
}
|
||||
|
||||
// currentMessageID looks for `status.message_id` variable in current JS context
|
||||
func currentMessageID(vm *vm.VM) string {
|
||||
msgID, err := vm.Run("status.message_id")
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return msgID.String()
|
||||
}
|
|
@ -73,31 +73,22 @@ func makeAsyncSendHandler(jail *Jail, cellInt common.JailCell) func(call otto.Fu
|
|||
cell := cellInt.(*Cell)
|
||||
return func(call otto.FunctionCall) otto.Value {
|
||||
go func() {
|
||||
response := jail.Send(call, cell.VM)
|
||||
response := jail.Send(call)
|
||||
|
||||
if fn := call.Argument(1); fn.Class() == "Function" {
|
||||
cell.Lock()
|
||||
fn.Call(otto.NullValue(), otto.NullValue(), response)
|
||||
cell.Unlock()
|
||||
}
|
||||
// run callback asyncronously with args (error, response)
|
||||
callback := call.Argument(1)
|
||||
err := otto.NullValue()
|
||||
cell.CallAsync(callback, err, response)
|
||||
}()
|
||||
return otto.UndefinedValue()
|
||||
}
|
||||
}
|
||||
|
||||
// makeSendHandler returns jeth.send() and jeth.sendAsync() handler
|
||||
// TODO(tiabc): get rid of an extra parameter.
|
||||
func makeSendHandler(jail *Jail, cellInt common.JailCell) func(call otto.FunctionCall) otto.Value {
|
||||
// FIXME(tiabc): Get rid of this.
|
||||
cell := cellInt.(*Cell)
|
||||
return func(call otto.FunctionCall) otto.Value {
|
||||
// Send calls are guaranteed to be only invoked from web3 after calling the appropriate
|
||||
// method of jail.Cell and the cell is locked during that call. In order to allow jail.Send
|
||||
// to perform any operations on cell.VM and not hang, we need to unlock the mutex and return
|
||||
// it to the previous state afterwards so that the caller didn't panic doing cell.Unlock().
|
||||
cell.Unlock()
|
||||
defer cell.Lock()
|
||||
|
||||
return jail.Send(call, cell.VM)
|
||||
return jail.Send(call)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package loop
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
@ -39,12 +40,11 @@ type Task interface {
|
|||
// to finalise on the VM. The channel holding the tasks pending finalising can
|
||||
// be buffered or unbuffered.
|
||||
type Loop struct {
|
||||
vm *vm.VM
|
||||
id int64
|
||||
lock sync.RWMutex
|
||||
tasks map[int64]Task
|
||||
ready chan Task
|
||||
closed bool
|
||||
vm *vm.VM
|
||||
id int64
|
||||
lock sync.RWMutex
|
||||
tasks map[int64]Task
|
||||
ready chan Task
|
||||
}
|
||||
|
||||
// New creates a new Loop with an unbuffered ready queue on a specific VM.
|
||||
|
@ -98,10 +98,6 @@ func (l *Loop) removeByID(id int64) {
|
|||
// Ready signals to the loop that a task is ready to be finalised. This might
|
||||
// block if the "ready channel" in the loop is at capacity.
|
||||
func (l *Loop) Ready(t Task) {
|
||||
if l.closed {
|
||||
return
|
||||
}
|
||||
|
||||
l.ready <- t
|
||||
}
|
||||
|
||||
|
@ -135,19 +131,24 @@ func (l *Loop) processTask(t Task) error {
|
|||
|
||||
// Run handles the task scheduling and finalisation.
|
||||
// It runs infinitely waiting for new tasks.
|
||||
func (l *Loop) Run() error {
|
||||
for t := range l.ready {
|
||||
if t == nil {
|
||||
continue
|
||||
}
|
||||
func (l *Loop) Run(ctx context.Context) error {
|
||||
for {
|
||||
select {
|
||||
case t := <-l.ready:
|
||||
if t == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
err := l.processTask(t)
|
||||
if err != nil {
|
||||
// TODO(divan): do we need to report
|
||||
// errors up to the caller?
|
||||
// Ignoring for now, as loop
|
||||
// should keep running.
|
||||
continue
|
||||
err := l.processTask(t)
|
||||
if err != nil {
|
||||
// TODO(divan): do we need to report
|
||||
// errors up to the caller?
|
||||
// Ignoring for now, as loop
|
||||
// should keep running.
|
||||
continue
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return context.Canceled
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -114,7 +114,7 @@ func (c CallTask) Cancel() {}
|
|||
// Execute calls the associated function (not necessarily in the given vm),
|
||||
// pushing the resultant return value and error (or nil) into the associated
|
||||
// channels. If the call results in an error, it will return that error.
|
||||
func (c CallTask) Execute(vm *otto.Otto, l *loop.Loop) error {
|
||||
func (c CallTask) Execute(vm *vm.VM, l *loop.Loop) error {
|
||||
v, err := c.Function.Call(otto.NullValue(), c.Args...)
|
||||
c.Value <- v
|
||||
c.Error <- err
|
||||
|
|
|
@ -69,3 +69,11 @@ func (vm *VM) CompileWithSourceMap(filename string, src, sm interface{}) (*otto.
|
|||
|
||||
return vm.vm.CompileWithSourceMap(filename, src, sm)
|
||||
}
|
||||
|
||||
// ToValue will convert an interface{} value to a value digestible by otto/JavaScript.
|
||||
func (vm *VM) ToValue(value interface{}) (otto.Value, error) {
|
||||
vm.Lock()
|
||||
defer vm.Unlock()
|
||||
|
||||
return vm.vm.ToValue(value)
|
||||
}
|
||||
|
|
|
@ -13,41 +13,35 @@ import (
|
|||
"github.com/status-im/status-go/static"
|
||||
)
|
||||
|
||||
// FIXME(tiabc): Get rid of this global variable. Move it to a constructor or initialization.
|
||||
var web3JSCode = static.MustAsset("scripts/web3.js")
|
||||
|
||||
// errors
|
||||
var (
|
||||
// FIXME(tiabc): Get rid of this global variable. Move it to a constructor or initialization.
|
||||
web3JSCode = static.MustAsset("scripts/web3.js")
|
||||
|
||||
ErrInvalidJail = errors.New("jail environment is not properly initialized")
|
||||
)
|
||||
|
||||
// Jail represents jailed environment inside of which we hold multiple cells.
|
||||
// Each cell is a separate JavaScript VM.
|
||||
type Jail struct {
|
||||
// FIXME(tiabc): This mutex handles cells field access and must be renamed appropriately: cellsMutex
|
||||
sync.RWMutex
|
||||
nodeManager common.NodeManager
|
||||
accountManager common.AccountManager
|
||||
txQueueManager common.TxQueueManager
|
||||
policy *ExecutionPolicy
|
||||
cells map[string]*Cell // jail supports running many isolated instances of jailed runtime
|
||||
baseJSCode string // JavaScript used to initialize all new cells with
|
||||
nodeManager common.NodeManager
|
||||
baseJSCode string // JavaScript used to initialize all new cells with
|
||||
|
||||
cellsMx sync.RWMutex
|
||||
cells map[string]*Cell // jail supports running many isolated instances of jailed runtime
|
||||
|
||||
vm *vm.VM // vm for internal otto related tasks (see Send method)
|
||||
}
|
||||
|
||||
// New returns new Jail environment with the associated NodeManager and
|
||||
// AccountManager.
|
||||
func New(
|
||||
nodeManager common.NodeManager, accountManager common.AccountManager, txQueueManager common.TxQueueManager,
|
||||
) *Jail {
|
||||
if nodeManager == nil || accountManager == nil || txQueueManager == nil {
|
||||
// New returns new Jail environment with the associated NodeManager.
|
||||
// It's caller responsibility to call jail.Stop() when jail is not needed.
|
||||
func New(nodeManager common.NodeManager) *Jail {
|
||||
if nodeManager == nil {
|
||||
panic("Jail is missing mandatory dependencies")
|
||||
}
|
||||
return &Jail{
|
||||
nodeManager: nodeManager,
|
||||
accountManager: accountManager,
|
||||
txQueueManager: txQueueManager,
|
||||
cells: make(map[string]*Cell),
|
||||
policy: NewExecutionPolicy(nodeManager, accountManager, txQueueManager),
|
||||
nodeManager: nodeManager,
|
||||
cells: make(map[string]*Cell),
|
||||
vm: vm.New(otto.New()),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,17 +63,28 @@ func (jail *Jail) NewCell(chatID string) (common.JailCell, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
jail.Lock()
|
||||
jail.cellsMx.Lock()
|
||||
jail.cells[chatID] = cell
|
||||
jail.Unlock()
|
||||
jail.cellsMx.Unlock()
|
||||
|
||||
return cell, nil
|
||||
}
|
||||
|
||||
// Stop stops jail and all assosiacted cells.
|
||||
func (jail *Jail) Stop() {
|
||||
jail.cellsMx.Lock()
|
||||
defer jail.cellsMx.Unlock()
|
||||
|
||||
for _, cell := range jail.cells {
|
||||
cell.Stop()
|
||||
}
|
||||
jail.cells = nil
|
||||
}
|
||||
|
||||
// Cell returns the existing instance of Cell.
|
||||
func (jail *Jail) Cell(chatID string) (common.JailCell, error) {
|
||||
jail.RLock()
|
||||
defer jail.RUnlock()
|
||||
jail.cellsMx.RLock()
|
||||
defer jail.cellsMx.RUnlock()
|
||||
|
||||
cell, ok := jail.cells[chatID]
|
||||
if !ok {
|
||||
|
@ -151,68 +156,38 @@ func (jail *Jail) Call(chatID, this, args string) string {
|
|||
return makeResult(res.String(), err)
|
||||
}
|
||||
|
||||
// Send will serialize the first argument, send it to the node and returns the response.
|
||||
// IMPORTANT: Don't use `call.Otto` in this function unless you want to run into race conditions. Use `vm` instead.
|
||||
// Send is a wrapper for executing RPC calls from within Otto VM.
|
||||
// It uses own jail's VM instance instead of cell's one to
|
||||
// increase safety of cell's vm usage.
|
||||
// TODO(divan): investigate if it's possible to do conversions
|
||||
// withouth involving otto code at all.
|
||||
// nolint: errcheck, unparam
|
||||
func (jail *Jail) Send(call otto.FunctionCall, vm *vm.VM) otto.Value {
|
||||
reqVal, err := vm.Call("JSON.stringify", nil, call.Argument(0))
|
||||
func (jail *Jail) Send(call otto.FunctionCall) otto.Value {
|
||||
request, err := jail.vm.Call("JSON.stringify", nil, call.Argument(0))
|
||||
if err != nil {
|
||||
throwJSException(err)
|
||||
}
|
||||
|
||||
var (
|
||||
rawReq = []byte(reqVal.String())
|
||||
reqs []common.RPCCall
|
||||
batch bool
|
||||
)
|
||||
|
||||
if rawReq[0] == '[' {
|
||||
batch = true
|
||||
err = json.Unmarshal(rawReq, &reqs)
|
||||
} else {
|
||||
batch = false
|
||||
reqs = make([]common.RPCCall, 1)
|
||||
err = json.Unmarshal(rawReq, &reqs[0])
|
||||
rpc := jail.nodeManager.RPCClient()
|
||||
// TODO(divan): remove this check as soon as jail cells have
|
||||
// proper cancellation mechanism implemented.
|
||||
if rpc == nil {
|
||||
throwJSException(fmt.Errorf("Error getting RPC client. Node stopped?"))
|
||||
}
|
||||
response := rpc.CallRaw(request.String())
|
||||
|
||||
// unmarshal response to pass to otto
|
||||
var resp interface{}
|
||||
err = json.Unmarshal([]byte(response), &resp)
|
||||
if err != nil {
|
||||
throwJSException(fmt.Errorf("can't unmarshal %v (batch=%v): %s", string(rawReq), batch, err))
|
||||
throwJSException(fmt.Errorf("Error unmarshalling result: %s", err))
|
||||
}
|
||||
|
||||
resps, err := vm.Call("new Array", nil)
|
||||
respValue, err := jail.vm.ToValue(resp)
|
||||
if err != nil {
|
||||
throwJSException(fmt.Errorf("can't create Array: %s", err))
|
||||
throwJSException(fmt.Errorf("Error converting result to Otto's value: %s", err))
|
||||
}
|
||||
|
||||
// Execute the requests.
|
||||
for _, req := range reqs {
|
||||
log.Info("execute request", "method", req.Method)
|
||||
res, err := jail.policy.Execute(req, vm)
|
||||
if err != nil {
|
||||
log.Info("request errored", "error", err.Error())
|
||||
switch err.(type) {
|
||||
case common.StopRPCCallError:
|
||||
return newErrorResponseOtto(vm, err.Error(), nil)
|
||||
default:
|
||||
res = newErrorResponse(err.Error(), &req.ID)
|
||||
}
|
||||
}
|
||||
|
||||
_, err = resps.Object().Call("push", res)
|
||||
if err != nil {
|
||||
throwJSException(fmt.Errorf("can't push result: %s", err))
|
||||
}
|
||||
}
|
||||
|
||||
// Return the responses either to the callback (if supplied)
|
||||
// or directly as the return value.
|
||||
if batch {
|
||||
return resps
|
||||
}
|
||||
v, err := resps.Object().Get("0")
|
||||
if err != nil {
|
||||
throwJSException(err)
|
||||
}
|
||||
return v
|
||||
return respValue
|
||||
}
|
||||
|
||||
func newErrorResponse(msg string, id interface{}) map[string]interface{} {
|
||||
|
|
|
@ -3,6 +3,8 @@ package jail_test
|
|||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -13,7 +15,6 @@ import (
|
|||
"github.com/status-im/status-go/geth/params"
|
||||
"github.com/status-im/status-go/geth/signal"
|
||||
. "github.com/status-im/status-go/geth/testing"
|
||||
"github.com/status-im/status-go/geth/txqueue"
|
||||
"github.com/status-im/status-go/static"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
@ -45,15 +46,17 @@ func (s *JailTestSuite) SetupTest() {
|
|||
accountManager := account.NewManager(nodeManager)
|
||||
require.NotNil(accountManager)
|
||||
|
||||
txQueueManager := txqueue.NewManager(nodeManager, accountManager)
|
||||
|
||||
jail := jail.New(nodeManager, accountManager, txQueueManager)
|
||||
jail := jail.New(nodeManager)
|
||||
require.NotNil(jail)
|
||||
|
||||
s.jail = jail
|
||||
s.NodeManager = nodeManager
|
||||
}
|
||||
|
||||
func (s *JailTestSuite) TearDownTest() {
|
||||
s.jail.Stop()
|
||||
}
|
||||
|
||||
func (s *JailTestSuite) TestInit() {
|
||||
require := s.Require()
|
||||
|
||||
|
@ -133,6 +136,9 @@ func (s *JailTestSuite) TestFunctionCall() {
|
|||
func (s *JailTestSuite) TestJailRPCAsyncSend() {
|
||||
require := s.Require()
|
||||
|
||||
s.StartTestNode(params.RopstenNetworkID)
|
||||
defer s.StopTestNode()
|
||||
|
||||
// load Status JS and add test command to it
|
||||
s.jail.BaseJS(baseStatusJSCode)
|
||||
s.jail.Parse(testChatID, txJSCode)
|
||||
|
@ -158,6 +164,12 @@ func (s *JailTestSuite) TestJailRPCAsyncSend() {
|
|||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// TODO(divan): revisit this test. sendAsync now returns immediately,
|
||||
// and we need no way here to halt jail loop, which executes actual
|
||||
// transaction send in background. For now, just wait a couple of secs
|
||||
// to let tests pass.
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
|
||||
func (s *JailTestSuite) TestJailRPCSend() {
|
||||
|
@ -189,7 +201,6 @@ func (s *JailTestSuite) TestJailRPCSend() {
|
|||
balance, err := value.ToFloat()
|
||||
require.NoError(err)
|
||||
|
||||
s.T().Logf("Balance of %.2f ETH found on '%s' account", balance, TestConfig.Account1.Address)
|
||||
require.False(balance < 100, "wrong balance (there should be lots of test Ether on that account)")
|
||||
}
|
||||
|
||||
|
@ -278,3 +289,57 @@ func (s *JailTestSuite) TestEventSignal() {
|
|||
expectedResponse := `{"jsonrpc":"2.0","result":true}`
|
||||
require.Equal(expectedResponse, response)
|
||||
}
|
||||
|
||||
// TestCallResponseOrder tests for problem in
|
||||
// https://github.com/status-im/status-go/issues/372
|
||||
func (s *JailTestSuite) TestCallResponseOrder() {
|
||||
require := s.Require()
|
||||
|
||||
s.StartTestNode(params.RopstenNetworkID)
|
||||
defer s.StopTestNode()
|
||||
|
||||
statusJS := baseStatusJSCode + `;
|
||||
_status_catalog.commands["testCommand"] = function (params) {
|
||||
return params.val * params.val;
|
||||
};
|
||||
_status_catalog.commands["calculateGasPrice"] = function (n) {
|
||||
var gasMultiplicator = Math.pow(1.4, n).toFixed(3);
|
||||
var price = 211000000000;
|
||||
try {
|
||||
price = web3.eth.gasPrice;
|
||||
} catch (err) {}
|
||||
|
||||
return price * gasMultiplicator;
|
||||
};
|
||||
`
|
||||
s.jail.Parse(testChatID, statusJS)
|
||||
|
||||
N := 1000
|
||||
errCh := make(chan error, N)
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 1000; i++ {
|
||||
wg.Add(2)
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
res := s.jail.Call(testChatID, `["commands", "testCommand"]`, fmt.Sprintf(`{"val": %d}`, i))
|
||||
if !strings.Contains(res, fmt.Sprintf("result\": %d", i*i)) {
|
||||
errCh <- fmt.Errorf("result should be '%d', got %s", i*i, res)
|
||||
}
|
||||
}(i)
|
||||
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
res := s.jail.Call(testChatID, `["commands", "calculateGasPrice"]`, fmt.Sprintf(`%d`, i))
|
||||
if strings.Contains(res, "error") {
|
||||
errCh <- fmt.Errorf("result should not contain 'error', got %s", res)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
close(errCh)
|
||||
for e := range errCh {
|
||||
require.NoError(e)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue