Route geth requests to the upstream or local node correctly (#276)

Geth requests have been revised and if the upstream is enabled, only queries regarding working Ethereum protocol are routed to upstream.
Whisper, most web3 requests and various utility methods are still routed to the local node as well as requests for accounts.
This commit is contained in:
Ewetumo Alexander 2017-09-07 08:49:40 +01:00 committed by Ivan Tomilov
parent 8fb2424ea5
commit 792cd1d9c5
7 changed files with 247 additions and 460 deletions

View File

@ -89,6 +89,12 @@ type NodeManager interface {
// RPCClient exposes reference to RPC client connected to the running node
RPCClient() (*rpc.Client, error)
// RPCLocalClient exposes reference to RPC client connected to the running local node rpcserver
RPCLocalClient() (*rpc.Client, error)
// RPCUpstreamClient exposes reference to RPC client connected to the upstream node server
RPCUpstreamClient() (*rpc.Client, error)
// RPCServer exposes reference to running node's in-proc RPC server/handler
RPCServer() (*rpc.Server, error)
}

View File

@ -221,6 +221,32 @@ func (mr *MockNodeManagerMockRecorder) RPCClient() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RPCClient", reflect.TypeOf((*MockNodeManager)(nil).RPCClient))
}
// RPCLocalClient mocks base method
func (m *MockNodeManager) RPCLocalClient() (*rpc.Client, error) {
ret := m.ctrl.Call(m, "RPCLocalClient")
ret0, _ := ret[0].(*rpc.Client)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// RPCLocalClient indicates an expected call of RPCLocalClient
func (mr *MockNodeManagerMockRecorder) RPCLocalClient() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RPCLocalClient", reflect.TypeOf((*MockNodeManager)(nil).RPCLocalClient))
}
// RPCUpstreamClient mocks base method
func (m *MockNodeManager) RPCUpstreamClient() (*rpc.Client, error) {
ret := m.ctrl.Call(m, "RPCUpstreamClient")
ret0, _ := ret[0].(*rpc.Client)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// RPCUpstreamClient indicates an expected call of RPCUpstreamClient
func (mr *MockNodeManagerMockRecorder) RPCUpstreamClient() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RPCUpstreamClient", reflect.TypeOf((*MockNodeManager)(nil).RPCUpstreamClient))
}
// RPCServer mocks base method
func (m *MockNodeManager) RPCServer() (*rpc.Server, error) {
ret := m.ctrl.Call(m, "RPCServer")
@ -745,17 +771,22 @@ func (mr *MockJailCellMockRecorder) Run(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockJailCell)(nil).Run), arg0)
}
// RunOnLoop mocks base method
func (m *MockJailCell) RunOnLoop(arg0 string) (otto.Value, error) {
ret := m.ctrl.Call(m, "RunOnLoop", arg0)
// Call mocks base method
func (m *MockJailCell) Call(item string, this interface{}, args ...interface{}) (otto.Value, error) {
varargs := []interface{}{item, this}
for _, a := range args {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "Call", varargs...)
ret0, _ := ret[0].(otto.Value)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// RunOnLoop indicates an expected call of RunOnLoop
func (mr *MockJailCellMockRecorder) RunOnLoop(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunOnLoop", reflect.TypeOf((*MockJailCell)(nil).RunOnLoop), arg0)
// Call indicates an expected call of Call
func (mr *MockJailCellMockRecorder) Call(item, this interface{}, args ...interface{}) *gomock.Call {
varargs := append([]interface{}{item, this}, args...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Call", reflect.TypeOf((*MockJailCell)(nil).Call), varargs...)
}
// MockJailManager is a mock of JailManager interface
@ -794,41 +825,41 @@ func (mr *MockJailManagerMockRecorder) Parse(chatID, js interface{}) *gomock.Cal
}
// Call mocks base method
func (m *MockJailManager) Call(chatID, path, args string) string {
ret := m.ctrl.Call(m, "Call", chatID, path, args)
func (m *MockJailManager) Call(chatID, this, args string) string {
ret := m.ctrl.Call(m, "Call", chatID, this, args)
ret0, _ := ret[0].(string)
return ret0
}
// Call indicates an expected call of Call
func (mr *MockJailManagerMockRecorder) Call(chatID, path, args interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Call", reflect.TypeOf((*MockJailManager)(nil).Call), chatID, path, args)
func (mr *MockJailManagerMockRecorder) Call(chatID, this, args interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Call", reflect.TypeOf((*MockJailManager)(nil).Call), chatID, this, args)
}
// NewJailCell mocks base method
func (m *MockJailManager) NewJailCell(id string) (JailCell, error) {
ret := m.ctrl.Call(m, "NewJailCell", id)
// NewCell mocks base method
func (m *MockJailManager) NewCell(chatID string) (JailCell, error) {
ret := m.ctrl.Call(m, "NewCell", chatID)
ret0, _ := ret[0].(JailCell)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// NewJailCell indicates an expected call of NewJailCell
func (mr *MockJailManagerMockRecorder) NewJailCell(id interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewJailCell", reflect.TypeOf((*MockJailManager)(nil).NewJailCell), id)
// NewCell indicates an expected call of NewCell
func (mr *MockJailManagerMockRecorder) NewCell(chatID interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewCell", reflect.TypeOf((*MockJailManager)(nil).NewCell), chatID)
}
// GetJailCell mocks base method
func (m *MockJailManager) GetJailCell(chatID string) (JailCell, error) {
ret := m.ctrl.Call(m, "GetJailCell", chatID)
// Cell mocks base method
func (m *MockJailManager) Cell(chatID string) (JailCell, error) {
ret := m.ctrl.Call(m, "Cell", chatID)
ret0, _ := ret[0].(JailCell)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetJailCell indicates an expected call of GetJailCell
func (mr *MockJailManagerMockRecorder) GetJailCell(chatID interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetJailCell", reflect.TypeOf((*MockJailManager)(nil).GetJailCell), chatID)
// Cell indicates an expected call of Cell
func (mr *MockJailManagerMockRecorder) Cell(chatID interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cell", reflect.TypeOf((*MockJailManager)(nil).Cell), chatID)
}
// BaseJS mocks base method

View File

@ -11,6 +11,44 @@ import (
"github.com/status-im/status-go/geth/params"
)
// map of command routes
var (
//TODO(influx6): Replace this with a registry of commands to functions that
// call appropriate op for command with ExecutionPolicy.
rpcLocalCommandRoute = map[string]bool{
//Whisper commands
"shh_post": true,
"shh_version": true,
"shh_newIdentity": true,
"shh_hasIdentity": true,
"shh_newGroup": true,
"shh_addToGroup": true,
"shh_newFilter": true,
"shh_uninstallFilter": true,
"shh_getFilterChanges": true,
"shh_getMessages": true,
// DB commands
"db_putString": true,
"db_getString": true,
"db_putHex": true,
"db_getHex": true,
// Other commands
"net_version": true,
"net_peerCount": true,
"net_listening": true,
// blockchain commands
"eth_sign": true,
"eth_accounts": true,
"eth_getCompilers": true,
"eth_compileLLL": true,
"eth_compileSolidity": true,
"eth_compileSerpent": true,
}
)
// ExecutionPolicy provides a central container for the executions of RPCCall requests for both
// remote/upstream processing and internal node processing.
type ExecutionPolicy struct {
@ -30,17 +68,55 @@ func NewExecutionPolicy(
}
}
// Execute handles a received RPC call.
// Execute handles the execution of a RPC request and routes appropriately to either a local or remote ethereum node.
func (ep *ExecutionPolicy) Execute(req common.RPCCall, call otto.FunctionCall) (*otto.Object, error) {
switch req.Method {
case params.SendTransactionMethodName:
return ep.executeSendTransaction(req, call)
default:
return ep.executeOtherTransaction(req, call)
config, err := ep.nodeManager.NodeConfig()
if err != nil {
return nil, err
}
if config.UpstreamConfig.Enabled {
if rpcLocalCommandRoute[req.Method] {
return ep.ExecuteLocally(req, call)
}
return ep.ExecuteOnRemote(req, call)
}
return ep.ExecuteLocally(req, call)
}
// ExecuteSendTransaction defines a function to execute RPC requests for eth_sendTransaction method only.
// ExecuteLocally defines a function which handles the processing of all RPC requests from the jail object
// to be processed with the internal ethereum node server(light.LightEthereum).
func (ep *ExecutionPolicy) ExecuteLocally(req common.RPCCall, call otto.FunctionCall) (*otto.Object, error) {
if params.SendTransactionMethodName == req.Method {
return ep.executeSendTransaction(req, call)
}
client, err := ep.nodeManager.RPCLocalClient()
if err != nil {
return nil, common.StopRPCCallError{Err: err}
}
return ep.executeWithClient(client, req, call)
}
// ExecuteOnRemote defines a function which handles the processing of all RPC requests from the jail object
// to be processed by a remote ethereum node server with responses returned as needed.
func (ep *ExecutionPolicy) ExecuteOnRemote(req common.RPCCall, call otto.FunctionCall) (*otto.Object, error) {
if params.SendTransactionMethodName == req.Method {
return ep.executeSendTransaction(req, call)
}
client, err := ep.nodeManager.RPCUpstreamClient()
if err != nil {
return nil, common.StopRPCCallError{Err: err}
}
return ep.executeWithClient(client, req, call)
}
// executeRemoteSendTransaction defines a function to execute RPC method eth_sendTransaction over the upstream server.
func (ep *ExecutionPolicy) executeSendTransaction(req common.RPCCall, call otto.FunctionCall) (*otto.Object, error) {
res, err := call.Otto.Object(`({"jsonrpc":"2.0"})`)
if err != nil {
@ -57,6 +133,7 @@ func (ep *ExecutionPolicy) executeSendTransaction(req common.RPCCall, call otto.
// TODO(adam): check if context is used
ctx := context.WithValue(context.Background(), common.MessageIDKey, messageID)
args := sendTxArgsFromRPCCall(req)
tx := ep.txQueueManager.CreateTransaction(ctx, args)
if err := ep.txQueueManager.QueueTransaction(tx); err != nil {
@ -77,14 +154,7 @@ func (ep *ExecutionPolicy) executeSendTransaction(req common.RPCCall, call otto.
return res, nil
}
// ExecuteOtherTransaction defines a function which handles the processing of non `eth_sendTransaction`
// rpc request to the internal node server.
func (ep *ExecutionPolicy) executeOtherTransaction(req common.RPCCall, call otto.FunctionCall) (*otto.Object, error) {
client, err := ep.nodeManager.RPCClient()
if err != nil {
return nil, common.StopRPCCallError{Err: err}
}
func (ep *ExecutionPolicy) executeWithClient(client *rpc.Client, req common.RPCCall, call otto.FunctionCall) (*otto.Object, error) {
JSON, err := call.Otto.Object("JSON")
if err != nil {
return nil, err

View File

@ -1,340 +0,0 @@
package jail_test
import (
"testing"
"encoding/json"
"net/http"
"net/http/httptest"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/robertkrimen/otto"
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/jail"
"github.com/status-im/status-go/geth/node"
"github.com/status-im/status-go/geth/params"
. "github.com/status-im/status-go/geth/testing"
"github.com/stretchr/testify/suite"
)
type txRequest struct {
Method string `json:"method"`
Version string `json:"jsonrpc"`
ID int `json:"id,omitempty"`
Payload json.RawMessage `json:"params,omitempty"`
}
type service struct {
Handler http.HandlerFunc
}
func (s service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.Handler(w, r)
}
//==================================================================================================
func TestJailRPCTestSuite(t *testing.T) {
suite.Run(t, new(JailRPCTestSuite))
}
type JailRPCTestSuite struct {
BaseTestSuite
Account common.AccountManager
Policy *jail.ExecutionPolicy
}
func (s *JailRPCTestSuite) SetupTest() {
require := s.Require()
nodeman := node.NewNodeManager()
require.NotNil(nodeman)
acctman := node.NewAccountManager(nodeman)
require.NotNil(acctman)
txQueueManager := node.NewTxQueueManager(nodeman, acctman)
policy := jail.NewExecutionPolicy(nodeman, acctman, txQueueManager)
require.NotNil(policy)
s.Policy = policy
s.Account = acctman
s.NodeManager = nodeman
}
func (s *JailRPCTestSuite) TestSendTransaction() {
require := s.Require()
odFunc := otto.FunctionCall{
Otto: otto.New(),
This: otto.NullValue(),
}
request := common.RPCCall{
ID: 65454545334343,
Method: "eth_sendTransaction",
Params: []interface{}{
map[string]interface{}{
"from": TestConfig.Account1.Address,
"to": "0xe410006cad020e3690c8ba21ed8b0f065dde2453",
"value": "0x2",
"nonce": "0x1",
"data": "Will-power",
"gasPrice": "0x4a817c800",
"gasLimit": "0x5208",
"chainId": 3391,
},
},
}
rpcService := new(service)
rpcService.Handler = func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
var txReq txRequest
err := json.NewDecoder(r.Body).Decode(&txReq)
require.NoError(err)
switch txReq.Method {
case "eth_getTransactionCount":
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"jsonrpc": "2.0", "status":200, "result": "0x434"}`))
return
}
payload := ([]byte)(txReq.Payload)
var bu []interface{}
jserr := json.Unmarshal(payload, &bu)
require.NoError(jserr)
require.NotNil(bu)
require.Len(bu, 1)
buElem, ok := bu[0].(string)
require.Equal(ok, true)
decoded, err := hexutil.Decode(buElem)
require.NoError(err)
require.NotNil(decoded)
var tx types.Transaction
decodeErr := rlp.DecodeBytes(decoded, &tx)
require.NoError(decodeErr)
// Validate we are receiving transaction from the proper network chain.
c, err := s.NodeManager.NodeConfig()
require.NoError(err)
require.Equal(tx.ChainId().Uint64(), c.NetworkID)
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"jsonrpc": "2.0", "status":200, "result": "3434=done"}`))
}
// httpRPCServer will serve as an upstream server accepting transactions.
httpRPCServer := httptest.NewServer(rpcService)
s.StartTestNode(params.RopstenNetworkID, WithUpstream(httpRPCServer.URL))
defer s.StopTestNode()
client, err := s.NodeManager.RPCClient()
require.NoError(err)
require.NotNil(client)
selectErr := s.Account.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)
require.NoError(selectErr)
res, err := s.Policy.Execute(request, odFunc)
require.NoError(err)
result, err := res.Get("result")
require.NoError(err)
require.NotNil(result)
exported, err := result.Export()
require.NoError(err)
rawJSON, ok := exported.(json.RawMessage)
require.True(ok, "Expected raw json payload")
require.Equal(string(rawJSON), "\"3434=done\"")
}
// func (s *JailRPCTestSuite) TestMainnetAcceptance() {
// require := s.Require()
// require.NotNil(s.NodeManager)
// odFunc := otto.FunctionCall{
// Otto: otto.New(),
// This: otto.NullValue(),
// }
// request := common.RPCCall{
// ID: 65454545334343,
// Method: "eth_sendTransaction",
// Params: []interface{}{
// map[string]interface{}{
// "from": TestConfig.Account1.Address,
// "to": "0xe410006cad020e3690c8ba21ed8b0f065dde2453",
// "value": "0x2",
// "nonce": "0x1",
// "data": "Will-power",
// "gasPrice": "0x4a817c800",
// "gasLimit": "0x5208",
// "chainId": 3391,
// },
// },
// }
// nodeConfig, err := MakeTestNodeConfig(params.MainNetworkID)
// require.NoError(err)
// nodeConfig.UpstreamConfig.Enabled = true
// // Start NodeManagers Node
// started, err := s.NodeManager.StartNode(nodeConfig)
// require.NoError(err)
// select {
// case <-started:
// break
// case <-time.After(1 * time.Second):
// require.Fail("Failed to start NodeManager")
// break
// }
// defer s.NodeManager.StopNode()
// client, err := s.NodeManager.RPCClient()
// require.NoError(err)
// require.NotNil(client)
// selectErr := s.Account.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)
// require.NoError(selectErr)
// res, err := s.Policy.ExecuteSendTransaction(request, odFunc)
// require.NoError(err)
// _, err = res.Get("hash")
// require.NoError(err)
// }
// func (s *JailRPCTestSuite) TestRobstenAcceptance() {
// require := s.Require()
// require.NotNil(s.NodeManager)
// odFunc := otto.FunctionCall{
// Otto: otto.New(),
// This: otto.NullValue(),
// }
// request := common.RPCCall{
// ID: 65454545334343,
// Method: "eth_sendTransaction",
// Params: []interface{}{
// map[string]interface{}{
// "from": TestConfig.Account1.Address,
// "to": "0xe410006cad020e3690c8ba21ed8b0f065dde2453",
// "value": "0x2",
// "nonce": "0x1",
// "data": "Will-power",
// "gasPrice": "0x4a817c800",
// "gasLimit": "0x5208",
// "chainId": 3391,
// },
// },
// }
// nodeConfig, err := MakeTestNodeConfig(params.RopstenNetworkID)
// require.NoError(err)
// nodeConfig.UpstreamConfig.Enabled = true
// // Start NodeManagers Node
// started, err := s.NodeManager.StartNode(nodeConfig)
// require.NoError(err)
// select {
// case <-started:
// break
// case <-time.After(1 * time.Second):
// require.Fail("Failed to start NodeManager")
// break
// }
// defer s.NodeManager.StopNode()
// client, err := s.NodeManager.RPCClient()
// require.NoError(err)
// require.NotNil(client)
// selectErr := s.Account.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)
// require.NoError(selectErr)
// res, err := s.Policy.ExecuteSendTransaction(request, odFunc)
// require.NoError(err)
// _, err = res.Get("hash")
// require.NoError(err)
// }
// func (s *JailRPCTestSuite) TestRinkebyAcceptance() {
// require := s.Require()
// require.NotNil(s.NodeManager)
// odFunc := otto.FunctionCall{
// Otto: otto.New(),
// This: otto.NullValue(),
// }
// request := common.RPCCall{
// ID: 65454545334343,
// Method: "eth_sendTransaction",
// Params: []interface{}{
// map[string]interface{}{
// "from": TestConfig.Account1.Address,
// "to": "0xe410006cad020e3690c8ba21ed8b0f065dde2453",
// "value": "0x2",
// "nonce": "0x1",
// "data": "Will-power",
// "gasPrice": "0x4a817c800",
// "gasLimit": "0x5208",
// "chainId": 3391,
// },
// },
// }
// nodeConfig, err := MakeTestNodeConfig(params.RinkebyNetworkID)
// require.NoError(err)
// nodeConfig.UpstreamConfig.Enabled = true
// // Start NodeManagers Node
// started, err := s.NodeManager.StartNode(nodeConfig)
// require.NoError(err)
// select {
// case <-started:
// break
// case <-time.After(1 * time.Second):
// require.Fail("Failed to start NodeManager")
// break
// }
// defer s.NodeManager.StopNode()
// client, err := s.NodeManager.RPCClient()
// require.NoError(err)
// require.NotNil(client)
// selectErr := s.Account.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)
// require.NoError(selectErr)
// res, err := s.Policy.ExecuteSendTransaction(request, odFunc)
// require.NoError(err)
// _, err = res.Get("hash")
// require.NoError(err)
// }

View File

@ -54,10 +54,6 @@ func NewNodeManager() *NodeManager {
// StartNode start Status node, fails if node is already started
func (m *NodeManager) StartNode(config *params.NodeConfig) (<-chan struct{}, error) {
if m == nil {
return nil, ErrInvalidNodeManager
}
m.Lock()
defer m.Unlock()
@ -130,10 +126,6 @@ func (m *NodeManager) startNode(config *params.NodeConfig) (<-chan struct{}, err
// StopNode stop Status node. Stopped node cannot be resumed.
func (m *NodeManager) StopNode() (<-chan struct{}, error) {
if m == nil {
return nil, ErrInvalidNodeManager
}
m.Lock()
defer m.Unlock()
@ -184,10 +176,6 @@ func (m *NodeManager) stopNode() (<-chan struct{}, error) {
// IsNodeRunning confirm that node is running
func (m *NodeManager) IsNodeRunning() bool {
if m == nil {
return false
}
m.RLock()
defer m.RUnlock()
@ -202,10 +190,6 @@ func (m *NodeManager) IsNodeRunning() bool {
// Node returns underlying Status node
func (m *NodeManager) Node() (*node.Node, error) {
if m == nil {
return nil, ErrInvalidNodeManager
}
m.RLock()
defer m.RUnlock()
@ -220,10 +204,6 @@ func (m *NodeManager) Node() (*node.Node, error) {
// PopulateStaticPeers connects current node with our publicly available LES/SHH/Swarm cluster
func (m *NodeManager) PopulateStaticPeers() error {
if m == nil {
return ErrInvalidNodeManager
}
m.RLock()
defer m.RUnlock()
@ -257,10 +237,6 @@ func (m *NodeManager) populateStaticPeers() error {
// AddPeer adds new static peer node
func (m *NodeManager) AddPeer(url string) error {
if m == nil {
return ErrInvalidNodeManager
}
m.RLock()
defer m.RUnlock()
@ -293,10 +269,6 @@ func (m *NodeManager) addPeer(url string) error {
// ResetChainData remove chain data from data directory.
// Node is stopped, and new node is started, with clean data directory.
func (m *NodeManager) ResetChainData() (<-chan struct{}, error) {
if m == nil {
return nil, ErrInvalidNodeManager
}
m.Lock()
defer m.Unlock()
@ -341,10 +313,6 @@ func (m *NodeManager) resetChainData() (<-chan struct{}, error) {
// RestartNode restart running Status node, fails if node is not running
func (m *NodeManager) RestartNode() (<-chan struct{}, error) {
if m == nil {
return nil, ErrInvalidNodeManager
}
m.Lock()
defer m.Unlock()
@ -374,10 +342,6 @@ func (m *NodeManager) restartNode() (<-chan struct{}, error) {
// NodeConfig exposes reference to running node's configuration
func (m *NodeManager) NodeConfig() (*params.NodeConfig, error) {
if m == nil {
return nil, ErrInvalidNodeManager
}
m.RLock()
defer m.RUnlock()
@ -392,10 +356,6 @@ func (m *NodeManager) NodeConfig() (*params.NodeConfig, error) {
// LightEthereumService exposes reference to LES service running on top of the node
func (m *NodeManager) LightEthereumService() (*les.LightEthereum, error) {
if m == nil {
return nil, ErrInvalidNodeManager
}
m.RLock()
defer m.RUnlock()
@ -421,10 +381,6 @@ func (m *NodeManager) LightEthereumService() (*les.LightEthereum, error) {
// WhisperService exposes reference to Whisper service running on top of the node
func (m *NodeManager) WhisperService() (*whisper.Whisper, error) {
if m == nil {
return nil, ErrInvalidNodeManager
}
m.RLock()
defer m.RUnlock()
@ -450,10 +406,6 @@ func (m *NodeManager) WhisperService() (*whisper.Whisper, error) {
// AccountManager exposes reference to node's accounts manager
func (m *NodeManager) AccountManager() (*accounts.Manager, error) {
if m == nil {
return nil, ErrInvalidNodeManager
}
m.RLock()
defer m.RUnlock()
@ -473,10 +425,6 @@ func (m *NodeManager) AccountManager() (*accounts.Manager, error) {
// AccountKeyStore exposes reference to accounts key store
func (m *NodeManager) AccountKeyStore() (*keystore.KeyStore, error) {
if m == nil {
return nil, ErrInvalidNodeManager
}
m.RLock()
defer m.RUnlock()
@ -504,19 +452,10 @@ func (m *NodeManager) AccountKeyStore() (*keystore.KeyStore, error) {
return keyStore, nil
}
// RPCClient exposes reference to RPC client connected to the running node.
func (m *NodeManager) RPCClient() (*rpc.Client, error) {
if m == nil {
return nil, ErrInvalidNodeManager
}
config, err := m.NodeConfig()
if err != nil {
return nil, err
}
m.RLock()
defer m.RUnlock()
// RPCLocalClient exposes reference to RPC client connected to the running node.
func (m *NodeManager) RPCLocalClient() (*rpc.Client, error) {
m.Lock()
defer m.Unlock()
// make sure that node is fully started
if m.node == nil || m.nodeStarted == nil {
@ -525,17 +464,6 @@ func (m *NodeManager) RPCClient() (*rpc.Client, error) {
<-m.nodeStarted
// Connect to upstream RPC server with new client and cache instance.
if config.UpstreamConfig.Enabled {
m.rpcClient, err = rpc.Dial(config.UpstreamConfig.URL)
if err != nil {
log.Error("Failed to conect to upstream RPC server", "error", err)
return nil, err
}
return m.rpcClient, nil
}
if m.rpcClient == nil {
var err error
m.rpcClient, err = m.node.Attach()
@ -545,19 +473,47 @@ func (m *NodeManager) RPCClient() (*rpc.Client, error) {
}
}
return m.rpcClient, nil
}
// RPCUpstreamClient exposes reference to RPC client connected to the running node.
func (m *NodeManager) RPCUpstreamClient() (*rpc.Client, error) {
config, err := m.NodeConfig()
if err != nil {
return nil, err
}
m.Lock()
defer m.Unlock()
if m.rpcClient == nil {
return nil, ErrInvalidRPCClient
m.rpcClient, err = rpc.Dial(config.UpstreamConfig.URL)
if err != nil {
log.Error("Failed to connect to upstream RPC server", "error", err)
return nil, ErrInvalidRPCClient
}
}
return m.rpcClient, nil
}
// RPCServer exposes reference to running node's in-proc RPC server/handler
func (m *NodeManager) RPCServer() (*rpc.Server, error) {
if m == nil {
return nil, ErrInvalidNodeManager
// RPCClient exposes reference to RPC client connected to the running node.
func (m *NodeManager) RPCClient() (*rpc.Client, error) {
config, err := m.NodeConfig()
if err != nil {
return nil, err
}
// Connect to upstream RPC server with new client and cache instance.
if config.UpstreamConfig.Enabled {
return m.RPCUpstreamClient()
}
return m.RPCLocalClient()
}
// RPCServer exposes reference to running node's in-proc RPC server/handler
func (m *NodeManager) RPCServer() (*rpc.Server, error) {
m.RLock()
defer m.RUnlock()

View File

@ -1,7 +1,9 @@
package node_test
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/status-im/status-go/geth/log"
@ -11,6 +13,13 @@ import (
"github.com/stretchr/testify/suite"
)
type txRequest struct {
Method string `json:"method"`
Version string `json:"jsonrpc"`
ID int `json:"id,omitempty"`
Payload json.RawMessage `json:"params,omitempty"`
}
type service struct {
Handler http.HandlerFunc
}
@ -32,10 +41,65 @@ func (s *RPCTestSuite) SetupTest() {
nodeManager := node.NewNodeManager()
require.NotNil(nodeManager)
s.NodeManager = nodeManager
}
func (s *RPCTestSuite) TestRPCSendTransaction() {
require := s.Require()
expectedResponse := []byte(`{"jsonrpc": "2.0", "status":200, "result": "3434=done"}`)
// httpRPCServer will serve as an upstream server accepting transactions.
httpRPCServer := httptest.NewServer(service{
Handler: func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
var txReq txRequest
err := json.NewDecoder(r.Body).Decode(&txReq)
require.NoError(err)
if txReq.Method == "eth_getTransactionCount" {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"jsonrpc": "2.0", "status":200, "result": "0x434"}`))
return
}
payload := ([]byte)(txReq.Payload)
var bu []interface{}
jserr := json.Unmarshal(payload, &bu)
require.NoError(jserr)
require.Len(bu, 1)
require.IsType(bu[0], (map[string]interface{})(nil))
w.WriteHeader(http.StatusOK)
w.Write(expectedResponse)
},
})
s.StartTestNode(params.RopstenNetworkID, WithUpstream(httpRPCServer.URL))
defer s.StopTestNode()
rpcClient := node.NewRPCManager(s.NodeManager)
require.NotNil(rpcClient)
response := rpcClient.Call(`{
"jsonrpc": "2.0",
"id":10,
"method": "eth_sendTransaction",
"params": [{
"from": "` + TestConfig.Account1.Address + `",
"to": "` + TestConfig.Account2.Address + `",
"value": "0x200",
"nonce": "0x100",
"data": "Will-power",
"gasPrice": "0x4a817c800",
"gasLimit": "0x5208",
"chainId": 3391
}]
}`)
require.Equal(response, string(expectedResponse))
}
func (s *RPCTestSuite) TestCallRPC() {
require := s.Require()
require.NotNil(s.NodeManager)

View File

@ -210,7 +210,7 @@ func (m *TxQueueManager) completeRemoteTransaction(queuedTx *common.QueuedTx, pa
return emptyHash, err
}
client, err := m.nodeManager.RPCClient()
client, err := m.nodeManager.RPCUpstreamClient()
if err != nil {
return emptyHash, err
}