feat: Rpc client manage multiple eth client (#2359)

This commit is contained in:
Anthony Laibe 2021-09-22 19:49:20 +02:00 committed by GitHub
parent 5d1fe07544
commit 26bfeddad4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 381 additions and 379 deletions

View File

@ -934,7 +934,7 @@ func (b *GethStatusBackend) registerHandlers() error {
for _, client := range clients { for _, client := range clients {
client.RegisterHandler( client.RegisterHandler(
params.AccountsMethodName, params.AccountsMethodName,
func(context.Context, ...interface{}) (interface{}, error) { func(context.Context, uint64, ...interface{}) (interface{}, error) {
return b.accountManager.Accounts() return b.accountManager.Accounts()
}, },
) )
@ -951,7 +951,7 @@ func (b *GethStatusBackend) registerHandlers() error {
return nil return nil
} }
func unsupportedMethodHandler(ctx context.Context, rpcParams ...interface{}) (interface{}, error) { func unsupportedMethodHandler(ctx context.Context, chainID uint64, rpcParams ...interface{}) (interface{}, error) {
return nil, ErrUnsupportedRPCMethod return nil, ErrUnsupportedRPCMethod
} }

View File

@ -1,67 +0,0 @@
package contracts
import (
"context"
"math/big"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
)
type RPCClient interface {
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
}
type ContractCaller struct {
c RPCClient
}
func NewContractCaller(c RPCClient) *ContractCaller {
return &ContractCaller{
c: c,
}
}
// CodeAt returns the contract code of the given account.
// The block number can be nil, in which case the code is taken from the latest known block.
func (cc *ContractCaller) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) {
var result hexutil.Bytes
err := cc.c.CallContext(ctx, &result, "eth_getCode", account, "latest")
return result, err
}
// CallContract executes a message call transaction, which is directly executed in the VM
// of the node, but never mined into the blockchain.
//
// blockNumber selects the block height at which the call runs. It can be nil, in which
// case the code is taken from the latest known block. Note that state from very old
// blocks might not be available.
func (cc *ContractCaller) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) {
var hex hexutil.Bytes
err := cc.c.CallContext(ctx, &hex, "eth_call", toCallArg(msg), "latest")
if err != nil {
return nil, err
}
return hex, nil
}
func toCallArg(msg ethereum.CallMsg) interface{} {
arg := map[string]interface{}{
"from": msg.From,
"to": msg.To,
}
if len(msg.Data) > 0 {
arg["data"] = hexutil.Bytes(msg.Data)
}
if msg.Value != nil {
arg["value"] = (*hexutil.Big)(msg.Value)
}
if msg.Gas != 0 {
arg["gas"] = hexutil.Uint64(msg.Gas)
}
if msg.GasPrice != nil {
arg["gasPrice"] = (*hexutil.Big)(msg.GasPrice)
}
return arg
}

View File

@ -231,7 +231,7 @@ func (n *StatusNode) setupRPCClient() (err error) {
if err != nil { if err != nil {
return return
} }
n.rpcClient, err = rpc.NewClient(gethNodeClient, n.config.UpstreamConfig) n.rpcClient, err = rpc.NewClient(gethNodeClient, n.config.NetworkID, n.config.UpstreamConfig, n.config.Networks, n.appDB)
if err != nil { if err != nil {
return return
} }
@ -350,7 +350,7 @@ func (n *StatusNode) startDiscovery() error {
if err := n.register.Start(); err != nil { if err := n.register.Start(); err != nil {
return err return err
} }
return n.peerPool.Start(n.gethNode.Server(), n.rpcClient) return n.peerPool.Start(n.gethNode.Server())
} }
// Stop will stop current StatusNode. A stopped node cannot be resumed. // Stop will stop current StatusNode. A stopped node cannot be resumed.

View File

@ -37,7 +37,6 @@ import (
"github.com/status-im/status-go/services/wakuext" "github.com/status-im/status-go/services/wakuext"
"github.com/status-im/status-go/services/wakuv2ext" "github.com/status-im/status-go/services/wakuv2ext"
"github.com/status-im/status-go/services/wallet" "github.com/status-im/status-go/services/wallet"
"github.com/status-im/status-go/services/wallet/network"
"github.com/status-im/status-go/timesource" "github.com/status-im/status-go/timesource"
"github.com/status-im/status-go/waku" "github.com/status-im/status-go/waku"
wakucommon "github.com/status-im/status-go/waku/common" wakucommon "github.com/status-im/status-go/waku/common"
@ -104,7 +103,7 @@ func (b *StatusNode) initServices(config *params.NodeConfig) error {
} }
if config.WalletConfig.Enabled { if config.WalletConfig.Enabled {
walletService := b.walletService(config.NetworkID, config.Networks, accountsFeed) walletService := b.walletService(accountsFeed)
services = append(services, walletService) services = append(services, walletService)
} }
@ -367,9 +366,9 @@ func (b *StatusNode) appmetricsService() common.StatusService {
return b.appMetricsSrvc return b.appMetricsSrvc
} }
func (b *StatusNode) walletService(chainID uint64, networks []network.Network, accountsFeed *event.Feed) common.StatusService { func (b *StatusNode) walletService(accountsFeed *event.Feed) common.StatusService {
if b.walletSrvc == nil { if b.walletSrvc == nil {
b.walletSrvc = wallet.NewService(b.appDB, chainID, b.rpcClient.Ethclient(), networks, accountsFeed) b.walletSrvc = wallet.NewService(b.appDB, b.rpcClient, accountsFeed)
} }
return b.walletSrvc return b.walletSrvc
} }
@ -386,7 +385,6 @@ func (b *StatusNode) peerService() *peer.Service {
b.peerSrvc = peer.New() b.peerSrvc = peer.New()
} }
return b.peerSrvc return b.peerSrvc
} }
func registerWakuMailServer(wakuService *waku.Waku, config *params.WakuConfig) (err error) { func registerWakuMailServer(wakuService *waku.Waku, config *params.WakuConfig) (err error) {

View File

@ -22,7 +22,7 @@ import (
"github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/pushnotificationserver" "github.com/status-im/status-go/protocol/pushnotificationserver"
"github.com/status-im/status-go/services/wallet/network" "github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/static" "github.com/status-im/status-go/static"
wakucommon "github.com/status-im/status-go/waku/common" wakucommon "github.com/status-im/status-go/waku/common"
wakuv2common "github.com/status-im/status-go/wakuv2/common" wakuv2common "github.com/status-im/status-go/wakuv2/common"

View File

@ -12,7 +12,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/discv5" "github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/contracts"
"github.com/status-im/status-go/discovery" "github.com/status-im/status-go/discovery"
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
"github.com/status-im/status-go/peers/verifier" "github.com/status-im/status-go/peers/verifier"
@ -128,7 +127,7 @@ func (p *PeerPool) setDiscoveryTimeout() {
} }
// Start creates topic pool for each topic in config and subscribes to server events. // Start creates topic pool for each topic in config and subscribes to server events.
func (p *PeerPool) Start(server *p2p.Server, rpcClient contracts.RPCClient) error { func (p *PeerPool) Start(server *p2p.Server) error {
if !p.discovery.Running() { if !p.discovery.Running() {
return ErrDiscv5NotRunning return ErrDiscv5NotRunning
} }
@ -156,7 +155,7 @@ func (p *PeerPool) Start(server *p2p.Server, rpcClient contracts.RPCClient) erro
var topicPool TopicPoolInterface var topicPool TopicPoolInterface
t := newTopicPool(p.discovery, topic, limits, p.opts.SlowSync, p.opts.FastSync, p.cache) t := newTopicPool(p.discovery, topic, limits, p.opts.SlowSync, p.opts.FastSync, p.cache)
if topic == MailServerDiscoveryTopic { if topic == MailServerDiscoveryTopic {
v, err := p.initVerifier(rpcClient) v, err := p.initVerifier()
if err != nil { if err != nil {
return err return err
} }
@ -176,7 +175,7 @@ func (p *PeerPool) Start(server *p2p.Server, rpcClient contracts.RPCClient) erro
return nil return nil
} }
func (p *PeerPool) initVerifier(rpcClient contracts.RPCClient) (v Verifier, err error) { func (p *PeerPool) initVerifier() (v Verifier, err error) {
return verifier.NewLocalVerifier(p.opts.TrustedMailServers), nil return verifier.NewLocalVerifier(p.opts.TrustedMailServers), nil
} }

View File

@ -134,7 +134,7 @@ func (s *PeerPoolSimulationSuite) TestPeerPoolCacheEthV5() {
peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts) peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts)
// start peer pool // start peer pool
s.Require().NoError(peerPool.Start(s.peers[1], nil)) s.Require().NoError(peerPool.Start(s.peers[1]))
defer peerPool.Stop() defer peerPool.Stop()
// check if cache is passed to topic pools // check if cache is passed to topic pools
@ -179,7 +179,7 @@ func TestPeerPoolMaxPeersOverflow(t *testing.T) {
poolOpts := &Options{DefaultFastSync, DefaultSlowSync, 0, true, 100 * time.Millisecond, nil} poolOpts := &Options{DefaultFastSync, DefaultSlowSync, 0, true, 100 * time.Millisecond, nil}
pool := NewPeerPool(discovery, nil, nil, poolOpts) pool := NewPeerPool(discovery, nil, nil, poolOpts)
require.NoError(t, pool.Start(peer, nil)) require.NoError(t, pool.Start(peer))
require.Equal(t, signal.EventDiscoveryStarted, <-signals) require.Equal(t, signal.EventDiscoveryStarted, <-signals)
// without config, it will stop the discovery because all topic pools are satisfied // without config, it will stop the discovery because all topic pools are satisfied
pool.events <- &p2p.PeerEvent{Type: p2p.PeerEventTypeAdd} pool.events <- &p2p.PeerEvent{Type: p2p.PeerEventTypeAdd}
@ -232,7 +232,7 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) {
// start PeerPool // start PeerPool
poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, true, 100 * time.Millisecond, nil} poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, true, 100 * time.Millisecond, nil}
pool := NewPeerPool(discovery, nil, nil, poolOpts) pool := NewPeerPool(discovery, nil, nil, poolOpts)
require.NoError(t, pool.Start(server, nil)) require.NoError(t, pool.Start(server))
require.Equal(t, signal.EventDiscoveryStarted, <-signals) require.Equal(t, signal.EventDiscoveryStarted, <-signals)
// timeout after finding no peers // timeout after finding no peers
@ -279,7 +279,7 @@ func TestPeerPoolNotAllowedStopping(t *testing.T) {
// start PeerPool // start PeerPool
poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, false, 100 * time.Millisecond, nil} poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, false, 100 * time.Millisecond, nil}
pool := NewPeerPool(discovery, nil, nil, poolOpts) pool := NewPeerPool(discovery, nil, nil, poolOpts)
require.NoError(t, pool.Start(server, nil)) require.NoError(t, pool.Start(server))
// wait 2x timeout duration // wait 2x timeout duration
<-time.After(pool.opts.DiscServerTimeout * 2) <-time.After(pool.opts.DiscServerTimeout * 2)
@ -300,7 +300,7 @@ func (s *PeerPoolSimulationSuite) TestUpdateTopicLimits() {
peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts) peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts)
// start peer pool // start peer pool
s.Require().NoError(peerPool.Start(s.peers[1], nil)) s.Require().NoError(peerPool.Start(s.peers[1]))
defer peerPool.Stop() defer peerPool.Stop()
for _, topicPool := range peerPool.topics { for _, topicPool := range peerPool.topics {
@ -377,7 +377,7 @@ func (s *PeerPoolSimulationSuite) TestMailServerPeersDiscovery() {
[]enode.ID{s.peers[0].Self().ID()}, []enode.ID{s.peers[0].Self().ID()},
} }
peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts) peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts)
s.Require().NoError(peerPool.Start(s.peers[1], nil)) s.Require().NoError(peerPool.Start(s.peers[1]))
defer peerPool.Stop() defer peerPool.Stop()
// wait for and verify the mail server peer // wait for and verify the mail server peer

View File

@ -35,8 +35,9 @@ type jsonrpcMessage struct {
type jsonrpcRequest struct { type jsonrpcRequest struct {
jsonrpcMessage jsonrpcMessage
Method string `json:"method"` ChainID uint64 `json:"chainId"`
Params json.RawMessage `json:"params,omitempty"` Method string `json:"method"`
Params json.RawMessage `json:"params,omitempty"`
} }
type jsonrpcSuccessfulResponse struct { type jsonrpcSuccessfulResponse struct {
@ -110,14 +111,18 @@ func (c *Client) callBatchMethods(ctx context.Context, msgs json.RawMessage) str
// callSingleMethod executes single JSON-RPC message and constructs proper response. // callSingleMethod executes single JSON-RPC message and constructs proper response.
func (c *Client) callSingleMethod(ctx context.Context, msg json.RawMessage) string { func (c *Client) callSingleMethod(ctx context.Context, msg json.RawMessage) string {
// unmarshal JSON body into json-rpc request // unmarshal JSON body into json-rpc request
method, params, id, err := methodAndParamsFromBody(msg) chainID, method, params, id, err := methodAndParamsFromBody(msg)
if err != nil { if err != nil {
return newErrorResponse(errInvalidMessageCode, err, id) return newErrorResponse(errInvalidMessageCode, err, id)
} }
if chainID == 0 {
chainID = c.UpstreamChainID
}
// route and execute // route and execute
var result json.RawMessage var result json.RawMessage
err = c.CallContext(ctx, &result, method, params...) err = c.CallContext(ctx, &result, chainID, method, params...)
// as we have to return original JSON, we have to // as we have to return original JSON, we have to
// analyze returned error and reconstruct original // analyze returned error and reconstruct original
@ -138,21 +143,20 @@ func (c *Client) callSingleMethod(ctx context.Context, msg json.RawMessage) stri
// JSON-RPC body into values ready to use with ethereum-go's // JSON-RPC body into values ready to use with ethereum-go's
// RPC client Call() function. A lot of empty interface usage is // RPC client Call() function. A lot of empty interface usage is
// due to the underlying code design :/ // due to the underlying code design :/
func methodAndParamsFromBody(body json.RawMessage) (string, []interface{}, json.RawMessage, error) { func methodAndParamsFromBody(body json.RawMessage) (uint64, string, []interface{}, json.RawMessage, error) {
msg, err := unmarshalMessage(body) msg, err := unmarshalMessage(body)
if err != nil { if err != nil {
return "", nil, nil, err return 0, "", nil, nil, err
} }
params := []interface{}{} params := []interface{}{}
if msg.Params != nil { if msg.Params != nil {
err = json.Unmarshal(msg.Params, &params) err = json.Unmarshal(msg.Params, &params)
if err != nil { if err != nil {
return "", nil, nil, err return 0, "", nil, nil, err
} }
} }
return msg.Method, params, msg.ID, nil return msg.ChainID, msg.Method, params, msg.ID, nil
} }
// unmarshalMessage tries to unmarshal JSON-RPC message. // unmarshalMessage tries to unmarshal JSON-RPC message.

View File

@ -57,6 +57,7 @@ func TestMethodAndParamsFromBody(t *testing.T) {
params []interface{} params []interface{}
method string method string
id json.RawMessage id json.RawMessage
chainID uint64
shouldFail bool shouldFail bool
}{ }{
{ {
@ -70,6 +71,7 @@ func TestMethodAndParamsFromBody(t *testing.T) {
}, },
"subtract", "subtract",
json.RawMessage(`42`), json.RawMessage(`42`),
0,
false, false,
}, },
{ {
@ -78,6 +80,7 @@ func TestMethodAndParamsFromBody(t *testing.T) {
[]interface{}{}, []interface{}{},
"test", "test",
nil, nil,
0,
false, false,
}, },
{ {
@ -86,6 +89,16 @@ func TestMethodAndParamsFromBody(t *testing.T) {
[]interface{}{}, []interface{}{},
"test", "test",
nil, nil,
0,
false,
},
{
"params_chain_id",
json.RawMessage(`{"jsonrpc": "2.0", "chainId": 2, "method": "test"}`),
[]interface{}{},
"test",
nil,
2,
false, false,
}, },
{ {
@ -94,6 +107,7 @@ func TestMethodAndParamsFromBody(t *testing.T) {
[]interface{}{string("3de6a8867aeb75be74d68478b853b4b0e063704d30f8231c45d0fcbd97af207e")}, []interface{}{string("3de6a8867aeb75be74d68478b853b4b0e063704d30f8231c45d0fcbd97af207e")},
"shh_getFilterMessages", "shh_getFilterMessages",
json.RawMessage(`44`), json.RawMessage(`44`),
0,
false, false,
}, },
{ {
@ -102,6 +116,7 @@ func TestMethodAndParamsFromBody(t *testing.T) {
[]interface{}{}, []interface{}{},
"", "",
nil, nil,
0,
true, true,
}, },
{ {
@ -110,18 +125,20 @@ func TestMethodAndParamsFromBody(t *testing.T) {
[]interface{}{}, []interface{}{},
"", "",
nil, nil,
0,
true, true,
}, },
} }
for _, test := range cases { for _, test := range cases {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
method, params, id, err := methodAndParamsFromBody(test.body) chainID, method, params, id, err := methodAndParamsFromBody(test.body)
if test.shouldFail { if test.shouldFail {
require.Error(t, err) require.Error(t, err)
return return
} }
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, test.chainID, chainID)
require.Equal(t, test.method, method) require.Equal(t, test.method, method)
require.Equal(t, test.params, params) require.Equal(t, test.params, params)
require.EqualValues(t, test.id, id) require.EqualValues(t, test.id, id)

View File

@ -2,6 +2,7 @@ package rpc
import ( import (
"context" "context"
"database/sql"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -14,6 +15,7 @@ import (
gethrpc "github.com/ethereum/go-ethereum/rpc" gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
"github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/services/rpcstats" "github.com/status-im/status-go/services/rpcstats"
) )
@ -28,7 +30,7 @@ var (
) )
// Handler defines handler for RPC methods. // Handler defines handler for RPC methods.
type Handler func(context.Context, ...interface{}) (interface{}, error) type Handler func(context.Context, uint64, ...interface{}) (interface{}, error)
// Client represents RPC client with custom routing // Client represents RPC client with custom routing
// scheme. It automatically decides where RPC call // scheme. It automatically decides where RPC call
@ -38,11 +40,14 @@ type Client struct {
upstreamEnabled bool upstreamEnabled bool
upstreamURL string upstreamURL string
UpstreamChainID uint64
local *gethrpc.Client local *gethrpc.Client
upstream *gethrpc.Client upstream *gethrpc.Client
rpcClients map[uint64]*gethrpc.Client
router *router router *router
NetworkManager *network.Manager
handlersMx sync.RWMutex // mx guards handlers handlersMx sync.RWMutex // mx guards handlers
handlers map[string]Handler // locally registered handlers handlers map[string]Handler // locally registered handlers
@ -54,16 +59,26 @@ type Client struct {
// //
// Client is safe for concurrent use and will automatically // Client is safe for concurrent use and will automatically
// reconnect to the server if connection is lost. // reconnect to the server if connection is lost.
func NewClient(client *gethrpc.Client, upstream params.UpstreamRPCConfig) (*Client, error) { func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.UpstreamRPCConfig, networks []network.Network, db *sql.DB) (*Client, error) {
c := Client{
local: client,
handlers: make(map[string]Handler),
log: log.New("package", "status-go/rpc.Client"),
}
var err error var err error
log := log.New("package", "status-go/rpc.Client")
networkManager := network.NewManager(db)
err = networkManager.Init(networks)
if err != nil {
log.Error("Network manager failed to initialize", "error", err)
}
c := Client{
local: client,
NetworkManager: networkManager,
handlers: make(map[string]Handler),
rpcClients: make(map[uint64]*gethrpc.Client),
log: log,
}
if upstream.Enabled { if upstream.Enabled {
c.UpstreamChainID = upstreamChainID
c.upstreamEnabled = upstream.Enabled c.upstreamEnabled = upstream.Enabled
c.upstreamURL = upstream.URL c.upstreamURL = upstream.URL
c.upstream, err = gethrpc.Dial(c.upstreamURL) c.upstream, err = gethrpc.Dial(c.upstreamURL)
@ -77,12 +92,41 @@ func NewClient(client *gethrpc.Client, upstream params.UpstreamRPCConfig) (*Clie
return &c, nil return &c, nil
} }
// Ethclient returns ethclient.Client with upstream or local client. func (c *Client) getRPCClientWithCache(chainID uint64) (*gethrpc.Client, error) {
func (c *Client) Ethclient() *ethclient.Client { if !c.upstreamEnabled {
if c.upstreamEnabled { return c.local, nil
return ethclient.NewClient(c.upstream)
} }
return ethclient.NewClient(c.local)
if c.UpstreamChainID == chainID {
return c.upstream, nil
}
if rpcClient, ok := c.rpcClients[chainID]; ok {
return rpcClient, nil
}
network := c.NetworkManager.Find(chainID)
if network == nil {
return nil, fmt.Errorf("could not find network: %d", chainID)
}
rpcClient, err := gethrpc.Dial(network.RPCURL)
if err != nil {
return nil, fmt.Errorf("dial upstream server: %s", err)
}
c.rpcClients[chainID] = rpcClient
return rpcClient, nil
}
// Ethclient returns ethclient.Client per chain
func (c *Client) EthClient(chainID uint64) (*ethclient.Client, error) {
rpcClient, err := c.getRPCClientWithCache(chainID)
if err != nil {
return nil, err
}
return ethclient.NewClient(rpcClient), nil
} }
// UpdateUpstreamURL changes the upstream RPC client URL, if the upstream is enabled. // UpdateUpstreamURL changes the upstream RPC client URL, if the upstream is enabled.
@ -111,9 +155,9 @@ func (c *Client) UpdateUpstreamURL(url string) error {
// can also pass nil, in which case the result is ignored. // can also pass nil, in which case the result is ignored.
// //
// It uses custom routing scheme for calls. // It uses custom routing scheme for calls.
func (c *Client) Call(result interface{}, method string, args ...interface{}) error { func (c *Client) Call(result interface{}, chainID uint64, method string, args ...interface{}) error {
ctx := context.Background() ctx := context.Background()
return c.CallContext(ctx, result, method, args...) return c.CallContext(ctx, result, chainID, method, args...)
} }
// CallContext performs a JSON-RPC call with the given arguments. If the context is // CallContext performs a JSON-RPC call with the given arguments. If the context is
@ -124,7 +168,7 @@ func (c *Client) Call(result interface{}, method string, args ...interface{}) er
// //
// It uses custom routing scheme for calls. // It uses custom routing scheme for calls.
// If there are any local handlers registered for this call, they will handle it. // If there are any local handlers registered for this call, they will handle it.
func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { func (c *Client) CallContext(ctx context.Context, result interface{}, chainID uint64, method string, args ...interface{}) error {
rpcstats.CountCall(method) rpcstats.CountCall(method)
if c.router.routeBlocked(method) { if c.router.routeBlocked(method) {
return ErrMethodNotFound return ErrMethodNotFound
@ -132,10 +176,10 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str
// check locally registered handlers first // check locally registered handlers first
if handler, ok := c.handler(method); ok { if handler, ok := c.handler(method); ok {
return c.callMethod(ctx, result, handler, args...) return c.callMethod(ctx, result, chainID, handler, args...)
} }
return c.CallContextIgnoringLocalHandlers(ctx, result, method, args...) return c.CallContextIgnoringLocalHandlers(ctx, result, chainID, method, args...)
} }
// CallContextIgnoringLocalHandlers performs a JSON-RPC call with the given // CallContextIgnoringLocalHandlers performs a JSON-RPC call with the given
@ -145,16 +189,17 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str
// be ignored. It is useful if the call is happening from within a local // be ignored. It is useful if the call is happening from within a local
// handler itself. // handler itself.
// Upstream calls routing will be used anyway. // Upstream calls routing will be used anyway.
func (c *Client) CallContextIgnoringLocalHandlers(ctx context.Context, result interface{}, method string, args ...interface{}) error { func (c *Client) CallContextIgnoringLocalHandlers(ctx context.Context, result interface{}, chainID uint64, method string, args ...interface{}) error {
if c.router.routeBlocked(method) { if c.router.routeBlocked(method) {
return ErrMethodNotFound return ErrMethodNotFound
} }
if c.router.routeRemote(method) { if c.router.routeRemote(method) {
c.RLock() ethClient, err := c.getRPCClientWithCache(chainID)
client := c.upstream if err != nil {
c.RUnlock() return err
return client.CallContext(ctx, result, method, args...) }
return ethClient.CallContext(ctx, result, method, args...)
} }
if c.local == nil { if c.local == nil {
@ -179,8 +224,8 @@ func (c *Client) RegisterHandler(method string, handler Handler) {
// It handles proper params and result converting // It handles proper params and result converting
// //
// TODO(divan): use cancellation via context here? // TODO(divan): use cancellation via context here?
func (c *Client) callMethod(ctx context.Context, result interface{}, handler Handler, args ...interface{}) error { func (c *Client) callMethod(ctx context.Context, result interface{}, chainID uint64, handler Handler, args ...interface{}) error {
response, err := handler(ctx, args...) response, err := handler(ctx, chainID, args...)
if err != nil { if err != nil {
return err return err
} }

View File

@ -2,19 +2,38 @@ package rpc
import ( import (
"context" "context"
"database/sql"
"fmt" "fmt"
"io/ioutil"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
"github.com/status-im/status-go/rpc/network"
gethrpc "github.com/ethereum/go-ethereum/rpc" gethrpc "github.com/ethereum/go-ethereum/rpc"
) )
func setupTestNetworkDB(t *testing.T) (*sql.DB, func()) {
tmpfile, err := ioutil.TempFile("", "rpc-network-tests-")
require.NoError(t, err)
db, err := appdatabase.InitializeDB(tmpfile.Name(), "rpc-network-tests")
require.NoError(t, err)
return db, func() {
require.NoError(t, db.Close())
require.NoError(t, os.Remove(tmpfile.Name()))
}
}
func TestBlockedRoutesCall(t *testing.T) { func TestBlockedRoutesCall(t *testing.T) {
db, close := setupTestNetworkDB(t)
defer close()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `{ fmt.Fprintln(w, `{
"id": 1, "id": 1,
@ -27,7 +46,7 @@ func TestBlockedRoutesCall(t *testing.T) {
gethRPCClient, err := gethrpc.Dial(ts.URL) gethRPCClient, err := gethrpc.Dial(ts.URL)
require.NoError(t, err) require.NoError(t, err)
c, err := NewClient(gethRPCClient, params.UpstreamRPCConfig{Enabled: false, URL: ""}) c, err := NewClient(gethRPCClient, 1, params.UpstreamRPCConfig{Enabled: false, URL: ""}, []network.Network{}, db)
require.NoError(t, err) require.NoError(t, err)
for _, m := range blockedMethods { for _, m := range blockedMethods {
@ -36,21 +55,24 @@ func TestBlockedRoutesCall(t *testing.T) {
err error err error
) )
err = c.Call(&result, m) err = c.Call(&result, 1, m)
require.EqualError(t, err, ErrMethodNotFound.Error()) require.EqualError(t, err, ErrMethodNotFound.Error())
require.Nil(t, result) require.Nil(t, result)
err = c.CallContext(context.Background(), &result, m) err = c.CallContext(context.Background(), &result, 1, m)
require.EqualError(t, err, ErrMethodNotFound.Error()) require.EqualError(t, err, ErrMethodNotFound.Error())
require.Nil(t, result) require.Nil(t, result)
err = c.CallContextIgnoringLocalHandlers(context.Background(), &result, m) err = c.CallContextIgnoringLocalHandlers(context.Background(), &result, 1, m)
require.EqualError(t, err, ErrMethodNotFound.Error()) require.EqualError(t, err, ErrMethodNotFound.Error())
require.Nil(t, result) require.Nil(t, result)
} }
} }
func TestBlockedRoutesRawCall(t *testing.T) { func TestBlockedRoutesRawCall(t *testing.T) {
db, close := setupTestNetworkDB(t)
defer close()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, `{ fmt.Fprintln(w, `{
"id": 1, "id": 1,
@ -63,7 +85,7 @@ func TestBlockedRoutesRawCall(t *testing.T) {
gethRPCClient, err := gethrpc.Dial(ts.URL) gethRPCClient, err := gethrpc.Dial(ts.URL)
require.NoError(t, err) require.NoError(t, err)
c, err := NewClient(gethRPCClient, params.UpstreamRPCConfig{Enabled: false, URL: ""}) c, err := NewClient(gethRPCClient, 1, params.UpstreamRPCConfig{Enabled: false, URL: ""}, []network.Network{}, db)
require.NoError(t, err) require.NoError(t, err)
for _, m := range blockedMethods { for _, m := range blockedMethods {
@ -78,6 +100,9 @@ func TestBlockedRoutesRawCall(t *testing.T) {
} }
func TestUpdateUpstreamURL(t *testing.T) { func TestUpdateUpstreamURL(t *testing.T) {
db, close := setupTestNetworkDB(t)
defer close()
ts := createTestServer("") ts := createTestServer("")
defer ts.Close() defer ts.Close()
@ -87,7 +112,7 @@ func TestUpdateUpstreamURL(t *testing.T) {
gethRPCClient, err := gethrpc.Dial(ts.URL) gethRPCClient, err := gethrpc.Dial(ts.URL)
require.NoError(t, err) require.NoError(t, err)
c, err := NewClient(gethRPCClient, params.UpstreamRPCConfig{Enabled: true, URL: ts.URL}) c, err := NewClient(gethRPCClient, 1, params.UpstreamRPCConfig{Enabled: true, URL: ts.URL}, []network.Network{}, db)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, ts.URL, c.upstreamURL) require.Equal(t, ts.URL, c.upstreamURL)

View File

@ -3,10 +3,6 @@ package network
import ( import (
"bytes" "bytes"
"database/sql" "database/sql"
"fmt"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
) )
type Network struct { type Network struct {
@ -85,22 +81,20 @@ func (nq *networksQuery) exec(db *sql.DB) ([]*Network, error) {
} }
type Manager struct { type Manager struct {
db *sql.DB db *sql.DB
legacyChainID uint64
legacyClient *ethclient.Client
chainClients map[uint64]*ChainClient
} }
func NewManager(db *sql.DB, legacyChainID uint64, legacyClient *ethclient.Client) *Manager { func NewManager(db *sql.DB) *Manager {
return &Manager{ return &Manager{
db: db, db: db,
legacyChainID: legacyChainID,
legacyClient: legacyClient,
chainClients: make(map[uint64]*ChainClient),
} }
} }
func (nm *Manager) Init(networks []Network) error { func (nm *Manager) Init(networks []Network) error {
if networks == nil {
return nil
}
currentNetworks, _ := nm.Get(false) currentNetworks, _ := nm.Get(false)
if len(currentNetworks) > 0 { if len(currentNetworks) > 0 {
return nil return nil
@ -116,42 +110,6 @@ func (nm *Manager) Init(networks []Network) error {
return nil return nil
} }
func (nm *Manager) GetChainClient(chainID uint64) (*ChainClient, error) {
if chainID == nm.legacyChainID {
return &ChainClient{eth: nm.legacyClient, ChainID: chainID}, nil
}
if chainClient, ok := nm.chainClients[chainID]; ok {
return chainClient, nil
}
network := nm.Find(chainID)
if network == nil {
return nil, fmt.Errorf("could not find network: %d", chainID)
}
rpcClient, err := rpc.Dial(network.RPCURL)
if err != nil {
return nil, fmt.Errorf("dial upstream server: %s", err)
}
chainClient := &ChainClient{eth: ethclient.NewClient(rpcClient), ChainID: chainID}
nm.chainClients[chainID] = chainClient
return chainClient, nil
}
func (nm *Manager) GetChainClients(chainIDs []uint64) (res []*ChainClient, err error) {
for _, chainID := range chainIDs {
client, err := nm.GetChainClient(chainID)
if err != nil {
return nil, err
}
res = append(res, client)
}
return res, nil
}
func (nm *Manager) Upsert(network *Network) error { func (nm *Manager) Upsert(network *Network) error {
_, err := nm.db.Exec( _, err := nm.db.Exec(
"INSERT OR REPLACE INTO networks (chain_id, chain_name, rpc_url, block_explorer_url, icon_url, native_currency_name, native_currency_symbol, native_currency_decimals, is_test, layer, enabled) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", "INSERT OR REPLACE INTO networks (chain_id, chain_name, rpc_url, block_explorer_url, icon_url, native_currency_name, native_currency_symbol, native_currency_decimals, is_test, layer, enabled) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",

View File

@ -61,6 +61,7 @@ func (api *PublicAPI) Recover(rpcParams RecoverParams) (addr types.Address, err
err = api.rpcClient.CallContextIgnoringLocalHandlers( err = api.rpcClient.CallContextIgnoringLocalHandlers(
ctx, ctx,
&gethAddr, &gethAddr,
api.rpcClient.UpstreamChainID,
params.PersonalRecoverMethodName, params.PersonalRecoverMethodName,
rpcParams.Message, rpcParams.Signature) rpcParams.Message, rpcParams.Signature)
addr = types.Address(gethAddr) addr = types.Address(gethAddr)
@ -81,6 +82,7 @@ func (api *PublicAPI) Sign(rpcParams SignParams, verifiedAccount *account.Select
err = api.rpcClient.CallContextIgnoringLocalHandlers( err = api.rpcClient.CallContextIgnoringLocalHandlers(
ctx, ctx,
&gethResult, &gethResult,
api.rpcClient.UpstreamChainID,
params.PersonalSignMethodName, params.PersonalSignMethodName,
rpcParams.Data, rpcParams.Address, rpcParams.Password) rpcParams.Data, rpcParams.Address, rpcParams.Password)
result = types.HexBytes(gethResult) result = types.HexBytes(gethResult)

View File

@ -13,7 +13,7 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc" getrpc "github.com/ethereum/go-ethereum/rpc"
) )
const ( const (
@ -36,14 +36,15 @@ type filter interface {
// PublicAPI represents filter API that is exported to `eth` namespace // PublicAPI represents filter API that is exported to `eth` namespace
type PublicAPI struct { type PublicAPI struct {
filtersMu sync.Mutex filtersMu sync.Mutex
filters map[rpc.ID]filter filters map[getrpc.ID]filter
// filterLivenessLoop defines how often timeout loop is executed // filterLivenessLoop defines how often timeout loop is executed
filterLivenessLoop time.Duration filterLivenessLoop time.Duration
// filter liveness increased by this period when changes are requested // filter liveness increased by this period when changes are requested
filterLivenessPeriod time.Duration filterLivenessPeriod time.Duration
client func() ContextCaller client func() ContextCaller
chainID func() uint64
latestBlockChangedEvent *latestBlockChangedEvent latestBlockChangedEvent *latestBlockChangedEvent
transactionSentToUpstreamEvent *transactionSentToUpstreamEvent transactionSentToUpstreamEvent *transactionSentToUpstreamEvent
@ -52,11 +53,12 @@ type PublicAPI struct {
// NewPublicAPI returns a reference to the PublicAPI object // NewPublicAPI returns a reference to the PublicAPI object
func NewPublicAPI(s *Service) *PublicAPI { func NewPublicAPI(s *Service) *PublicAPI {
api := &PublicAPI{ api := &PublicAPI{
filters: make(map[rpc.ID]filter), filters: make(map[getrpc.ID]filter),
latestBlockChangedEvent: s.latestBlockChangedEvent, latestBlockChangedEvent: s.latestBlockChangedEvent,
transactionSentToUpstreamEvent: s.transactionSentToUpstreamEvent, transactionSentToUpstreamEvent: s.transactionSentToUpstreamEvent,
client: func() ContextCaller { return s.rpc.RPCClient() }, client: func() ContextCaller { return s.rpc.RPCClient() },
chainID: func() uint64 { return s.rpc.RPCClient().UpstreamChainID },
filterLivenessLoop: defaultFilterLivenessPeriod, filterLivenessLoop: defaultFilterLivenessPeriod,
filterLivenessPeriod: defaultFilterLivenessPeriod + 10*time.Second, filterLivenessPeriod: defaultFilterLivenessPeriod + 10*time.Second,
} }
@ -89,8 +91,8 @@ func (api *PublicAPI) timeoutLoop(quit chan struct{}) {
} }
} }
func (api *PublicAPI) NewFilter(crit filters.FilterCriteria) (rpc.ID, error) { func (api *PublicAPI) NewFilter(crit filters.FilterCriteria) (getrpc.ID, error) {
id := rpc.ID(uuid.New()) id := getrpc.ID(uuid.New())
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
f := &logsFilter{ f := &logsFilter{
id: id, id: id,
@ -105,18 +107,18 @@ func (api *PublicAPI) NewFilter(crit filters.FilterCriteria) (rpc.ID, error) {
api.filtersMu.Lock() api.filtersMu.Lock()
api.filters[id] = f api.filters[id] = f
api.filtersMu.Unlock() api.filtersMu.Unlock()
go pollLogs(api.client(), f, defaultLogsQueryTimeout, defaultLogsPeriod) go pollLogs(api.client(), api.chainID(), f, defaultLogsQueryTimeout, defaultLogsPeriod)
return id, nil return id, nil
} }
// NewBlockFilter is an implemenation of `eth_newBlockFilter` API // NewBlockFilter is an implemenation of `eth_newBlockFilter` API
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter
func (api *PublicAPI) NewBlockFilter() rpc.ID { func (api *PublicAPI) NewBlockFilter() getrpc.ID {
api.filtersMu.Lock() api.filtersMu.Lock()
defer api.filtersMu.Unlock() defer api.filtersMu.Unlock()
f := newHashFilter() f := newHashFilter()
id := rpc.ID(uuid.New()) id := getrpc.ID(uuid.New())
api.filters[id] = f api.filters[id] = f
@ -142,12 +144,12 @@ func (api *PublicAPI) NewBlockFilter() rpc.ID {
// NewPendingTransactionFilter is an implementation of `eth_newPendingTransactionFilter` API // NewPendingTransactionFilter is an implementation of `eth_newPendingTransactionFilter` API
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
func (api *PublicAPI) NewPendingTransactionFilter() rpc.ID { func (api *PublicAPI) NewPendingTransactionFilter() getrpc.ID {
api.filtersMu.Lock() api.filtersMu.Lock()
defer api.filtersMu.Unlock() defer api.filtersMu.Unlock()
f := newHashFilter() f := newHashFilter()
id := rpc.ID(uuid.New()) id := getrpc.ID(uuid.New())
api.filters[id] = f api.filters[id] = f
@ -174,7 +176,7 @@ func (api *PublicAPI) NewPendingTransactionFilter() rpc.ID {
// UninstallFilter is an implemenation of `eth_uninstallFilter` API // UninstallFilter is an implemenation of `eth_uninstallFilter` API
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter
func (api *PublicAPI) UninstallFilter(id rpc.ID) bool { func (api *PublicAPI) UninstallFilter(id getrpc.ID) bool {
api.filtersMu.Lock() api.filtersMu.Lock()
f, found := api.filters[id] f, found := api.filters[id]
if found { if found {
@ -193,7 +195,7 @@ func (api *PublicAPI) UninstallFilter(id rpc.ID) bool {
// If the filter could not be found an empty array of logs is returned. // If the filter could not be found an empty array of logs is returned.
// //
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
func (api *PublicAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]types.Log, error) { func (api *PublicAPI) GetFilterLogs(ctx context.Context, id getrpc.ID) ([]types.Log, error) {
api.filtersMu.Lock() api.filtersMu.Lock()
f, exist := api.filters[id] f, exist := api.filters[id]
api.filtersMu.Unlock() api.filtersMu.Unlock()
@ -206,7 +208,7 @@ func (api *PublicAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]types.Log
} }
ctx, cancel := context.WithTimeout(ctx, defaultLogsQueryTimeout) ctx, cancel := context.WithTimeout(ctx, defaultLogsQueryTimeout)
defer cancel() defer cancel()
rst, err := getLogs(ctx, api.client(), logs.originalCrit) rst, err := getLogs(ctx, api.client(), api.chainID(), logs.originalCrit)
return rst, err return rst, err
} }
@ -214,7 +216,7 @@ func (api *PublicAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]types.Log
// last time it was called. This can be used for polling. // last time it was called. This can be used for polling.
// //
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
func (api *PublicAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { func (api *PublicAPI) GetFilterChanges(id getrpc.ID) (interface{}, error) {
api.filtersMu.Lock() api.filtersMu.Lock()
defer api.filtersMu.Unlock() defer api.filtersMu.Unlock()

View File

@ -21,6 +21,7 @@ func TestFilterLiveness(t *testing.T) {
filterLivenessLoop: 10 * time.Millisecond, filterLivenessLoop: 10 * time.Millisecond,
filterLivenessPeriod: 15 * time.Millisecond, filterLivenessPeriod: 15 * time.Millisecond,
client: func() ContextCaller { return &callTracker{} }, client: func() ContextCaller { return &callTracker{} },
chainID: func() uint64 { return 1 },
} }
id, err := api.NewFilter(filters.FilterCriteria{}) id, err := api.NewFilter(filters.FilterCriteria{})
require.NoError(t, err) require.NoError(t, err)
@ -60,6 +61,7 @@ func TestGetFilterChangesResetsTimer(t *testing.T) {
filterLivenessLoop: 10 * time.Millisecond, filterLivenessLoop: 10 * time.Millisecond,
filterLivenessPeriod: 15 * time.Millisecond, filterLivenessPeriod: 15 * time.Millisecond,
client: func() ContextCaller { return &callTracker{} }, client: func() ContextCaller { return &callTracker{} },
chainID: func() uint64 { return 1 },
} }
id, err := api.NewFilter(filters.FilterCriteria{}) id, err := api.NewFilter(filters.FilterCriteria{})
require.NoError(t, err) require.NoError(t, err)
@ -86,6 +88,7 @@ func TestGetFilterLogs(t *testing.T) {
api := &PublicAPI{ api := &PublicAPI{
filters: make(map[rpc.ID]filter), filters: make(map[rpc.ID]filter),
client: func() ContextCaller { return tracker }, client: func() ContextCaller { return tracker },
chainID: func() uint64 { return 1 },
} }
block := big.NewInt(10) block := big.NewInt(10)
id, err := api.NewFilter(filters.FilterCriteria{ id, err := api.NewFilter(filters.FilterCriteria{

View File

@ -48,7 +48,7 @@ func (p *latestBlockProviderRPC) GetLatestBlock() (blockInfo, error) {
var result blockInfo var result blockInfo
err := rpcClient.Call(&result, "eth_getBlockByNumber", "latest", false) err := rpcClient.Call(&result, rpcClient.UpstreamChainID, "eth_getBlockByNumber", "latest", false)
if err != nil { if err != nil {
return blockInfo{}, err return blockInfo{}, err

View File

@ -9,19 +9,19 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc" getRpc "github.com/ethereum/go-ethereum/rpc"
) )
// ContextCaller provides CallContext method as ethereums rpc.Client. // ContextCaller provides CallContext method as ethereums rpc.Client.
type ContextCaller interface { type ContextCaller interface {
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error CallContext(ctx context.Context, result interface{}, chainID uint64, method string, args ...interface{}) error
} }
func pollLogs(client ContextCaller, f *logsFilter, timeout, period time.Duration) { func pollLogs(client ContextCaller, chainID uint64, f *logsFilter, timeout, period time.Duration) {
query := func() { query := func() {
ctx, cancel := context.WithTimeout(f.ctx, timeout) ctx, cancel := context.WithTimeout(f.ctx, timeout)
defer cancel() defer cancel()
logs, err := getLogs(ctx, client, f.criteria()) logs, err := getLogs(ctx, client, chainID, f.criteria())
if err != nil { if err != nil {
log.Error("Error fetch logs", "criteria", f.crit, "error", err) log.Error("Error fetch logs", "criteria", f.crit, "error", err)
return return
@ -43,8 +43,8 @@ func pollLogs(client ContextCaller, f *logsFilter, timeout, period time.Duration
} }
} }
} }
func getLogs(ctx context.Context, client ContextCaller, crit ethereum.FilterQuery) (rst []types.Log, err error) { func getLogs(ctx context.Context, client ContextCaller, chainID uint64, crit ethereum.FilterQuery) (rst []types.Log, err error) {
return rst, client.CallContext(ctx, &rst, "eth_getLogs", toFilterArg(crit)) return rst, client.CallContext(ctx, &rst, chainID, "eth_getLogs", toFilterArg(crit))
} }
func toFilterArg(q ethereum.FilterQuery) interface{} { func toFilterArg(q ethereum.FilterQuery) interface{} {
@ -61,9 +61,9 @@ func toFilterArg(q ethereum.FilterQuery) interface{} {
} }
func toBlockNumArg(number *big.Int) string { func toBlockNumArg(number *big.Int) string {
if number == nil || number.Int64() == rpc.LatestBlockNumber.Int64() { if number == nil || number.Int64() == getRpc.LatestBlockNumber.Int64() {
return "latest" return "latest"
} else if number.Int64() == rpc.PendingBlockNumber.Int64() { } else if number.Int64() == getRpc.PendingBlockNumber.Int64() {
return "pending" return "pending"
} }
return hexutil.EncodeBig(number) return hexutil.EncodeBig(number)

View File

@ -24,7 +24,7 @@ type callTracker struct {
criteria []map[string]interface{} criteria []map[string]interface{}
} }
func (c *callTracker) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { func (c *callTracker) CallContext(ctx context.Context, result interface{}, chainID uint64, method string, args ...interface{}) error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
c.calls++ c.calls++
@ -50,7 +50,7 @@ func runLogsFetcherTest(t *testing.T, f *logsFilter, replies [][]types.Log, quer
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go func() { go func() {
pollLogs(&c, f, time.Second, 100*time.Millisecond) pollLogs(&c, 1, f, time.Second, 100*time.Millisecond)
wg.Done() wg.Done()
}() }()
tick := time.Tick(10 * time.Millisecond) tick := time.Tick(10 * time.Millisecond)

View File

@ -19,7 +19,7 @@ func installEthFilter(rpcClient *rpc.Client, method string, args []interface{})
var result string var result string
err := rpcClient.Call(&result, method, args...) err := rpcClient.Call(&result, rpcClient.UpstreamChainID, method, args...)
if err != nil { if err != nil {
return nil, err return nil, err
@ -41,13 +41,13 @@ func (ef *ethFilter) getID() string {
func (ef *ethFilter) getChanges() ([]interface{}, error) { func (ef *ethFilter) getChanges() ([]interface{}, error) {
var result []interface{} var result []interface{}
err := ef.rpcClient.Call(&result, "eth_getFilterChanges", ef.getID()) err := ef.rpcClient.Call(&result, ef.rpcClient.UpstreamChainID, "eth_getFilterChanges", ef.getID())
return result, err return result, err
} }
func (ef *ethFilter) uninstall() error { func (ef *ethFilter) uninstall() error {
return ef.rpcClient.Call(nil, "eth_uninstallFilter", ef.getID()) return ef.rpcClient.Call(nil, ef.rpcClient.UpstreamChainID, "eth_uninstallFilter", ef.getID())
} }
func validateEthMethod(method string) error { func validateEthMethod(method string) error {

View File

@ -19,7 +19,7 @@ func installShhFilter(rpcClient *rpc.Client, method string, args []interface{})
var result string var result string
err := rpcClient.Call(&result, method, args...) err := rpcClient.Call(&result, rpcClient.UpstreamChainID, method, args...)
if err != nil { if err != nil {
return nil, err return nil, err
@ -36,7 +36,7 @@ func installShhFilter(rpcClient *rpc.Client, method string, args []interface{})
func (wf *whisperFilter) getChanges() ([]interface{}, error) { func (wf *whisperFilter) getChanges() ([]interface{}, error) {
var result []interface{} var result []interface{}
err := wf.rpcClient.Call(&result, "shh_getFilterMessages", wf.getID()) err := wf.rpcClient.Call(&result, wf.rpcClient.UpstreamChainID, "shh_getFilterMessages", wf.getID())
return result, err return result, err
} }
@ -46,7 +46,7 @@ func (wf *whisperFilter) getID() string {
} }
func (wf *whisperFilter) uninstall() error { func (wf *whisperFilter) uninstall() error {
return wf.rpcClient.Call(nil, "shh_deleteMessageFilter", wf.getID()) return wf.rpcClient.Call(nil, wf.rpcClient.UpstreamChainID, "shh_deleteMessageFilter", wf.getID())
} }
func validateShhMethod(method string) error { func validateShhMethod(method string) error {

View File

@ -6,7 +6,8 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/network" "github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/services/wallet/chain"
"github.com/status-im/status-go/services/wallet/transfer" "github.com/status-im/status-go/services/wallet/transfer"
) )
@ -21,7 +22,7 @@ type API struct {
// SetInitialBlocksRange sets initial blocks range // SetInitialBlocksRange sets initial blocks range
func (api *API) SetInitialBlocksRange(ctx context.Context) error { func (api *API) SetInitialBlocksRange(ctx context.Context) error {
return api.s.transferController.SetInitialBlocksRange([]uint64{api.s.legacyChainID}) return api.s.transferController.SetInitialBlocksRange([]uint64{api.s.rpcClient.UpstreamChainID})
} }
func (api *API) SetInitialBlocksRangeForChainIDs(ctx context.Context, chainIDs []uint64) error { func (api *API) SetInitialBlocksRangeForChainIDs(ctx context.Context, chainIDs []uint64) error {
@ -29,7 +30,7 @@ func (api *API) SetInitialBlocksRangeForChainIDs(ctx context.Context, chainIDs [
} }
func (api *API) CheckRecentHistory(ctx context.Context, addresses []common.Address) error { func (api *API) CheckRecentHistory(ctx context.Context, addresses []common.Address) error {
return api.s.transferController.CheckRecentHistory([]uint64{api.s.legacyChainID}, addresses) return api.s.transferController.CheckRecentHistory([]uint64{api.s.rpcClient.UpstreamChainID}, addresses)
} }
func (api *API) CheckRecentHistoryForChainIDs(ctx context.Context, chainIDs []uint64, addresses []common.Address) error { func (api *API) CheckRecentHistoryForChainIDs(ctx context.Context, chainIDs []uint64, addresses []common.Address) error {
@ -39,7 +40,7 @@ func (api *API) CheckRecentHistoryForChainIDs(ctx context.Context, chainIDs []ui
// GetTransfersByAddress returns transfers for a single address // GetTransfersByAddress returns transfers for a single address
func (api *API) GetTransfersByAddress(ctx context.Context, address common.Address, toBlock, limit *hexutil.Big, fetchMore bool) ([]transfer.View, error) { func (api *API) GetTransfersByAddress(ctx context.Context, address common.Address, toBlock, limit *hexutil.Big, fetchMore bool) ([]transfer.View, error) {
log.Debug("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address, "block", toBlock, "limit", limit) log.Debug("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address, "block", toBlock, "limit", limit)
return api.s.transferController.GetTransfersByAddress(ctx, api.s.legacyChainID, address, toBlock, limit, fetchMore) return api.s.transferController.GetTransfersByAddress(ctx, api.s.rpcClient.UpstreamChainID, address, toBlock, limit, fetchMore)
} }
func (api *API) GetTransfersByAddressAndChainID(ctx context.Context, chainID uint64, address common.Address, toBlock, limit *hexutil.Big, fetchMore bool) ([]transfer.View, error) { func (api *API) GetTransfersByAddressAndChainID(ctx context.Context, chainID uint64, address common.Address, toBlock, limit *hexutil.Big, fetchMore bool) ([]transfer.View, error) {
@ -48,7 +49,7 @@ func (api *API) GetTransfersByAddressAndChainID(ctx context.Context, chainID uin
} }
func (api *API) GetCachedBalances(ctx context.Context, addresses []common.Address) ([]transfer.LastKnownBlockView, error) { func (api *API) GetCachedBalances(ctx context.Context, addresses []common.Address) ([]transfer.LastKnownBlockView, error) {
return api.s.transferController.GetCachedBalances(ctx, api.s.legacyChainID, addresses) return api.s.transferController.GetCachedBalances(ctx, api.s.rpcClient.UpstreamChainID, addresses)
} }
func (api *API) GetCachedBalancesbyChainID(ctx context.Context, chainID uint64, addresses []common.Address) ([]transfer.LastKnownBlockView, error) { func (api *API) GetCachedBalancesbyChainID(ctx context.Context, chainID uint64, addresses []common.Address) ([]transfer.LastKnownBlockView, error) {
@ -57,15 +58,15 @@ func (api *API) GetCachedBalancesbyChainID(ctx context.Context, chainID uint64,
// GetTokensBalances return mapping of token balances for every account. // GetTokensBalances return mapping of token balances for every account.
func (api *API) GetTokensBalances(ctx context.Context, accounts, addresses []common.Address) (map[common.Address]map[common.Address]*hexutil.Big, error) { func (api *API) GetTokensBalances(ctx context.Context, accounts, addresses []common.Address) (map[common.Address]map[common.Address]*hexutil.Big, error) {
client, err := api.s.networkManager.GetChainClient(api.s.legacyChainID) chainClient, err := chain.NewLegacyClient(api.s.rpcClient)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return api.s.tokenManager.getBalances(ctx, []*network.ChainClient{client}, accounts, addresses) return api.s.tokenManager.getBalances(ctx, []*chain.Client{chainClient}, accounts, addresses)
} }
func (api *API) GetTokensBalancesForChainIDs(ctx context.Context, chainIDs []uint64, accounts, addresses []common.Address) (map[common.Address]map[common.Address]*hexutil.Big, error) { func (api *API) GetTokensBalancesForChainIDs(ctx context.Context, chainIDs []uint64, accounts, addresses []common.Address) (map[common.Address]map[common.Address]*hexutil.Big, error) {
clients, err := api.s.networkManager.GetChainClients(chainIDs) clients, err := chain.NewClients(api.s.rpcClient, chainIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -82,7 +83,7 @@ func (api *API) GetCustomTokens(ctx context.Context) ([]*Token, error) {
func (api *API) AddCustomToken(ctx context.Context, token Token) error { func (api *API) AddCustomToken(ctx context.Context, token Token) error {
log.Debug("call to create or edit custom token") log.Debug("call to create or edit custom token")
if token.ChainID == 0 { if token.ChainID == 0 {
token.ChainID = api.s.legacyChainID token.ChainID = api.s.rpcClient.UpstreamChainID
} }
err := api.s.tokenManager.upsertCustom(token) err := api.s.tokenManager.upsertCustom(token)
log.Debug("result from database for create or edit custom token", "err", err) log.Debug("result from database for create or edit custom token", "err", err)
@ -91,7 +92,7 @@ func (api *API) AddCustomToken(ctx context.Context, token Token) error {
func (api *API) DeleteCustomToken(ctx context.Context, address common.Address) error { func (api *API) DeleteCustomToken(ctx context.Context, address common.Address) error {
log.Debug("call to remove custom token") log.Debug("call to remove custom token")
err := api.s.tokenManager.deleteCustom(api.s.legacyChainID, address) err := api.s.tokenManager.deleteCustom(api.s.rpcClient.UpstreamChainID, address)
log.Debug("result from database for remove custom token", "err", err) log.Debug("result from database for remove custom token", "err", err)
return err return err
} }
@ -105,7 +106,7 @@ func (api *API) DeleteCustomTokenByChainID(ctx context.Context, chainID uint64,
func (api *API) GetSavedAddresses(ctx context.Context) ([]*SavedAddress, error) { func (api *API) GetSavedAddresses(ctx context.Context) ([]*SavedAddress, error) {
log.Debug("call to get saved addresses") log.Debug("call to get saved addresses")
rst, err := api.s.savedAddressesManager.GetSavedAddresses(api.s.legacyChainID) rst, err := api.s.savedAddressesManager.GetSavedAddresses(api.s.rpcClient.UpstreamChainID)
log.Debug("result from database for saved addresses", "len", len(rst)) log.Debug("result from database for saved addresses", "len", len(rst))
return rst, err return rst, err
} }
@ -113,7 +114,7 @@ func (api *API) GetSavedAddresses(ctx context.Context) ([]*SavedAddress, error)
func (api *API) AddSavedAddress(ctx context.Context, sa SavedAddress) error { func (api *API) AddSavedAddress(ctx context.Context, sa SavedAddress) error {
log.Debug("call to create or edit saved address") log.Debug("call to create or edit saved address")
if sa.ChainID == 0 { if sa.ChainID == 0 {
sa.ChainID = api.s.legacyChainID sa.ChainID = api.s.rpcClient.UpstreamChainID
} }
err := api.s.savedAddressesManager.AddSavedAddress(sa) err := api.s.savedAddressesManager.AddSavedAddress(sa)
log.Debug("result from database for create or edit saved address", "err", err) log.Debug("result from database for create or edit saved address", "err", err)
@ -122,14 +123,14 @@ func (api *API) AddSavedAddress(ctx context.Context, sa SavedAddress) error {
func (api *API) DeleteSavedAddress(ctx context.Context, address common.Address) error { func (api *API) DeleteSavedAddress(ctx context.Context, address common.Address) error {
log.Debug("call to remove saved address") log.Debug("call to remove saved address")
err := api.s.savedAddressesManager.DeleteSavedAddress(api.s.legacyChainID, address) err := api.s.savedAddressesManager.DeleteSavedAddress(api.s.rpcClient.UpstreamChainID, address)
log.Debug("result from database for remove saved address", "err", err) log.Debug("result from database for remove saved address", "err", err)
return err return err
} }
func (api *API) GetPendingTransactions(ctx context.Context) ([]*PendingTransaction, error) { func (api *API) GetPendingTransactions(ctx context.Context) ([]*PendingTransaction, error) {
log.Debug("call to get pending transactions") log.Debug("call to get pending transactions")
rst, err := api.s.transactionManager.getAllPendings(api.s.legacyChainID) rst, err := api.s.transactionManager.getAllPendings(api.s.rpcClient.UpstreamChainID)
log.Debug("result from database for pending transactions", "len", len(rst)) log.Debug("result from database for pending transactions", "len", len(rst))
return rst, err return rst, err
} }
@ -143,7 +144,7 @@ func (api *API) GetPendingTransactionsByChainID(ctx context.Context, chainID uin
func (api *API) GetPendingOutboundTransactionsByAddress(ctx context.Context, address common.Address) ([]*PendingTransaction, error) { func (api *API) GetPendingOutboundTransactionsByAddress(ctx context.Context, address common.Address) ([]*PendingTransaction, error) {
log.Debug("call to get pending outbound transactions by address") log.Debug("call to get pending outbound transactions by address")
rst, err := api.s.transactionManager.getPendingByAddress(api.s.legacyChainID, address) rst, err := api.s.transactionManager.getPendingByAddress(api.s.rpcClient.UpstreamChainID, address)
log.Debug("result from database for pending transactions by address", "len", len(rst)) log.Debug("result from database for pending transactions by address", "len", len(rst))
return rst, err return rst, err
} }
@ -158,7 +159,7 @@ func (api *API) GetPendingOutboundTransactionsByAddressAndChainID(ctx context.Co
func (api *API) StorePendingTransaction(ctx context.Context, trx PendingTransaction) error { func (api *API) StorePendingTransaction(ctx context.Context, trx PendingTransaction) error {
log.Debug("call to create or edit pending transaction") log.Debug("call to create or edit pending transaction")
if trx.ChainID == 0 { if trx.ChainID == 0 {
trx.ChainID = api.s.legacyChainID trx.ChainID = api.s.rpcClient.UpstreamChainID
} }
err := api.s.transactionManager.addPending(trx) err := api.s.transactionManager.addPending(trx)
log.Debug("result from database for creating or editing a pending transaction", "err", err) log.Debug("result from database for creating or editing a pending transaction", "err", err)
@ -167,7 +168,7 @@ func (api *API) StorePendingTransaction(ctx context.Context, trx PendingTransact
func (api *API) DeletePendingTransaction(ctx context.Context, transactionHash common.Hash) error { func (api *API) DeletePendingTransaction(ctx context.Context, transactionHash common.Hash) error {
log.Debug("call to remove pending transaction") log.Debug("call to remove pending transaction")
err := api.s.transactionManager.deletePending(api.s.legacyChainID, transactionHash) err := api.s.transactionManager.deletePending(api.s.rpcClient.UpstreamChainID, transactionHash)
log.Debug("result from database for remove pending transaction", "err", err) log.Debug("result from database for remove pending transaction", "err", err)
return err return err
} }
@ -180,20 +181,18 @@ func (api *API) DeletePendingTransactionByChainID(ctx context.Context, chainID u
} }
func (api *API) WatchTransaction(ctx context.Context, transactionHash common.Hash) error { func (api *API) WatchTransaction(ctx context.Context, transactionHash common.Hash) error {
chainClient, err := api.s.networkManager.GetChainClient(api.s.legacyChainID) chainClient, err := chain.NewLegacyClient(api.s.rpcClient)
if err != nil { if err != nil {
return err return err
} }
return api.s.transactionManager.watch(ctx, transactionHash, chainClient) return api.s.transactionManager.watch(ctx, transactionHash, chainClient)
} }
func (api *API) WatchTransactionByChainID(ctx context.Context, chainID uint64, transactionHash common.Hash) error { func (api *API) WatchTransactionByChainID(ctx context.Context, chainID uint64, transactionHash common.Hash) error {
chainClient, err := api.s.networkManager.GetChainClient(chainID) chainClient, err := chain.NewClient(api.s.rpcClient, chainID)
if err != nil { if err != nil {
return err return err
} }
return api.s.transactionManager.watch(ctx, transactionHash, chainClient) return api.s.transactionManager.watch(ctx, transactionHash, chainClient)
} }
@ -237,15 +236,15 @@ func (api *API) GetOpenseaAssetsByOwnerAndCollection(ctx context.Context, chainI
func (api *API) AddEthereumChain(ctx context.Context, network network.Network) error { func (api *API) AddEthereumChain(ctx context.Context, network network.Network) error {
log.Debug("call to AddEthereumChain") log.Debug("call to AddEthereumChain")
return api.s.networkManager.Upsert(&network) return api.s.rpcClient.NetworkManager.Upsert(&network)
} }
func (api *API) DeleteEthereumChain(ctx context.Context, chainID uint64) error { func (api *API) DeleteEthereumChain(ctx context.Context, chainID uint64) error {
log.Debug("call to DeleteEthereumChain") log.Debug("call to DeleteEthereumChain")
return api.s.networkManager.Delete(chainID) return api.s.rpcClient.NetworkManager.Delete(chainID)
} }
func (api *API) GetEthereumChains(ctx context.Context, onlyEnabled bool) ([]*network.Network, error) { func (api *API) GetEthereumChains(ctx context.Context, onlyEnabled bool) ([]*network.Network, error) {
log.Debug("call to GetEthereumChains") log.Debug("call to GetEthereumChains")
return api.s.networkManager.Get(onlyEnabled) return api.s.rpcClient.NetworkManager.Get(onlyEnabled)
} }

View File

@ -0,0 +1,100 @@
package chain
import (
"context"
"math/big"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/services/rpcstats"
)
type Client struct {
eth *ethclient.Client
ChainID uint64
}
func NewClient(rpc *rpc.Client, chainID uint64) (*Client, error) {
ethClient, err := rpc.EthClient(chainID)
if err != nil {
return nil, err
}
return &Client{ethClient, chainID}, nil
}
func NewLegacyClient(rpc *rpc.Client) (*Client, error) {
return NewClient(rpc, rpc.UpstreamChainID)
}
func NewClients(rpc *rpc.Client, chainIDs []uint64) (res []*Client, err error) {
for _, chainID := range chainIDs {
client, err := NewClient(rpc, chainID)
if err != nil {
return nil, err
}
res = append(res, client)
}
return res, nil
}
func (cc *Client) ToBigInt() *big.Int {
return big.NewInt(int64(cc.ChainID))
}
func (cc *Client) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
rpcstats.CountCall("eth_getBlockByHash")
return cc.eth.HeaderByHash(ctx, hash)
}
func (cc *Client) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
rpcstats.CountCall("eth_getBlockByNumber")
return cc.eth.HeaderByNumber(ctx, number)
}
func (cc *Client) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) {
rpcstats.CountCall("eth_getBlockByHash")
return cc.eth.BlockByHash(ctx, hash)
}
func (cc *Client) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
rpcstats.CountCall("eth_getBlockByNumber")
return cc.eth.BlockByNumber(ctx, number)
}
func (cc *Client) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
rpcstats.CountCall("eth_getBalance")
return cc.eth.BalanceAt(ctx, account, blockNumber)
}
func (cc *Client) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
rpcstats.CountCall("eth_getTransactionCount")
return cc.eth.NonceAt(ctx, account, blockNumber)
}
func (cc *Client) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
rpcstats.CountCall("eth_getTransactionReceipt")
return cc.eth.TransactionReceipt(ctx, txHash)
}
func (cc *Client) TransactionByHash(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error) {
rpcstats.CountCall("eth_getTransactionByHash")
return cc.eth.TransactionByHash(ctx, hash)
}
func (cc *Client) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
rpcstats.CountCall("eth_getLogs")
return cc.eth.FilterLogs(ctx, q)
}
func (cc *Client) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) {
rpcstats.CountCall("eth_getCode")
return cc.eth.CodeAt(ctx, contract, blockNumber)
}
func (cc *Client) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) {
rpcstats.CountCall("eth_call")
return cc.eth.CallContract(ctx, call, blockNumber)
}

View File

@ -1,76 +0,0 @@
package network
import (
"context"
"math/big"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/status-im/status-go/services/rpcstats"
)
type ChainClient struct {
eth *ethclient.Client
ChainID uint64
}
func (cc *ChainClient) ToBigInt() *big.Int {
return big.NewInt(int64(cc.ChainID))
}
func (cc *ChainClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
rpcstats.CountCall("eth_getBlockByHash")
return cc.eth.HeaderByHash(ctx, hash)
}
func (cc *ChainClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
rpcstats.CountCall("eth_getBlockByNumber")
return cc.eth.HeaderByNumber(ctx, number)
}
func (cc *ChainClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) {
rpcstats.CountCall("eth_getBlockByHash")
return cc.eth.BlockByHash(ctx, hash)
}
func (cc *ChainClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
rpcstats.CountCall("eth_getBlockByNumber")
return cc.eth.BlockByNumber(ctx, number)
}
func (cc *ChainClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
rpcstats.CountCall("eth_getBalance")
return cc.eth.BalanceAt(ctx, account, blockNumber)
}
func (cc *ChainClient) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
rpcstats.CountCall("eth_getTransactionCount")
return cc.eth.NonceAt(ctx, account, blockNumber)
}
func (cc *ChainClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
rpcstats.CountCall("eth_getTransactionReceipt")
return cc.eth.TransactionReceipt(ctx, txHash)
}
func (cc *ChainClient) TransactionByHash(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error) {
rpcstats.CountCall("eth_getTransactionByHash")
return cc.eth.TransactionByHash(ctx, hash)
}
func (cc *ChainClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
rpcstats.CountCall("eth_getLogs")
return cc.eth.FilterLogs(ctx, q)
}
func (cc *ChainClient) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) {
rpcstats.CountCall("eth_getCode")
return cc.eth.CodeAt(ctx, contract, blockNumber)
}
func (cc *ChainClient) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) {
rpcstats.CountCall("eth_call")
return cc.eth.CallContract(ctx, call, blockNumber)
}

View File

@ -3,17 +3,17 @@ package wallet
import ( import (
"database/sql" "database/sql"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc" gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/services/wallet/network"
"github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/services/wallet/transfer" "github.com/status-im/status-go/services/wallet/transfer"
) )
// NewService initializes service instance. // NewService initializes service instance.
func NewService(db *sql.DB, legacyChainID uint64, legacyClient *ethclient.Client, networks []network.Network, accountFeed *event.Feed) *Service { func NewService(db *sql.DB, rpcClient *rpc.Client, accountFeed *event.Feed) *Service {
cryptoOnRampManager := NewCryptoOnRampManager(&CryptoOnRampOptions{ cryptoOnRampManager := NewCryptoOnRampManager(&CryptoOnRampOptions{
dataSourceType: DataSourceStatic, dataSourceType: DataSourceStatic,
}) })
@ -21,36 +21,28 @@ func NewService(db *sql.DB, legacyChainID uint64, legacyClient *ethclient.Client
savedAddressesManager := &SavedAddressesManager{db: db} savedAddressesManager := &SavedAddressesManager{db: db}
transactionManager := &TransactionManager{db: db} transactionManager := &TransactionManager{db: db}
favouriteManager := &FavouriteManager{db: db} favouriteManager := &FavouriteManager{db: db}
networkManager := network.NewManager(db, legacyChainID, legacyClient) transferController := transfer.NewTransferController(db, rpcClient, accountFeed)
err := networkManager.Init(networks)
if err != nil {
log.Error("Network manager failed to initialize", "error", err)
}
transferController := transfer.NewTransferController(db, networkManager, accountFeed)
return &Service{ return &Service{
rpcClient: rpcClient,
favouriteManager: favouriteManager, favouriteManager: favouriteManager,
networkManager: networkManager,
tokenManager: tokenManager, tokenManager: tokenManager,
savedAddressesManager: savedAddressesManager, savedAddressesManager: savedAddressesManager,
transactionManager: transactionManager, transactionManager: transactionManager,
transferController: transferController, transferController: transferController,
cryptoOnRampManager: cryptoOnRampManager, cryptoOnRampManager: cryptoOnRampManager,
legacyChainID: legacyChainID,
} }
} }
// Service is a wallet service. // Service is a wallet service.
type Service struct { type Service struct {
networkManager *network.Manager rpcClient *rpc.Client
savedAddressesManager *SavedAddressesManager savedAddressesManager *SavedAddressesManager
tokenManager *TokenManager tokenManager *TokenManager
transactionManager *TransactionManager transactionManager *TransactionManager
favouriteManager *FavouriteManager favouriteManager *FavouriteManager
cryptoOnRampManager *CryptoOnRampManager cryptoOnRampManager *CryptoOnRampManager
transferController *transfer.Controller transferController *transfer.Controller
legacyChainID uint64
started bool started bool
} }
@ -76,8 +68,8 @@ func (s *Service) Stop() error {
} }
// APIs returns list of available RPC APIs. // APIs returns list of available RPC APIs.
func (s *Service) APIs() []rpc.API { func (s *Service) APIs() []gethrpc.API {
return []rpc.API{ return []gethrpc.API{
{ {
Namespace: "wallet", Namespace: "wallet",
Version: "0.1.0", Version: "0.1.0",

View File

@ -12,8 +12,8 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/chain"
"github.com/status-im/status-go/services/wallet/ierc20" "github.com/status-im/status-go/services/wallet/ierc20"
"github.com/status-im/status-go/services/wallet/network"
) )
var requestTimeout = 20 * time.Second var requestTimeout = 20 * time.Second
@ -69,7 +69,7 @@ func (tm *TokenManager) deleteCustom(chainID uint64, address common.Address) err
return err return err
} }
func (tm *TokenManager) getBalances(parent context.Context, clients []*network.ChainClient, accounts, tokens []common.Address) (map[common.Address]map[common.Address]*hexutil.Big, error) { func (tm *TokenManager) getBalances(parent context.Context, clients []*chain.Client, accounts, tokens []common.Address) (map[common.Address]map[common.Address]*hexutil.Big, error) {
var ( var (
group = async.NewAtomicGroup(parent) group = async.NewAtomicGroup(parent)
mu sync.Mutex mu sync.Mutex

View File

@ -11,7 +11,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/bigint" "github.com/status-im/status-go/services/wallet/bigint"
"github.com/status-im/status-go/services/wallet/network" "github.com/status-im/status-go/services/wallet/chain"
) )
type TransactionManager struct { type TransactionManager struct {
@ -156,7 +156,7 @@ func (tm *TransactionManager) deletePending(chainID uint64, hash common.Hash) er
return err return err
} }
func (tm *TransactionManager) watch(ctx context.Context, transactionHash common.Hash, client *network.ChainClient) error { func (tm *TransactionManager) watch(ctx context.Context, transactionHash common.Hash, client *chain.Client) error {
watchTxCommand := &watchTransactionCommand{ watchTxCommand := &watchTransactionCommand{
hash: transactionHash, hash: transactionHash,
client: client, client: client,
@ -169,7 +169,7 @@ func (tm *TransactionManager) watch(ctx context.Context, transactionHash common.
} }
type watchTransactionCommand struct { type watchTransactionCommand struct {
client *network.ChainClient client *chain.Client
hash common.Hash hash common.Hash
} }

View File

@ -10,7 +10,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/services/wallet/bigint" "github.com/status-im/status-go/services/wallet/bigint"
"github.com/status-im/status-go/services/wallet/network" "github.com/status-im/status-go/services/wallet/chain"
) )
type BlocksRange struct { type BlocksRange struct {
@ -63,7 +63,7 @@ func (b *Block) mergeBlocksRanges(chainIDs []uint64, accounts []common.Address)
return nil return nil
} }
func (b *Block) setInitialBlocksRange(chainClient *network.ChainClient) error { func (b *Block) setInitialBlocksRange(chainClient *chain.Client) error {
accountsDB := accounts.NewDB(b.db) accountsDB := accounts.NewDB(b.db)
watchAddress, err := accountsDB.GetWalletAddress() watchAddress, err := accountsDB.GetWalletAddress()
if err != nil { if err != nil {

View File

@ -11,7 +11,7 @@ import (
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/network" "github.com/status-im/status-go/services/wallet/chain"
) )
var numberOfBlocksCheckedPerIteration = 40 var numberOfBlocksCheckedPerIteration = 40
@ -20,7 +20,7 @@ type ethHistoricalCommand struct {
db *Database db *Database
eth Downloader eth Downloader
address common.Address address common.Address
chainClient *network.ChainClient chainClient *chain.Client
balanceCache *balanceCache balanceCache *balanceCache
feed *event.Feed feed *event.Feed
foundHeaders []*DBHeader foundHeaders []*DBHeader
@ -71,7 +71,7 @@ type erc20HistoricalCommand struct {
db *Database db *Database
erc20 BatchDownloader erc20 BatchDownloader
address common.Address address common.Address
chainClient *network.ChainClient chainClient *chain.Client
feed *event.Feed feed *event.Feed
iterator *IterativeDownloader iterator *IterativeDownloader
@ -127,7 +127,7 @@ type controlCommand struct {
block *Block block *Block
eth *ETHDownloader eth *ETHDownloader
erc20 *ERC20TransfersDownloader erc20 *ERC20TransfersDownloader
chainClient *network.ChainClient chainClient *chain.Client
feed *event.Feed feed *event.Feed
errorsCount int errorsCount int
nonArchivalRPCNode bool nonArchivalRPCNode bool
@ -300,7 +300,7 @@ type transfersCommand struct {
eth *ETHDownloader eth *ETHDownloader
block *big.Int block *big.Int
address common.Address address common.Address
chainClient *network.ChainClient chainClient *chain.Client
fetchedTransfers []Transfer fetchedTransfers []Transfer
} }
@ -333,7 +333,7 @@ type loadTransfersCommand struct {
accounts []common.Address accounts []common.Address
db *Database db *Database
block *Block block *Block
chainClient *network.ChainClient chainClient *chain.Client
blocksByAddress map[common.Address][]*big.Int blocksByAddress map[common.Address][]*big.Int
foundTransfersByAddress map[common.Address][]Transfer foundTransfersByAddress map[common.Address][]Transfer
} }
@ -368,7 +368,7 @@ func (c *loadTransfersCommand) Run(parent context.Context) (err error) {
type findAndCheckBlockRangeCommand struct { type findAndCheckBlockRangeCommand struct {
accounts []common.Address accounts []common.Address
db *Database db *Database
chainClient *network.ChainClient chainClient *chain.Client
balanceCache *balanceCache balanceCache *balanceCache
feed *event.Feed feed *event.Feed
fromByAddress map[common.Address]*LastKnownBlock fromByAddress map[common.Address]*LastKnownBlock
@ -533,7 +533,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, from
} }
} }
func loadTransfers(ctx context.Context, accounts []common.Address, block *Block, db *Database, chainClient *network.ChainClient, limit int, blocksByAddress map[common.Address][]*big.Int) (map[common.Address][]Transfer, error) { func loadTransfers(ctx context.Context, accounts []common.Address, block *Block, db *Database, chainClient *chain.Client, limit int, blocksByAddress map[common.Address][]*big.Int) (map[common.Address][]Transfer, error) {
start := time.Now() start := time.Now()
group := async.NewGroup(ctx) group := async.NewGroup(ctx)
@ -585,7 +585,7 @@ func loadTransfers(ctx context.Context, accounts []common.Address, block *Block,
} }
} }
func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client *network.ChainClient) (*big.Int, error) { func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client *chain.Client) (*big.Int, error) {
from := big.NewInt(0) from := big.NewInt(0)
to := initialTo to := initialTo
goal := uint64(20) goal := uint64(20)
@ -641,7 +641,7 @@ func findFirstRange(c context.Context, account common.Address, initialTo *big.In
return from, nil return from, nil
} }
func findFirstRanges(c context.Context, accounts []common.Address, initialTo *big.Int, client *network.ChainClient) (map[common.Address]*big.Int, error) { func findFirstRanges(c context.Context, accounts []common.Address, initialTo *big.Int, client *chain.Client) (map[common.Address]*big.Int, error) {
res := map[common.Address]*big.Int{} res := map[common.Address]*big.Int{}
for _, address := range accounts { for _, address := range accounts {

View File

@ -10,34 +10,35 @@ import (
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/network" "github.com/status-im/status-go/services/wallet/chain"
) )
type Controller struct { type Controller struct {
db *Database db *Database
networkManager *network.Manager rpcClient *rpc.Client
signals *SignalsTransmitter signals *SignalsTransmitter
block *Block block *Block
reactor *Reactor reactor *Reactor
accountFeed *event.Feed accountFeed *event.Feed
TransferFeed *event.Feed TransferFeed *event.Feed
group *async.Group group *async.Group
} }
func NewTransferController(db *sql.DB, networkManager *network.Manager, accountFeed *event.Feed) *Controller { func NewTransferController(db *sql.DB, rpcClient *rpc.Client, accountFeed *event.Feed) *Controller {
transferFeed := &event.Feed{} transferFeed := &event.Feed{}
signals := &SignalsTransmitter{ signals := &SignalsTransmitter{
publisher: transferFeed, publisher: transferFeed,
} }
block := &Block{db} block := &Block{db}
return &Controller{ return &Controller{
db: NewDB(db), db: NewDB(db),
block: block, block: block,
networkManager: networkManager, rpcClient: rpcClient,
signals: signals, signals: signals,
accountFeed: accountFeed, accountFeed: accountFeed,
TransferFeed: transferFeed, TransferFeed: transferFeed,
} }
} }
@ -60,7 +61,7 @@ func (c *Controller) Stop() {
} }
func (c *Controller) SetInitialBlocksRange(chainIDs []uint64) error { func (c *Controller) SetInitialBlocksRange(chainIDs []uint64) error {
chainClients, err := c.networkManager.GetChainClients(chainIDs) chainClients, err := chain.NewClients(c.rpcClient, chainIDs)
if err != nil { if err != nil {
return err return err
} }
@ -91,7 +92,7 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add
return err return err
} }
chainClients, err := c.networkManager.GetChainClients(chainIDs) chainClients, err := chain.NewClients(c.rpcClient, chainIDs)
if err != nil { if err != nil {
return err return err
} }
@ -121,7 +122,7 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add
// watchAccountsChanges subsribes to a feed and watches for changes in accounts list. If there are new or removed accounts // watchAccountsChanges subsribes to a feed and watches for changes in accounts list. If there are new or removed accounts
// reactor will be restarted. // reactor will be restarted.
func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor *Reactor, chainClients []*network.ChainClient, initial []common.Address) error { func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor *Reactor, chainClients []*chain.Client, initial []common.Address) error {
accounts := make(chan []accounts.Account, 1) // it may block if the rate of updates will be significantly higher accounts := make(chan []accounts.Account, 1) // it may block if the rate of updates will be significantly higher
sub := accountFeed.Subscribe(accounts) sub := accountFeed.Subscribe(accounts)
defer sub.Unsubscribe() defer sub.Unsubscribe()
@ -182,7 +183,7 @@ func (c *Controller) GetTransfersByAddress(ctx context.Context, chainID uint64,
} }
transfersCount := big.NewInt(int64(len(rst))) transfersCount := big.NewInt(int64(len(rst)))
chainClient, err := c.networkManager.GetChainClient(chainID) chainClient, err := chain.NewClient(c.rpcClient, chainID)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -12,7 +12,7 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/network" "github.com/status-im/status-go/services/wallet/chain"
) )
// Type type of the asset that was transferred. // Type type of the asset that was transferred.
@ -51,7 +51,7 @@ type Transfer struct {
// ETHDownloader downloads regular eth transfers. // ETHDownloader downloads regular eth transfers.
type ETHDownloader struct { type ETHDownloader struct {
chainClient *network.ChainClient chainClient *chain.Client
accounts []common.Address accounts []common.Address
signer types.Signer signer types.Signer
db *Database db *Database
@ -146,7 +146,7 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc
} }
// NewERC20TransfersDownloader returns new instance. // NewERC20TransfersDownloader returns new instance.
func NewERC20TransfersDownloader(client *network.ChainClient, accounts []common.Address, signer types.Signer) *ERC20TransfersDownloader { func NewERC20TransfersDownloader(client *chain.Client, accounts []common.Address, signer types.Signer) *ERC20TransfersDownloader {
signature := crypto.Keccak256Hash([]byte(erc20TransferEventSignature)) signature := crypto.Keccak256Hash([]byte(erc20TransferEventSignature))
return &ERC20TransfersDownloader{ return &ERC20TransfersDownloader{
client: client, client: client,
@ -158,7 +158,7 @@ func NewERC20TransfersDownloader(client *network.ChainClient, accounts []common.
// ERC20TransfersDownloader is a downloader for erc20 tokens transfers. // ERC20TransfersDownloader is a downloader for erc20 tokens transfers.
type ERC20TransfersDownloader struct { type ERC20TransfersDownloader struct {
client *network.ChainClient client *chain.Client
accounts []common.Address accounts []common.Address
// hash of the Transfer event signature // hash of the Transfer event signature

View File

@ -10,7 +10,7 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/network" "github.com/status-im/status-go/services/wallet/chain"
) )
var ( var (
@ -40,7 +40,7 @@ type Reactor struct {
group *async.Group group *async.Group
} }
func (r *Reactor) newControlCommand(chainClient *network.ChainClient, accounts []common.Address) *controlCommand { func (r *Reactor) newControlCommand(chainClient *chain.Client, accounts []common.Address) *controlCommand {
signer := types.NewLondonSigner(chainClient.ToBigInt()) signer := types.NewLondonSigner(chainClient.ToBigInt())
ctl := &controlCommand{ ctl := &controlCommand{
db: r.db, db: r.db,
@ -62,7 +62,7 @@ func (r *Reactor) newControlCommand(chainClient *network.ChainClient, accounts [
} }
// Start runs reactor loop in background. // Start runs reactor loop in background.
func (r *Reactor) start(chainClients []*network.ChainClient, accounts []common.Address) error { func (r *Reactor) start(chainClients []*chain.Client, accounts []common.Address) error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
@ -89,7 +89,7 @@ func (r *Reactor) stop() {
r.group = nil r.group = nil
} }
func (r *Reactor) restart(chainClients []*network.ChainClient, accounts []common.Address) error { func (r *Reactor) restart(chainClients []*chain.Client, accounts []common.Address) error {
r.stop() r.stop()
return r.start(chainClients, accounts) return r.start(chainClients, accounts)
} }

View File

@ -27,7 +27,7 @@ func newRPCWrapper(client *rpc.Client) *rpcWrapper {
// This is the nonce that should be used for the next transaction. // This is the nonce that should be used for the next transaction.
func (w *rpcWrapper) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { func (w *rpcWrapper) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
var result hexutil.Uint64 var result hexutil.Uint64
err := w.rpcClient.CallContext(ctx, &result, "eth_getTransactionCount", account, "pending") err := w.rpcClient.CallContext(ctx, &result, w.rpcClient.UpstreamChainID, "eth_getTransactionCount", account, "pending")
return uint64(result), err return uint64(result), err
} }
@ -35,7 +35,7 @@ func (w *rpcWrapper) PendingNonceAt(ctx context.Context, account common.Address)
// execution of a transaction. // execution of a transaction.
func (w *rpcWrapper) SuggestGasPrice(ctx context.Context) (*big.Int, error) { func (w *rpcWrapper) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
var hex hexutil.Big var hex hexutil.Big
if err := w.rpcClient.CallContext(ctx, &hex, "eth_gasPrice"); err != nil { if err := w.rpcClient.CallContext(ctx, &hex, w.rpcClient.UpstreamChainID, "eth_gasPrice"); err != nil {
return nil, err return nil, err
} }
return (*big.Int)(&hex), nil return (*big.Int)(&hex), nil
@ -47,7 +47,7 @@ func (w *rpcWrapper) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
// but it should provide a basis for setting a reasonable default. // but it should provide a basis for setting a reasonable default.
func (w *rpcWrapper) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { func (w *rpcWrapper) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) {
var hex hexutil.Uint64 var hex hexutil.Uint64
err := w.rpcClient.CallContext(ctx, &hex, "eth_estimateGas", toCallArg(msg)) err := w.rpcClient.CallContext(ctx, &hex, w.rpcClient.UpstreamChainID, "eth_estimateGas", toCallArg(msg))
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -63,7 +63,7 @@ func (w *rpcWrapper) SendTransaction(ctx context.Context, tx *gethtypes.Transact
if err != nil { if err != nil {
return err return err
} }
return w.rpcClient.CallContext(ctx, nil, "eth_sendRawTransaction", types.EncodeHex(data)) return w.rpcClient.CallContext(ctx, nil, w.rpcClient.UpstreamChainID, "eth_sendRawTransaction", types.EncodeHex(data))
} }
func toCallArg(msg ethereum.CallMsg) interface{} { func toCallArg(msg ethereum.CallMsg) interface{} {

View File

@ -49,7 +49,7 @@ func (s *TransactorSuite) SetupTest() {
s.server, s.txServiceMock = fake.NewTestServer(s.txServiceMockCtrl) s.server, s.txServiceMock = fake.NewTestServer(s.txServiceMockCtrl)
s.client = gethrpc.DialInProc(s.server) s.client = gethrpc.DialInProc(s.server)
rpcClient, _ := rpc.NewClient(s.client, params.UpstreamRPCConfig{}) rpcClient, _ := rpc.NewClient(s.client, 1, params.UpstreamRPCConfig{}, nil, nil)
// expected by simulated backend // expected by simulated backend
chainID := gethparams.AllEthashProtocolChanges.ChainID.Uint64() chainID := gethparams.AllEthashProtocolChanges.ChainID.Uint64()
nodeConfig, err := utils.MakeTestNodeConfigWithDataDir("", "/tmp", chainID) nodeConfig, err := utils.MakeTestNodeConfigWithDataDir("", "/tmp", chainID)