From 26bfeddad483a649ef2f17e9729b9547f33979aa Mon Sep 17 00:00:00 2001 From: Anthony Laibe Date: Wed, 22 Sep 2021 19:49:20 +0200 Subject: [PATCH] feat: Rpc client manage multiple eth client (#2359) --- api/geth_backend.go | 4 +- contracts/contract_caller.go | 67 ------------ node/get_status_node.go | 4 +- node/status_node_services.go | 8 +- params/config.go | 2 +- peers/peerpool.go | 7 +- peers/peerpool_test.go | 12 +-- rpc/call_raw.go | 22 ++-- rpc/call_raw_test.go | 19 +++- rpc/client.go | 101 +++++++++++++----- rpc/client_test.go | 37 +++++-- {services/wallet => rpc}/network/network.go | 56 ++-------- .../wallet => rpc}/network/network_test.go | 0 services/personal/api.go | 2 + services/rpcfilters/api.go | 32 +++--- services/rpcfilters/api_test.go | 3 + services/rpcfilters/latest_block_provider.go | 2 +- services/rpcfilters/latest_logs.go | 16 +-- services/rpcfilters/latest_logs_test.go | 4 +- services/subscriptions/filters_eth.go | 6 +- services/subscriptions/filters_shh.go | 6 +- services/wallet/api.go | 47 ++++---- services/wallet/chain/client.go | 100 +++++++++++++++++ services/wallet/network/chain_client.go | 76 ------------- services/wallet/service.go | 26 ++--- services/wallet/token.go | 4 +- services/wallet/transaction.go | 6 +- services/wallet/transfer/block.go | 4 +- services/wallet/transfer/commands.go | 20 ++-- services/wallet/transfer/controller.go | 41 +++---- services/wallet/transfer/downloader.go | 8 +- services/wallet/transfer/reactor.go | 8 +- transactions/rpc_wrapper.go | 8 +- transactions/transactor_test.go | 2 +- 34 files changed, 381 insertions(+), 379 deletions(-) delete mode 100644 contracts/contract_caller.go rename {services/wallet => rpc}/network/network.go (72%) rename {services/wallet => rpc}/network/network_test.go (100%) create mode 100644 services/wallet/chain/client.go delete mode 100644 services/wallet/network/chain_client.go diff --git a/api/geth_backend.go b/api/geth_backend.go index 8a7fd5ce0..41d18797e 100644 --- a/api/geth_backend.go +++ b/api/geth_backend.go @@ -934,7 +934,7 @@ func (b *GethStatusBackend) registerHandlers() error { for _, client := range clients { client.RegisterHandler( params.AccountsMethodName, - func(context.Context, ...interface{}) (interface{}, error) { + func(context.Context, uint64, ...interface{}) (interface{}, error) { return b.accountManager.Accounts() }, ) @@ -951,7 +951,7 @@ func (b *GethStatusBackend) registerHandlers() error { return nil } -func unsupportedMethodHandler(ctx context.Context, rpcParams ...interface{}) (interface{}, error) { +func unsupportedMethodHandler(ctx context.Context, chainID uint64, rpcParams ...interface{}) (interface{}, error) { return nil, ErrUnsupportedRPCMethod } diff --git a/contracts/contract_caller.go b/contracts/contract_caller.go deleted file mode 100644 index 59edfb96f..000000000 --- a/contracts/contract_caller.go +++ /dev/null @@ -1,67 +0,0 @@ -package contracts - -import ( - "context" - "math/big" - - ethereum "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" -) - -type RPCClient interface { - CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error -} - -type ContractCaller struct { - c RPCClient -} - -func NewContractCaller(c RPCClient) *ContractCaller { - return &ContractCaller{ - c: c, - } -} - -// CodeAt returns the contract code of the given account. -// The block number can be nil, in which case the code is taken from the latest known block. -func (cc *ContractCaller) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) { - var result hexutil.Bytes - err := cc.c.CallContext(ctx, &result, "eth_getCode", account, "latest") - return result, err -} - -// CallContract executes a message call transaction, which is directly executed in the VM -// of the node, but never mined into the blockchain. -// -// blockNumber selects the block height at which the call runs. It can be nil, in which -// case the code is taken from the latest known block. Note that state from very old -// blocks might not be available. -func (cc *ContractCaller) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { - var hex hexutil.Bytes - err := cc.c.CallContext(ctx, &hex, "eth_call", toCallArg(msg), "latest") - if err != nil { - return nil, err - } - return hex, nil -} - -func toCallArg(msg ethereum.CallMsg) interface{} { - arg := map[string]interface{}{ - "from": msg.From, - "to": msg.To, - } - if len(msg.Data) > 0 { - arg["data"] = hexutil.Bytes(msg.Data) - } - if msg.Value != nil { - arg["value"] = (*hexutil.Big)(msg.Value) - } - if msg.Gas != 0 { - arg["gas"] = hexutil.Uint64(msg.Gas) - } - if msg.GasPrice != nil { - arg["gasPrice"] = (*hexutil.Big)(msg.GasPrice) - } - return arg -} diff --git a/node/get_status_node.go b/node/get_status_node.go index db196db30..9db26f821 100644 --- a/node/get_status_node.go +++ b/node/get_status_node.go @@ -231,7 +231,7 @@ func (n *StatusNode) setupRPCClient() (err error) { if err != nil { return } - n.rpcClient, err = rpc.NewClient(gethNodeClient, n.config.UpstreamConfig) + n.rpcClient, err = rpc.NewClient(gethNodeClient, n.config.NetworkID, n.config.UpstreamConfig, n.config.Networks, n.appDB) if err != nil { return } @@ -350,7 +350,7 @@ func (n *StatusNode) startDiscovery() error { if err := n.register.Start(); err != nil { return err } - return n.peerPool.Start(n.gethNode.Server(), n.rpcClient) + return n.peerPool.Start(n.gethNode.Server()) } // Stop will stop current StatusNode. A stopped node cannot be resumed. diff --git a/node/status_node_services.go b/node/status_node_services.go index c188328d7..7ae37fc35 100644 --- a/node/status_node_services.go +++ b/node/status_node_services.go @@ -37,7 +37,6 @@ import ( "github.com/status-im/status-go/services/wakuext" "github.com/status-im/status-go/services/wakuv2ext" "github.com/status-im/status-go/services/wallet" - "github.com/status-im/status-go/services/wallet/network" "github.com/status-im/status-go/timesource" "github.com/status-im/status-go/waku" wakucommon "github.com/status-im/status-go/waku/common" @@ -104,7 +103,7 @@ func (b *StatusNode) initServices(config *params.NodeConfig) error { } if config.WalletConfig.Enabled { - walletService := b.walletService(config.NetworkID, config.Networks, accountsFeed) + walletService := b.walletService(accountsFeed) services = append(services, walletService) } @@ -367,9 +366,9 @@ func (b *StatusNode) appmetricsService() common.StatusService { return b.appMetricsSrvc } -func (b *StatusNode) walletService(chainID uint64, networks []network.Network, accountsFeed *event.Feed) common.StatusService { +func (b *StatusNode) walletService(accountsFeed *event.Feed) common.StatusService { if b.walletSrvc == nil { - b.walletSrvc = wallet.NewService(b.appDB, chainID, b.rpcClient.Ethclient(), networks, accountsFeed) + b.walletSrvc = wallet.NewService(b.appDB, b.rpcClient, accountsFeed) } return b.walletSrvc } @@ -386,7 +385,6 @@ func (b *StatusNode) peerService() *peer.Service { b.peerSrvc = peer.New() } return b.peerSrvc - } func registerWakuMailServer(wakuService *waku.Waku, config *params.WakuConfig) (err error) { diff --git a/params/config.go b/params/config.go index 537b59bf6..0eafd5d37 100644 --- a/params/config.go +++ b/params/config.go @@ -22,7 +22,7 @@ import ( "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/pushnotificationserver" - "github.com/status-im/status-go/services/wallet/network" + "github.com/status-im/status-go/rpc/network" "github.com/status-im/status-go/static" wakucommon "github.com/status-im/status-go/waku/common" wakuv2common "github.com/status-im/status-go/wakuv2/common" diff --git a/peers/peerpool.go b/peers/peerpool.go index 4d6e3484d..7f7c4b6df 100644 --- a/peers/peerpool.go +++ b/peers/peerpool.go @@ -12,7 +12,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/discv5" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/status-im/status-go/contracts" "github.com/status-im/status-go/discovery" "github.com/status-im/status-go/params" "github.com/status-im/status-go/peers/verifier" @@ -128,7 +127,7 @@ func (p *PeerPool) setDiscoveryTimeout() { } // Start creates topic pool for each topic in config and subscribes to server events. -func (p *PeerPool) Start(server *p2p.Server, rpcClient contracts.RPCClient) error { +func (p *PeerPool) Start(server *p2p.Server) error { if !p.discovery.Running() { return ErrDiscv5NotRunning } @@ -156,7 +155,7 @@ func (p *PeerPool) Start(server *p2p.Server, rpcClient contracts.RPCClient) erro var topicPool TopicPoolInterface t := newTopicPool(p.discovery, topic, limits, p.opts.SlowSync, p.opts.FastSync, p.cache) if topic == MailServerDiscoveryTopic { - v, err := p.initVerifier(rpcClient) + v, err := p.initVerifier() if err != nil { return err } @@ -176,7 +175,7 @@ func (p *PeerPool) Start(server *p2p.Server, rpcClient contracts.RPCClient) erro return nil } -func (p *PeerPool) initVerifier(rpcClient contracts.RPCClient) (v Verifier, err error) { +func (p *PeerPool) initVerifier() (v Verifier, err error) { return verifier.NewLocalVerifier(p.opts.TrustedMailServers), nil } diff --git a/peers/peerpool_test.go b/peers/peerpool_test.go index 07df3f06e..526b6dd82 100644 --- a/peers/peerpool_test.go +++ b/peers/peerpool_test.go @@ -134,7 +134,7 @@ func (s *PeerPoolSimulationSuite) TestPeerPoolCacheEthV5() { peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts) // start peer pool - s.Require().NoError(peerPool.Start(s.peers[1], nil)) + s.Require().NoError(peerPool.Start(s.peers[1])) defer peerPool.Stop() // check if cache is passed to topic pools @@ -179,7 +179,7 @@ func TestPeerPoolMaxPeersOverflow(t *testing.T) { poolOpts := &Options{DefaultFastSync, DefaultSlowSync, 0, true, 100 * time.Millisecond, nil} pool := NewPeerPool(discovery, nil, nil, poolOpts) - require.NoError(t, pool.Start(peer, nil)) + require.NoError(t, pool.Start(peer)) require.Equal(t, signal.EventDiscoveryStarted, <-signals) // without config, it will stop the discovery because all topic pools are satisfied pool.events <- &p2p.PeerEvent{Type: p2p.PeerEventTypeAdd} @@ -232,7 +232,7 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) { // start PeerPool poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, true, 100 * time.Millisecond, nil} pool := NewPeerPool(discovery, nil, nil, poolOpts) - require.NoError(t, pool.Start(server, nil)) + require.NoError(t, pool.Start(server)) require.Equal(t, signal.EventDiscoveryStarted, <-signals) // timeout after finding no peers @@ -279,7 +279,7 @@ func TestPeerPoolNotAllowedStopping(t *testing.T) { // start PeerPool poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, false, 100 * time.Millisecond, nil} pool := NewPeerPool(discovery, nil, nil, poolOpts) - require.NoError(t, pool.Start(server, nil)) + require.NoError(t, pool.Start(server)) // wait 2x timeout duration <-time.After(pool.opts.DiscServerTimeout * 2) @@ -300,7 +300,7 @@ func (s *PeerPoolSimulationSuite) TestUpdateTopicLimits() { peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts) // start peer pool - s.Require().NoError(peerPool.Start(s.peers[1], nil)) + s.Require().NoError(peerPool.Start(s.peers[1])) defer peerPool.Stop() for _, topicPool := range peerPool.topics { @@ -377,7 +377,7 @@ func (s *PeerPoolSimulationSuite) TestMailServerPeersDiscovery() { []enode.ID{s.peers[0].Self().ID()}, } peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts) - s.Require().NoError(peerPool.Start(s.peers[1], nil)) + s.Require().NoError(peerPool.Start(s.peers[1])) defer peerPool.Stop() // wait for and verify the mail server peer diff --git a/rpc/call_raw.go b/rpc/call_raw.go index a0ebb449f..42873334e 100644 --- a/rpc/call_raw.go +++ b/rpc/call_raw.go @@ -35,8 +35,9 @@ type jsonrpcMessage struct { type jsonrpcRequest struct { jsonrpcMessage - Method string `json:"method"` - Params json.RawMessage `json:"params,omitempty"` + ChainID uint64 `json:"chainId"` + Method string `json:"method"` + Params json.RawMessage `json:"params,omitempty"` } type jsonrpcSuccessfulResponse struct { @@ -110,14 +111,18 @@ func (c *Client) callBatchMethods(ctx context.Context, msgs json.RawMessage) str // callSingleMethod executes single JSON-RPC message and constructs proper response. func (c *Client) callSingleMethod(ctx context.Context, msg json.RawMessage) string { // unmarshal JSON body into json-rpc request - method, params, id, err := methodAndParamsFromBody(msg) + chainID, method, params, id, err := methodAndParamsFromBody(msg) if err != nil { return newErrorResponse(errInvalidMessageCode, err, id) } + if chainID == 0 { + chainID = c.UpstreamChainID + } + // route and execute var result json.RawMessage - err = c.CallContext(ctx, &result, method, params...) + err = c.CallContext(ctx, &result, chainID, method, params...) // as we have to return original JSON, we have to // analyze returned error and reconstruct original @@ -138,21 +143,20 @@ func (c *Client) callSingleMethod(ctx context.Context, msg json.RawMessage) stri // JSON-RPC body into values ready to use with ethereum-go's // RPC client Call() function. A lot of empty interface usage is // due to the underlying code design :/ -func methodAndParamsFromBody(body json.RawMessage) (string, []interface{}, json.RawMessage, error) { +func methodAndParamsFromBody(body json.RawMessage) (uint64, string, []interface{}, json.RawMessage, error) { msg, err := unmarshalMessage(body) if err != nil { - return "", nil, nil, err + return 0, "", nil, nil, err } - params := []interface{}{} if msg.Params != nil { err = json.Unmarshal(msg.Params, ¶ms) if err != nil { - return "", nil, nil, err + return 0, "", nil, nil, err } } - return msg.Method, params, msg.ID, nil + return msg.ChainID, msg.Method, params, msg.ID, nil } // unmarshalMessage tries to unmarshal JSON-RPC message. diff --git a/rpc/call_raw_test.go b/rpc/call_raw_test.go index a75912223..bb75622c6 100644 --- a/rpc/call_raw_test.go +++ b/rpc/call_raw_test.go @@ -57,6 +57,7 @@ func TestMethodAndParamsFromBody(t *testing.T) { params []interface{} method string id json.RawMessage + chainID uint64 shouldFail bool }{ { @@ -70,6 +71,7 @@ func TestMethodAndParamsFromBody(t *testing.T) { }, "subtract", json.RawMessage(`42`), + 0, false, }, { @@ -78,6 +80,7 @@ func TestMethodAndParamsFromBody(t *testing.T) { []interface{}{}, "test", nil, + 0, false, }, { @@ -86,6 +89,16 @@ func TestMethodAndParamsFromBody(t *testing.T) { []interface{}{}, "test", nil, + 0, + false, + }, + { + "params_chain_id", + json.RawMessage(`{"jsonrpc": "2.0", "chainId": 2, "method": "test"}`), + []interface{}{}, + "test", + nil, + 2, false, }, { @@ -94,6 +107,7 @@ func TestMethodAndParamsFromBody(t *testing.T) { []interface{}{string("3de6a8867aeb75be74d68478b853b4b0e063704d30f8231c45d0fcbd97af207e")}, "shh_getFilterMessages", json.RawMessage(`44`), + 0, false, }, { @@ -102,6 +116,7 @@ func TestMethodAndParamsFromBody(t *testing.T) { []interface{}{}, "", nil, + 0, true, }, { @@ -110,18 +125,20 @@ func TestMethodAndParamsFromBody(t *testing.T) { []interface{}{}, "", nil, + 0, true, }, } for _, test := range cases { t.Run(test.name, func(t *testing.T) { - method, params, id, err := methodAndParamsFromBody(test.body) + chainID, method, params, id, err := methodAndParamsFromBody(test.body) if test.shouldFail { require.Error(t, err) return } require.NoError(t, err) + require.Equal(t, test.chainID, chainID) require.Equal(t, test.method, method) require.Equal(t, test.params, params) require.EqualValues(t, test.id, id) diff --git a/rpc/client.go b/rpc/client.go index a160b039a..82621d490 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -2,6 +2,7 @@ package rpc import ( "context" + "database/sql" "encoding/json" "errors" "fmt" @@ -14,6 +15,7 @@ import ( gethrpc "github.com/ethereum/go-ethereum/rpc" "github.com/status-im/status-go/params" + "github.com/status-im/status-go/rpc/network" "github.com/status-im/status-go/services/rpcstats" ) @@ -28,7 +30,7 @@ var ( ) // Handler defines handler for RPC methods. -type Handler func(context.Context, ...interface{}) (interface{}, error) +type Handler func(context.Context, uint64, ...interface{}) (interface{}, error) // Client represents RPC client with custom routing // scheme. It automatically decides where RPC call @@ -38,11 +40,14 @@ type Client struct { upstreamEnabled bool upstreamURL string + UpstreamChainID uint64 - local *gethrpc.Client - upstream *gethrpc.Client + local *gethrpc.Client + upstream *gethrpc.Client + rpcClients map[uint64]*gethrpc.Client - router *router + router *router + NetworkManager *network.Manager handlersMx sync.RWMutex // mx guards handlers handlers map[string]Handler // locally registered handlers @@ -54,16 +59,26 @@ type Client struct { // // Client is safe for concurrent use and will automatically // reconnect to the server if connection is lost. -func NewClient(client *gethrpc.Client, upstream params.UpstreamRPCConfig) (*Client, error) { - c := Client{ - local: client, - handlers: make(map[string]Handler), - log: log.New("package", "status-go/rpc.Client"), - } - +func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.UpstreamRPCConfig, networks []network.Network, db *sql.DB) (*Client, error) { var err error + log := log.New("package", "status-go/rpc.Client") + networkManager := network.NewManager(db) + err = networkManager.Init(networks) + if err != nil { + log.Error("Network manager failed to initialize", "error", err) + } + + c := Client{ + local: client, + NetworkManager: networkManager, + handlers: make(map[string]Handler), + rpcClients: make(map[uint64]*gethrpc.Client), + log: log, + } + if upstream.Enabled { + c.UpstreamChainID = upstreamChainID c.upstreamEnabled = upstream.Enabled c.upstreamURL = upstream.URL c.upstream, err = gethrpc.Dial(c.upstreamURL) @@ -77,12 +92,41 @@ func NewClient(client *gethrpc.Client, upstream params.UpstreamRPCConfig) (*Clie return &c, nil } -// Ethclient returns ethclient.Client with upstream or local client. -func (c *Client) Ethclient() *ethclient.Client { - if c.upstreamEnabled { - return ethclient.NewClient(c.upstream) +func (c *Client) getRPCClientWithCache(chainID uint64) (*gethrpc.Client, error) { + if !c.upstreamEnabled { + return c.local, nil } - return ethclient.NewClient(c.local) + + if c.UpstreamChainID == chainID { + return c.upstream, nil + } + + if rpcClient, ok := c.rpcClients[chainID]; ok { + return rpcClient, nil + } + + network := c.NetworkManager.Find(chainID) + if network == nil { + return nil, fmt.Errorf("could not find network: %d", chainID) + } + + rpcClient, err := gethrpc.Dial(network.RPCURL) + if err != nil { + return nil, fmt.Errorf("dial upstream server: %s", err) + } + + c.rpcClients[chainID] = rpcClient + return rpcClient, nil +} + +// Ethclient returns ethclient.Client per chain +func (c *Client) EthClient(chainID uint64) (*ethclient.Client, error) { + rpcClient, err := c.getRPCClientWithCache(chainID) + if err != nil { + return nil, err + } + + return ethclient.NewClient(rpcClient), nil } // UpdateUpstreamURL changes the upstream RPC client URL, if the upstream is enabled. @@ -111,9 +155,9 @@ func (c *Client) UpdateUpstreamURL(url string) error { // can also pass nil, in which case the result is ignored. // // It uses custom routing scheme for calls. -func (c *Client) Call(result interface{}, method string, args ...interface{}) error { +func (c *Client) Call(result interface{}, chainID uint64, method string, args ...interface{}) error { ctx := context.Background() - return c.CallContext(ctx, result, method, args...) + return c.CallContext(ctx, result, chainID, method, args...) } // CallContext performs a JSON-RPC call with the given arguments. If the context is @@ -124,7 +168,7 @@ func (c *Client) Call(result interface{}, method string, args ...interface{}) er // // It uses custom routing scheme for calls. // If there are any local handlers registered for this call, they will handle it. -func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { +func (c *Client) CallContext(ctx context.Context, result interface{}, chainID uint64, method string, args ...interface{}) error { rpcstats.CountCall(method) if c.router.routeBlocked(method) { return ErrMethodNotFound @@ -132,10 +176,10 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str // check locally registered handlers first if handler, ok := c.handler(method); ok { - return c.callMethod(ctx, result, handler, args...) + return c.callMethod(ctx, result, chainID, handler, args...) } - return c.CallContextIgnoringLocalHandlers(ctx, result, method, args...) + return c.CallContextIgnoringLocalHandlers(ctx, result, chainID, method, args...) } // CallContextIgnoringLocalHandlers performs a JSON-RPC call with the given @@ -145,16 +189,17 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str // be ignored. It is useful if the call is happening from within a local // handler itself. // Upstream calls routing will be used anyway. -func (c *Client) CallContextIgnoringLocalHandlers(ctx context.Context, result interface{}, method string, args ...interface{}) error { +func (c *Client) CallContextIgnoringLocalHandlers(ctx context.Context, result interface{}, chainID uint64, method string, args ...interface{}) error { if c.router.routeBlocked(method) { return ErrMethodNotFound } if c.router.routeRemote(method) { - c.RLock() - client := c.upstream - c.RUnlock() - return client.CallContext(ctx, result, method, args...) + ethClient, err := c.getRPCClientWithCache(chainID) + if err != nil { + return err + } + return ethClient.CallContext(ctx, result, method, args...) } if c.local == nil { @@ -179,8 +224,8 @@ func (c *Client) RegisterHandler(method string, handler Handler) { // It handles proper params and result converting // // TODO(divan): use cancellation via context here? -func (c *Client) callMethod(ctx context.Context, result interface{}, handler Handler, args ...interface{}) error { - response, err := handler(ctx, args...) +func (c *Client) callMethod(ctx context.Context, result interface{}, chainID uint64, handler Handler, args ...interface{}) error { + response, err := handler(ctx, chainID, args...) if err != nil { return err } diff --git a/rpc/client_test.go b/rpc/client_test.go index c503ea84e..b3521e867 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -2,19 +2,38 @@ package rpc import ( "context" + "database/sql" "fmt" + "io/ioutil" "net/http" "net/http/httptest" + "os" "testing" "github.com/stretchr/testify/require" + "github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/params" + "github.com/status-im/status-go/rpc/network" gethrpc "github.com/ethereum/go-ethereum/rpc" ) +func setupTestNetworkDB(t *testing.T) (*sql.DB, func()) { + tmpfile, err := ioutil.TempFile("", "rpc-network-tests-") + require.NoError(t, err) + db, err := appdatabase.InitializeDB(tmpfile.Name(), "rpc-network-tests") + require.NoError(t, err) + return db, func() { + require.NoError(t, db.Close()) + require.NoError(t, os.Remove(tmpfile.Name())) + } +} + func TestBlockedRoutesCall(t *testing.T) { + db, close := setupTestNetworkDB(t) + defer close() + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, `{ "id": 1, @@ -27,7 +46,7 @@ func TestBlockedRoutesCall(t *testing.T) { gethRPCClient, err := gethrpc.Dial(ts.URL) require.NoError(t, err) - c, err := NewClient(gethRPCClient, params.UpstreamRPCConfig{Enabled: false, URL: ""}) + c, err := NewClient(gethRPCClient, 1, params.UpstreamRPCConfig{Enabled: false, URL: ""}, []network.Network{}, db) require.NoError(t, err) for _, m := range blockedMethods { @@ -36,21 +55,24 @@ func TestBlockedRoutesCall(t *testing.T) { err error ) - err = c.Call(&result, m) + err = c.Call(&result, 1, m) require.EqualError(t, err, ErrMethodNotFound.Error()) require.Nil(t, result) - err = c.CallContext(context.Background(), &result, m) + err = c.CallContext(context.Background(), &result, 1, m) require.EqualError(t, err, ErrMethodNotFound.Error()) require.Nil(t, result) - err = c.CallContextIgnoringLocalHandlers(context.Background(), &result, m) + err = c.CallContextIgnoringLocalHandlers(context.Background(), &result, 1, m) require.EqualError(t, err, ErrMethodNotFound.Error()) require.Nil(t, result) } } func TestBlockedRoutesRawCall(t *testing.T) { + db, close := setupTestNetworkDB(t) + defer close() + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, `{ "id": 1, @@ -63,7 +85,7 @@ func TestBlockedRoutesRawCall(t *testing.T) { gethRPCClient, err := gethrpc.Dial(ts.URL) require.NoError(t, err) - c, err := NewClient(gethRPCClient, params.UpstreamRPCConfig{Enabled: false, URL: ""}) + c, err := NewClient(gethRPCClient, 1, params.UpstreamRPCConfig{Enabled: false, URL: ""}, []network.Network{}, db) require.NoError(t, err) for _, m := range blockedMethods { @@ -78,6 +100,9 @@ func TestBlockedRoutesRawCall(t *testing.T) { } func TestUpdateUpstreamURL(t *testing.T) { + db, close := setupTestNetworkDB(t) + defer close() + ts := createTestServer("") defer ts.Close() @@ -87,7 +112,7 @@ func TestUpdateUpstreamURL(t *testing.T) { gethRPCClient, err := gethrpc.Dial(ts.URL) require.NoError(t, err) - c, err := NewClient(gethRPCClient, params.UpstreamRPCConfig{Enabled: true, URL: ts.URL}) + c, err := NewClient(gethRPCClient, 1, params.UpstreamRPCConfig{Enabled: true, URL: ts.URL}, []network.Network{}, db) require.NoError(t, err) require.Equal(t, ts.URL, c.upstreamURL) diff --git a/services/wallet/network/network.go b/rpc/network/network.go similarity index 72% rename from services/wallet/network/network.go rename to rpc/network/network.go index 89a14c210..7f38d38a7 100644 --- a/services/wallet/network/network.go +++ b/rpc/network/network.go @@ -3,10 +3,6 @@ package network import ( "bytes" "database/sql" - "fmt" - - "github.com/ethereum/go-ethereum/ethclient" - "github.com/ethereum/go-ethereum/rpc" ) type Network struct { @@ -85,22 +81,20 @@ func (nq *networksQuery) exec(db *sql.DB) ([]*Network, error) { } type Manager struct { - db *sql.DB - legacyChainID uint64 - legacyClient *ethclient.Client - chainClients map[uint64]*ChainClient + db *sql.DB } -func NewManager(db *sql.DB, legacyChainID uint64, legacyClient *ethclient.Client) *Manager { +func NewManager(db *sql.DB) *Manager { return &Manager{ - db: db, - legacyChainID: legacyChainID, - legacyClient: legacyClient, - chainClients: make(map[uint64]*ChainClient), + db: db, } } func (nm *Manager) Init(networks []Network) error { + if networks == nil { + return nil + } + currentNetworks, _ := nm.Get(false) if len(currentNetworks) > 0 { return nil @@ -116,42 +110,6 @@ func (nm *Manager) Init(networks []Network) error { return nil } -func (nm *Manager) GetChainClient(chainID uint64) (*ChainClient, error) { - if chainID == nm.legacyChainID { - return &ChainClient{eth: nm.legacyClient, ChainID: chainID}, nil - } - - if chainClient, ok := nm.chainClients[chainID]; ok { - return chainClient, nil - } - - network := nm.Find(chainID) - if network == nil { - return nil, fmt.Errorf("could not find network: %d", chainID) - } - - rpcClient, err := rpc.Dial(network.RPCURL) - if err != nil { - return nil, fmt.Errorf("dial upstream server: %s", err) - } - - chainClient := &ChainClient{eth: ethclient.NewClient(rpcClient), ChainID: chainID} - nm.chainClients[chainID] = chainClient - return chainClient, nil -} - -func (nm *Manager) GetChainClients(chainIDs []uint64) (res []*ChainClient, err error) { - for _, chainID := range chainIDs { - client, err := nm.GetChainClient(chainID) - if err != nil { - return nil, err - } - res = append(res, client) - } - - return res, nil -} - func (nm *Manager) Upsert(network *Network) error { _, err := nm.db.Exec( "INSERT OR REPLACE INTO networks (chain_id, chain_name, rpc_url, block_explorer_url, icon_url, native_currency_name, native_currency_symbol, native_currency_decimals, is_test, layer, enabled) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", diff --git a/services/wallet/network/network_test.go b/rpc/network/network_test.go similarity index 100% rename from services/wallet/network/network_test.go rename to rpc/network/network_test.go diff --git a/services/personal/api.go b/services/personal/api.go index adff39683..dff13bb6f 100644 --- a/services/personal/api.go +++ b/services/personal/api.go @@ -61,6 +61,7 @@ func (api *PublicAPI) Recover(rpcParams RecoverParams) (addr types.Address, err err = api.rpcClient.CallContextIgnoringLocalHandlers( ctx, &gethAddr, + api.rpcClient.UpstreamChainID, params.PersonalRecoverMethodName, rpcParams.Message, rpcParams.Signature) addr = types.Address(gethAddr) @@ -81,6 +82,7 @@ func (api *PublicAPI) Sign(rpcParams SignParams, verifiedAccount *account.Select err = api.rpcClient.CallContextIgnoringLocalHandlers( ctx, &gethResult, + api.rpcClient.UpstreamChainID, params.PersonalSignMethodName, rpcParams.Data, rpcParams.Address, rpcParams.Password) result = types.HexBytes(gethResult) diff --git a/services/rpcfilters/api.go b/services/rpcfilters/api.go index 744265461..25fd693b4 100644 --- a/services/rpcfilters/api.go +++ b/services/rpcfilters/api.go @@ -13,7 +13,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/rpc" + getrpc "github.com/ethereum/go-ethereum/rpc" ) const ( @@ -36,14 +36,15 @@ type filter interface { // PublicAPI represents filter API that is exported to `eth` namespace type PublicAPI struct { filtersMu sync.Mutex - filters map[rpc.ID]filter + filters map[getrpc.ID]filter // filterLivenessLoop defines how often timeout loop is executed filterLivenessLoop time.Duration // filter liveness increased by this period when changes are requested filterLivenessPeriod time.Duration - client func() ContextCaller + client func() ContextCaller + chainID func() uint64 latestBlockChangedEvent *latestBlockChangedEvent transactionSentToUpstreamEvent *transactionSentToUpstreamEvent @@ -52,11 +53,12 @@ type PublicAPI struct { // NewPublicAPI returns a reference to the PublicAPI object func NewPublicAPI(s *Service) *PublicAPI { api := &PublicAPI{ - filters: make(map[rpc.ID]filter), + filters: make(map[getrpc.ID]filter), latestBlockChangedEvent: s.latestBlockChangedEvent, transactionSentToUpstreamEvent: s.transactionSentToUpstreamEvent, client: func() ContextCaller { return s.rpc.RPCClient() }, + chainID: func() uint64 { return s.rpc.RPCClient().UpstreamChainID }, filterLivenessLoop: defaultFilterLivenessPeriod, filterLivenessPeriod: defaultFilterLivenessPeriod + 10*time.Second, } @@ -89,8 +91,8 @@ func (api *PublicAPI) timeoutLoop(quit chan struct{}) { } } -func (api *PublicAPI) NewFilter(crit filters.FilterCriteria) (rpc.ID, error) { - id := rpc.ID(uuid.New()) +func (api *PublicAPI) NewFilter(crit filters.FilterCriteria) (getrpc.ID, error) { + id := getrpc.ID(uuid.New()) ctx, cancel := context.WithCancel(context.Background()) f := &logsFilter{ id: id, @@ -105,18 +107,18 @@ func (api *PublicAPI) NewFilter(crit filters.FilterCriteria) (rpc.ID, error) { api.filtersMu.Lock() api.filters[id] = f api.filtersMu.Unlock() - go pollLogs(api.client(), f, defaultLogsQueryTimeout, defaultLogsPeriod) + go pollLogs(api.client(), api.chainID(), f, defaultLogsQueryTimeout, defaultLogsPeriod) return id, nil } // NewBlockFilter is an implemenation of `eth_newBlockFilter` API // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter -func (api *PublicAPI) NewBlockFilter() rpc.ID { +func (api *PublicAPI) NewBlockFilter() getrpc.ID { api.filtersMu.Lock() defer api.filtersMu.Unlock() f := newHashFilter() - id := rpc.ID(uuid.New()) + id := getrpc.ID(uuid.New()) api.filters[id] = f @@ -142,12 +144,12 @@ func (api *PublicAPI) NewBlockFilter() rpc.ID { // NewPendingTransactionFilter is an implementation of `eth_newPendingTransactionFilter` API // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter -func (api *PublicAPI) NewPendingTransactionFilter() rpc.ID { +func (api *PublicAPI) NewPendingTransactionFilter() getrpc.ID { api.filtersMu.Lock() defer api.filtersMu.Unlock() f := newHashFilter() - id := rpc.ID(uuid.New()) + id := getrpc.ID(uuid.New()) api.filters[id] = f @@ -174,7 +176,7 @@ func (api *PublicAPI) NewPendingTransactionFilter() rpc.ID { // UninstallFilter is an implemenation of `eth_uninstallFilter` API // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter -func (api *PublicAPI) UninstallFilter(id rpc.ID) bool { +func (api *PublicAPI) UninstallFilter(id getrpc.ID) bool { api.filtersMu.Lock() f, found := api.filters[id] if found { @@ -193,7 +195,7 @@ func (api *PublicAPI) UninstallFilter(id rpc.ID) bool { // If the filter could not be found an empty array of logs is returned. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs -func (api *PublicAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]types.Log, error) { +func (api *PublicAPI) GetFilterLogs(ctx context.Context, id getrpc.ID) ([]types.Log, error) { api.filtersMu.Lock() f, exist := api.filters[id] api.filtersMu.Unlock() @@ -206,7 +208,7 @@ func (api *PublicAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]types.Log } ctx, cancel := context.WithTimeout(ctx, defaultLogsQueryTimeout) defer cancel() - rst, err := getLogs(ctx, api.client(), logs.originalCrit) + rst, err := getLogs(ctx, api.client(), api.chainID(), logs.originalCrit) return rst, err } @@ -214,7 +216,7 @@ func (api *PublicAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]types.Log // last time it was called. This can be used for polling. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges -func (api *PublicAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { +func (api *PublicAPI) GetFilterChanges(id getrpc.ID) (interface{}, error) { api.filtersMu.Lock() defer api.filtersMu.Unlock() diff --git a/services/rpcfilters/api_test.go b/services/rpcfilters/api_test.go index 537b90d9b..7e37f9f44 100644 --- a/services/rpcfilters/api_test.go +++ b/services/rpcfilters/api_test.go @@ -21,6 +21,7 @@ func TestFilterLiveness(t *testing.T) { filterLivenessLoop: 10 * time.Millisecond, filterLivenessPeriod: 15 * time.Millisecond, client: func() ContextCaller { return &callTracker{} }, + chainID: func() uint64 { return 1 }, } id, err := api.NewFilter(filters.FilterCriteria{}) require.NoError(t, err) @@ -60,6 +61,7 @@ func TestGetFilterChangesResetsTimer(t *testing.T) { filterLivenessLoop: 10 * time.Millisecond, filterLivenessPeriod: 15 * time.Millisecond, client: func() ContextCaller { return &callTracker{} }, + chainID: func() uint64 { return 1 }, } id, err := api.NewFilter(filters.FilterCriteria{}) require.NoError(t, err) @@ -86,6 +88,7 @@ func TestGetFilterLogs(t *testing.T) { api := &PublicAPI{ filters: make(map[rpc.ID]filter), client: func() ContextCaller { return tracker }, + chainID: func() uint64 { return 1 }, } block := big.NewInt(10) id, err := api.NewFilter(filters.FilterCriteria{ diff --git a/services/rpcfilters/latest_block_provider.go b/services/rpcfilters/latest_block_provider.go index d0de571ca..2d7670747 100644 --- a/services/rpcfilters/latest_block_provider.go +++ b/services/rpcfilters/latest_block_provider.go @@ -48,7 +48,7 @@ func (p *latestBlockProviderRPC) GetLatestBlock() (blockInfo, error) { var result blockInfo - err := rpcClient.Call(&result, "eth_getBlockByNumber", "latest", false) + err := rpcClient.Call(&result, rpcClient.UpstreamChainID, "eth_getBlockByNumber", "latest", false) if err != nil { return blockInfo{}, err diff --git a/services/rpcfilters/latest_logs.go b/services/rpcfilters/latest_logs.go index 916c5452a..d37b21131 100644 --- a/services/rpcfilters/latest_logs.go +++ b/services/rpcfilters/latest_logs.go @@ -9,19 +9,19 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/rpc" + getRpc "github.com/ethereum/go-ethereum/rpc" ) // ContextCaller provides CallContext method as ethereums rpc.Client. type ContextCaller interface { - CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error + CallContext(ctx context.Context, result interface{}, chainID uint64, method string, args ...interface{}) error } -func pollLogs(client ContextCaller, f *logsFilter, timeout, period time.Duration) { +func pollLogs(client ContextCaller, chainID uint64, f *logsFilter, timeout, period time.Duration) { query := func() { ctx, cancel := context.WithTimeout(f.ctx, timeout) defer cancel() - logs, err := getLogs(ctx, client, f.criteria()) + logs, err := getLogs(ctx, client, chainID, f.criteria()) if err != nil { log.Error("Error fetch logs", "criteria", f.crit, "error", err) return @@ -43,8 +43,8 @@ func pollLogs(client ContextCaller, f *logsFilter, timeout, period time.Duration } } } -func getLogs(ctx context.Context, client ContextCaller, crit ethereum.FilterQuery) (rst []types.Log, err error) { - return rst, client.CallContext(ctx, &rst, "eth_getLogs", toFilterArg(crit)) +func getLogs(ctx context.Context, client ContextCaller, chainID uint64, crit ethereum.FilterQuery) (rst []types.Log, err error) { + return rst, client.CallContext(ctx, &rst, chainID, "eth_getLogs", toFilterArg(crit)) } func toFilterArg(q ethereum.FilterQuery) interface{} { @@ -61,9 +61,9 @@ func toFilterArg(q ethereum.FilterQuery) interface{} { } func toBlockNumArg(number *big.Int) string { - if number == nil || number.Int64() == rpc.LatestBlockNumber.Int64() { + if number == nil || number.Int64() == getRpc.LatestBlockNumber.Int64() { return "latest" - } else if number.Int64() == rpc.PendingBlockNumber.Int64() { + } else if number.Int64() == getRpc.PendingBlockNumber.Int64() { return "pending" } return hexutil.EncodeBig(number) diff --git a/services/rpcfilters/latest_logs_test.go b/services/rpcfilters/latest_logs_test.go index f9453dcb1..b53beaa75 100644 --- a/services/rpcfilters/latest_logs_test.go +++ b/services/rpcfilters/latest_logs_test.go @@ -24,7 +24,7 @@ type callTracker struct { criteria []map[string]interface{} } -func (c *callTracker) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { +func (c *callTracker) CallContext(ctx context.Context, result interface{}, chainID uint64, method string, args ...interface{}) error { c.mu.Lock() defer c.mu.Unlock() c.calls++ @@ -50,7 +50,7 @@ func runLogsFetcherTest(t *testing.T, f *logsFilter, replies [][]types.Log, quer var wg sync.WaitGroup wg.Add(1) go func() { - pollLogs(&c, f, time.Second, 100*time.Millisecond) + pollLogs(&c, 1, f, time.Second, 100*time.Millisecond) wg.Done() }() tick := time.Tick(10 * time.Millisecond) diff --git a/services/subscriptions/filters_eth.go b/services/subscriptions/filters_eth.go index 8acfc18d1..b6f48983f 100644 --- a/services/subscriptions/filters_eth.go +++ b/services/subscriptions/filters_eth.go @@ -19,7 +19,7 @@ func installEthFilter(rpcClient *rpc.Client, method string, args []interface{}) var result string - err := rpcClient.Call(&result, method, args...) + err := rpcClient.Call(&result, rpcClient.UpstreamChainID, method, args...) if err != nil { return nil, err @@ -41,13 +41,13 @@ func (ef *ethFilter) getID() string { func (ef *ethFilter) getChanges() ([]interface{}, error) { var result []interface{} - err := ef.rpcClient.Call(&result, "eth_getFilterChanges", ef.getID()) + err := ef.rpcClient.Call(&result, ef.rpcClient.UpstreamChainID, "eth_getFilterChanges", ef.getID()) return result, err } func (ef *ethFilter) uninstall() error { - return ef.rpcClient.Call(nil, "eth_uninstallFilter", ef.getID()) + return ef.rpcClient.Call(nil, ef.rpcClient.UpstreamChainID, "eth_uninstallFilter", ef.getID()) } func validateEthMethod(method string) error { diff --git a/services/subscriptions/filters_shh.go b/services/subscriptions/filters_shh.go index 230f61780..2da5b5006 100644 --- a/services/subscriptions/filters_shh.go +++ b/services/subscriptions/filters_shh.go @@ -19,7 +19,7 @@ func installShhFilter(rpcClient *rpc.Client, method string, args []interface{}) var result string - err := rpcClient.Call(&result, method, args...) + err := rpcClient.Call(&result, rpcClient.UpstreamChainID, method, args...) if err != nil { return nil, err @@ -36,7 +36,7 @@ func installShhFilter(rpcClient *rpc.Client, method string, args []interface{}) func (wf *whisperFilter) getChanges() ([]interface{}, error) { var result []interface{} - err := wf.rpcClient.Call(&result, "shh_getFilterMessages", wf.getID()) + err := wf.rpcClient.Call(&result, wf.rpcClient.UpstreamChainID, "shh_getFilterMessages", wf.getID()) return result, err } @@ -46,7 +46,7 @@ func (wf *whisperFilter) getID() string { } func (wf *whisperFilter) uninstall() error { - return wf.rpcClient.Call(nil, "shh_deleteMessageFilter", wf.getID()) + return wf.rpcClient.Call(nil, wf.rpcClient.UpstreamChainID, "shh_deleteMessageFilter", wf.getID()) } func validateShhMethod(method string) error { diff --git a/services/wallet/api.go b/services/wallet/api.go index 3afda097f..82ed7611b 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -6,7 +6,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" - "github.com/status-im/status-go/services/wallet/network" + "github.com/status-im/status-go/rpc/network" + "github.com/status-im/status-go/services/wallet/chain" "github.com/status-im/status-go/services/wallet/transfer" ) @@ -21,7 +22,7 @@ type API struct { // SetInitialBlocksRange sets initial blocks range func (api *API) SetInitialBlocksRange(ctx context.Context) error { - return api.s.transferController.SetInitialBlocksRange([]uint64{api.s.legacyChainID}) + return api.s.transferController.SetInitialBlocksRange([]uint64{api.s.rpcClient.UpstreamChainID}) } func (api *API) SetInitialBlocksRangeForChainIDs(ctx context.Context, chainIDs []uint64) error { @@ -29,7 +30,7 @@ func (api *API) SetInitialBlocksRangeForChainIDs(ctx context.Context, chainIDs [ } func (api *API) CheckRecentHistory(ctx context.Context, addresses []common.Address) error { - return api.s.transferController.CheckRecentHistory([]uint64{api.s.legacyChainID}, addresses) + return api.s.transferController.CheckRecentHistory([]uint64{api.s.rpcClient.UpstreamChainID}, addresses) } func (api *API) CheckRecentHistoryForChainIDs(ctx context.Context, chainIDs []uint64, addresses []common.Address) error { @@ -39,7 +40,7 @@ func (api *API) CheckRecentHistoryForChainIDs(ctx context.Context, chainIDs []ui // GetTransfersByAddress returns transfers for a single address func (api *API) GetTransfersByAddress(ctx context.Context, address common.Address, toBlock, limit *hexutil.Big, fetchMore bool) ([]transfer.View, error) { log.Debug("[WalletAPI:: GetTransfersByAddress] get transfers for an address", "address", address, "block", toBlock, "limit", limit) - return api.s.transferController.GetTransfersByAddress(ctx, api.s.legacyChainID, address, toBlock, limit, fetchMore) + return api.s.transferController.GetTransfersByAddress(ctx, api.s.rpcClient.UpstreamChainID, address, toBlock, limit, fetchMore) } func (api *API) GetTransfersByAddressAndChainID(ctx context.Context, chainID uint64, address common.Address, toBlock, limit *hexutil.Big, fetchMore bool) ([]transfer.View, error) { @@ -48,7 +49,7 @@ func (api *API) GetTransfersByAddressAndChainID(ctx context.Context, chainID uin } func (api *API) GetCachedBalances(ctx context.Context, addresses []common.Address) ([]transfer.LastKnownBlockView, error) { - return api.s.transferController.GetCachedBalances(ctx, api.s.legacyChainID, addresses) + return api.s.transferController.GetCachedBalances(ctx, api.s.rpcClient.UpstreamChainID, addresses) } func (api *API) GetCachedBalancesbyChainID(ctx context.Context, chainID uint64, addresses []common.Address) ([]transfer.LastKnownBlockView, error) { @@ -57,15 +58,15 @@ func (api *API) GetCachedBalancesbyChainID(ctx context.Context, chainID uint64, // GetTokensBalances return mapping of token balances for every account. func (api *API) GetTokensBalances(ctx context.Context, accounts, addresses []common.Address) (map[common.Address]map[common.Address]*hexutil.Big, error) { - client, err := api.s.networkManager.GetChainClient(api.s.legacyChainID) + chainClient, err := chain.NewLegacyClient(api.s.rpcClient) if err != nil { return nil, err } - return api.s.tokenManager.getBalances(ctx, []*network.ChainClient{client}, accounts, addresses) + return api.s.tokenManager.getBalances(ctx, []*chain.Client{chainClient}, accounts, addresses) } func (api *API) GetTokensBalancesForChainIDs(ctx context.Context, chainIDs []uint64, accounts, addresses []common.Address) (map[common.Address]map[common.Address]*hexutil.Big, error) { - clients, err := api.s.networkManager.GetChainClients(chainIDs) + clients, err := chain.NewClients(api.s.rpcClient, chainIDs) if err != nil { return nil, err } @@ -82,7 +83,7 @@ func (api *API) GetCustomTokens(ctx context.Context) ([]*Token, error) { func (api *API) AddCustomToken(ctx context.Context, token Token) error { log.Debug("call to create or edit custom token") if token.ChainID == 0 { - token.ChainID = api.s.legacyChainID + token.ChainID = api.s.rpcClient.UpstreamChainID } err := api.s.tokenManager.upsertCustom(token) log.Debug("result from database for create or edit custom token", "err", err) @@ -91,7 +92,7 @@ func (api *API) AddCustomToken(ctx context.Context, token Token) error { func (api *API) DeleteCustomToken(ctx context.Context, address common.Address) error { log.Debug("call to remove custom token") - err := api.s.tokenManager.deleteCustom(api.s.legacyChainID, address) + err := api.s.tokenManager.deleteCustom(api.s.rpcClient.UpstreamChainID, address) log.Debug("result from database for remove custom token", "err", err) return err } @@ -105,7 +106,7 @@ func (api *API) DeleteCustomTokenByChainID(ctx context.Context, chainID uint64, func (api *API) GetSavedAddresses(ctx context.Context) ([]*SavedAddress, error) { log.Debug("call to get saved addresses") - rst, err := api.s.savedAddressesManager.GetSavedAddresses(api.s.legacyChainID) + rst, err := api.s.savedAddressesManager.GetSavedAddresses(api.s.rpcClient.UpstreamChainID) log.Debug("result from database for saved addresses", "len", len(rst)) return rst, err } @@ -113,7 +114,7 @@ func (api *API) GetSavedAddresses(ctx context.Context) ([]*SavedAddress, error) func (api *API) AddSavedAddress(ctx context.Context, sa SavedAddress) error { log.Debug("call to create or edit saved address") if sa.ChainID == 0 { - sa.ChainID = api.s.legacyChainID + sa.ChainID = api.s.rpcClient.UpstreamChainID } err := api.s.savedAddressesManager.AddSavedAddress(sa) log.Debug("result from database for create or edit saved address", "err", err) @@ -122,14 +123,14 @@ func (api *API) AddSavedAddress(ctx context.Context, sa SavedAddress) error { func (api *API) DeleteSavedAddress(ctx context.Context, address common.Address) error { log.Debug("call to remove saved address") - err := api.s.savedAddressesManager.DeleteSavedAddress(api.s.legacyChainID, address) + err := api.s.savedAddressesManager.DeleteSavedAddress(api.s.rpcClient.UpstreamChainID, address) log.Debug("result from database for remove saved address", "err", err) return err } func (api *API) GetPendingTransactions(ctx context.Context) ([]*PendingTransaction, error) { log.Debug("call to get pending transactions") - rst, err := api.s.transactionManager.getAllPendings(api.s.legacyChainID) + rst, err := api.s.transactionManager.getAllPendings(api.s.rpcClient.UpstreamChainID) log.Debug("result from database for pending transactions", "len", len(rst)) return rst, err } @@ -143,7 +144,7 @@ func (api *API) GetPendingTransactionsByChainID(ctx context.Context, chainID uin func (api *API) GetPendingOutboundTransactionsByAddress(ctx context.Context, address common.Address) ([]*PendingTransaction, error) { log.Debug("call to get pending outbound transactions by address") - rst, err := api.s.transactionManager.getPendingByAddress(api.s.legacyChainID, address) + rst, err := api.s.transactionManager.getPendingByAddress(api.s.rpcClient.UpstreamChainID, address) log.Debug("result from database for pending transactions by address", "len", len(rst)) return rst, err } @@ -158,7 +159,7 @@ func (api *API) GetPendingOutboundTransactionsByAddressAndChainID(ctx context.Co func (api *API) StorePendingTransaction(ctx context.Context, trx PendingTransaction) error { log.Debug("call to create or edit pending transaction") if trx.ChainID == 0 { - trx.ChainID = api.s.legacyChainID + trx.ChainID = api.s.rpcClient.UpstreamChainID } err := api.s.transactionManager.addPending(trx) log.Debug("result from database for creating or editing a pending transaction", "err", err) @@ -167,7 +168,7 @@ func (api *API) StorePendingTransaction(ctx context.Context, trx PendingTransact func (api *API) DeletePendingTransaction(ctx context.Context, transactionHash common.Hash) error { log.Debug("call to remove pending transaction") - err := api.s.transactionManager.deletePending(api.s.legacyChainID, transactionHash) + err := api.s.transactionManager.deletePending(api.s.rpcClient.UpstreamChainID, transactionHash) log.Debug("result from database for remove pending transaction", "err", err) return err } @@ -180,20 +181,18 @@ func (api *API) DeletePendingTransactionByChainID(ctx context.Context, chainID u } func (api *API) WatchTransaction(ctx context.Context, transactionHash common.Hash) error { - chainClient, err := api.s.networkManager.GetChainClient(api.s.legacyChainID) + chainClient, err := chain.NewLegacyClient(api.s.rpcClient) if err != nil { return err } - return api.s.transactionManager.watch(ctx, transactionHash, chainClient) } func (api *API) WatchTransactionByChainID(ctx context.Context, chainID uint64, transactionHash common.Hash) error { - chainClient, err := api.s.networkManager.GetChainClient(chainID) + chainClient, err := chain.NewClient(api.s.rpcClient, chainID) if err != nil { return err } - return api.s.transactionManager.watch(ctx, transactionHash, chainClient) } @@ -237,15 +236,15 @@ func (api *API) GetOpenseaAssetsByOwnerAndCollection(ctx context.Context, chainI func (api *API) AddEthereumChain(ctx context.Context, network network.Network) error { log.Debug("call to AddEthereumChain") - return api.s.networkManager.Upsert(&network) + return api.s.rpcClient.NetworkManager.Upsert(&network) } func (api *API) DeleteEthereumChain(ctx context.Context, chainID uint64) error { log.Debug("call to DeleteEthereumChain") - return api.s.networkManager.Delete(chainID) + return api.s.rpcClient.NetworkManager.Delete(chainID) } func (api *API) GetEthereumChains(ctx context.Context, onlyEnabled bool) ([]*network.Network, error) { log.Debug("call to GetEthereumChains") - return api.s.networkManager.Get(onlyEnabled) + return api.s.rpcClient.NetworkManager.Get(onlyEnabled) } diff --git a/services/wallet/chain/client.go b/services/wallet/chain/client.go new file mode 100644 index 000000000..081958d76 --- /dev/null +++ b/services/wallet/chain/client.go @@ -0,0 +1,100 @@ +package chain + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/status-im/status-go/rpc" + "github.com/status-im/status-go/services/rpcstats" +) + +type Client struct { + eth *ethclient.Client + ChainID uint64 +} + +func NewClient(rpc *rpc.Client, chainID uint64) (*Client, error) { + ethClient, err := rpc.EthClient(chainID) + if err != nil { + return nil, err + } + return &Client{ethClient, chainID}, nil +} + +func NewLegacyClient(rpc *rpc.Client) (*Client, error) { + return NewClient(rpc, rpc.UpstreamChainID) +} + +func NewClients(rpc *rpc.Client, chainIDs []uint64) (res []*Client, err error) { + for _, chainID := range chainIDs { + client, err := NewClient(rpc, chainID) + if err != nil { + return nil, err + } + res = append(res, client) + } + return res, nil +} + +func (cc *Client) ToBigInt() *big.Int { + return big.NewInt(int64(cc.ChainID)) +} + +func (cc *Client) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + rpcstats.CountCall("eth_getBlockByHash") + return cc.eth.HeaderByHash(ctx, hash) +} + +func (cc *Client) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + rpcstats.CountCall("eth_getBlockByNumber") + return cc.eth.HeaderByNumber(ctx, number) +} + +func (cc *Client) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { + rpcstats.CountCall("eth_getBlockByHash") + return cc.eth.BlockByHash(ctx, hash) +} + +func (cc *Client) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + rpcstats.CountCall("eth_getBlockByNumber") + return cc.eth.BlockByNumber(ctx, number) +} + +func (cc *Client) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { + rpcstats.CountCall("eth_getBalance") + return cc.eth.BalanceAt(ctx, account, blockNumber) +} + +func (cc *Client) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + rpcstats.CountCall("eth_getTransactionCount") + return cc.eth.NonceAt(ctx, account, blockNumber) +} + +func (cc *Client) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { + rpcstats.CountCall("eth_getTransactionReceipt") + return cc.eth.TransactionReceipt(ctx, txHash) +} + +func (cc *Client) TransactionByHash(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error) { + rpcstats.CountCall("eth_getTransactionByHash") + return cc.eth.TransactionByHash(ctx, hash) +} + +func (cc *Client) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { + rpcstats.CountCall("eth_getLogs") + return cc.eth.FilterLogs(ctx, q) +} + +func (cc *Client) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) { + rpcstats.CountCall("eth_getCode") + return cc.eth.CodeAt(ctx, contract, blockNumber) +} + +func (cc *Client) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { + rpcstats.CountCall("eth_call") + return cc.eth.CallContract(ctx, call, blockNumber) +} diff --git a/services/wallet/network/chain_client.go b/services/wallet/network/chain_client.go deleted file mode 100644 index 1f783b686..000000000 --- a/services/wallet/network/chain_client.go +++ /dev/null @@ -1,76 +0,0 @@ -package network - -import ( - "context" - "math/big" - - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethclient" - "github.com/status-im/status-go/services/rpcstats" -) - -type ChainClient struct { - eth *ethclient.Client - ChainID uint64 -} - -func (cc *ChainClient) ToBigInt() *big.Int { - return big.NewInt(int64(cc.ChainID)) -} - -func (cc *ChainClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { - rpcstats.CountCall("eth_getBlockByHash") - return cc.eth.HeaderByHash(ctx, hash) -} - -func (cc *ChainClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { - rpcstats.CountCall("eth_getBlockByNumber") - return cc.eth.HeaderByNumber(ctx, number) -} - -func (cc *ChainClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { - rpcstats.CountCall("eth_getBlockByHash") - return cc.eth.BlockByHash(ctx, hash) -} - -func (cc *ChainClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { - rpcstats.CountCall("eth_getBlockByNumber") - return cc.eth.BlockByNumber(ctx, number) -} - -func (cc *ChainClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { - rpcstats.CountCall("eth_getBalance") - return cc.eth.BalanceAt(ctx, account, blockNumber) -} - -func (cc *ChainClient) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { - rpcstats.CountCall("eth_getTransactionCount") - return cc.eth.NonceAt(ctx, account, blockNumber) -} - -func (cc *ChainClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { - rpcstats.CountCall("eth_getTransactionReceipt") - return cc.eth.TransactionReceipt(ctx, txHash) -} - -func (cc *ChainClient) TransactionByHash(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error) { - rpcstats.CountCall("eth_getTransactionByHash") - return cc.eth.TransactionByHash(ctx, hash) -} - -func (cc *ChainClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { - rpcstats.CountCall("eth_getLogs") - return cc.eth.FilterLogs(ctx, q) -} - -func (cc *ChainClient) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) { - rpcstats.CountCall("eth_getCode") - return cc.eth.CodeAt(ctx, contract, blockNumber) -} - -func (cc *ChainClient) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { - rpcstats.CountCall("eth_call") - return cc.eth.CallContract(ctx, call, blockNumber) -} diff --git a/services/wallet/service.go b/services/wallet/service.go index b05ccacc4..d293e6ea6 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -3,17 +3,17 @@ package wallet import ( "database/sql" - "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/rpc" - "github.com/status-im/status-go/services/wallet/network" + gethrpc "github.com/ethereum/go-ethereum/rpc" + + "github.com/status-im/status-go/rpc" "github.com/status-im/status-go/services/wallet/transfer" ) // NewService initializes service instance. -func NewService(db *sql.DB, legacyChainID uint64, legacyClient *ethclient.Client, networks []network.Network, accountFeed *event.Feed) *Service { +func NewService(db *sql.DB, rpcClient *rpc.Client, accountFeed *event.Feed) *Service { cryptoOnRampManager := NewCryptoOnRampManager(&CryptoOnRampOptions{ dataSourceType: DataSourceStatic, }) @@ -21,36 +21,28 @@ func NewService(db *sql.DB, legacyChainID uint64, legacyClient *ethclient.Client savedAddressesManager := &SavedAddressesManager{db: db} transactionManager := &TransactionManager{db: db} favouriteManager := &FavouriteManager{db: db} - networkManager := network.NewManager(db, legacyChainID, legacyClient) - err := networkManager.Init(networks) - if err != nil { - log.Error("Network manager failed to initialize", "error", err) - } - - transferController := transfer.NewTransferController(db, networkManager, accountFeed) + transferController := transfer.NewTransferController(db, rpcClient, accountFeed) return &Service{ + rpcClient: rpcClient, favouriteManager: favouriteManager, - networkManager: networkManager, tokenManager: tokenManager, savedAddressesManager: savedAddressesManager, transactionManager: transactionManager, transferController: transferController, cryptoOnRampManager: cryptoOnRampManager, - legacyChainID: legacyChainID, } } // Service is a wallet service. type Service struct { - networkManager *network.Manager + rpcClient *rpc.Client savedAddressesManager *SavedAddressesManager tokenManager *TokenManager transactionManager *TransactionManager favouriteManager *FavouriteManager cryptoOnRampManager *CryptoOnRampManager transferController *transfer.Controller - legacyChainID uint64 started bool } @@ -76,8 +68,8 @@ func (s *Service) Stop() error { } // APIs returns list of available RPC APIs. -func (s *Service) APIs() []rpc.API { - return []rpc.API{ +func (s *Service) APIs() []gethrpc.API { + return []gethrpc.API{ { Namespace: "wallet", Version: "0.1.0", diff --git a/services/wallet/token.go b/services/wallet/token.go index 60459882b..52644a861 100644 --- a/services/wallet/token.go +++ b/services/wallet/token.go @@ -12,8 +12,8 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/services/wallet/async" + "github.com/status-im/status-go/services/wallet/chain" "github.com/status-im/status-go/services/wallet/ierc20" - "github.com/status-im/status-go/services/wallet/network" ) var requestTimeout = 20 * time.Second @@ -69,7 +69,7 @@ func (tm *TokenManager) deleteCustom(chainID uint64, address common.Address) err return err } -func (tm *TokenManager) getBalances(parent context.Context, clients []*network.ChainClient, accounts, tokens []common.Address) (map[common.Address]map[common.Address]*hexutil.Big, error) { +func (tm *TokenManager) getBalances(parent context.Context, clients []*chain.Client, accounts, tokens []common.Address) (map[common.Address]map[common.Address]*hexutil.Big, error) { var ( group = async.NewAtomicGroup(parent) mu sync.Mutex diff --git a/services/wallet/transaction.go b/services/wallet/transaction.go index 61f891933..d8d31a1f7 100644 --- a/services/wallet/transaction.go +++ b/services/wallet/transaction.go @@ -11,7 +11,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/bigint" - "github.com/status-im/status-go/services/wallet/network" + "github.com/status-im/status-go/services/wallet/chain" ) type TransactionManager struct { @@ -156,7 +156,7 @@ func (tm *TransactionManager) deletePending(chainID uint64, hash common.Hash) er return err } -func (tm *TransactionManager) watch(ctx context.Context, transactionHash common.Hash, client *network.ChainClient) error { +func (tm *TransactionManager) watch(ctx context.Context, transactionHash common.Hash, client *chain.Client) error { watchTxCommand := &watchTransactionCommand{ hash: transactionHash, client: client, @@ -169,7 +169,7 @@ func (tm *TransactionManager) watch(ctx context.Context, transactionHash common. } type watchTransactionCommand struct { - client *network.ChainClient + client *chain.Client hash common.Hash } diff --git a/services/wallet/transfer/block.go b/services/wallet/transfer/block.go index 949ad4a94..ff5d923aa 100644 --- a/services/wallet/transfer/block.go +++ b/services/wallet/transfer/block.go @@ -10,7 +10,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/services/wallet/bigint" - "github.com/status-im/status-go/services/wallet/network" + "github.com/status-im/status-go/services/wallet/chain" ) type BlocksRange struct { @@ -63,7 +63,7 @@ func (b *Block) mergeBlocksRanges(chainIDs []uint64, accounts []common.Address) return nil } -func (b *Block) setInitialBlocksRange(chainClient *network.ChainClient) error { +func (b *Block) setInitialBlocksRange(chainClient *chain.Client) error { accountsDB := accounts.NewDB(b.db) watchAddress, err := accountsDB.GetWalletAddress() if err != nil { diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index 8a25796bd..aa2845935 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -11,7 +11,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/services/wallet/async" - "github.com/status-im/status-go/services/wallet/network" + "github.com/status-im/status-go/services/wallet/chain" ) var numberOfBlocksCheckedPerIteration = 40 @@ -20,7 +20,7 @@ type ethHistoricalCommand struct { db *Database eth Downloader address common.Address - chainClient *network.ChainClient + chainClient *chain.Client balanceCache *balanceCache feed *event.Feed foundHeaders []*DBHeader @@ -71,7 +71,7 @@ type erc20HistoricalCommand struct { db *Database erc20 BatchDownloader address common.Address - chainClient *network.ChainClient + chainClient *chain.Client feed *event.Feed iterator *IterativeDownloader @@ -127,7 +127,7 @@ type controlCommand struct { block *Block eth *ETHDownloader erc20 *ERC20TransfersDownloader - chainClient *network.ChainClient + chainClient *chain.Client feed *event.Feed errorsCount int nonArchivalRPCNode bool @@ -300,7 +300,7 @@ type transfersCommand struct { eth *ETHDownloader block *big.Int address common.Address - chainClient *network.ChainClient + chainClient *chain.Client fetchedTransfers []Transfer } @@ -333,7 +333,7 @@ type loadTransfersCommand struct { accounts []common.Address db *Database block *Block - chainClient *network.ChainClient + chainClient *chain.Client blocksByAddress map[common.Address][]*big.Int foundTransfersByAddress map[common.Address][]Transfer } @@ -368,7 +368,7 @@ func (c *loadTransfersCommand) Run(parent context.Context) (err error) { type findAndCheckBlockRangeCommand struct { accounts []common.Address db *Database - chainClient *network.ChainClient + chainClient *chain.Client balanceCache *balanceCache feed *event.Feed fromByAddress map[common.Address]*LastKnownBlock @@ -533,7 +533,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, from } } -func loadTransfers(ctx context.Context, accounts []common.Address, block *Block, db *Database, chainClient *network.ChainClient, limit int, blocksByAddress map[common.Address][]*big.Int) (map[common.Address][]Transfer, error) { +func loadTransfers(ctx context.Context, accounts []common.Address, block *Block, db *Database, chainClient *chain.Client, limit int, blocksByAddress map[common.Address][]*big.Int) (map[common.Address][]Transfer, error) { start := time.Now() group := async.NewGroup(ctx) @@ -585,7 +585,7 @@ func loadTransfers(ctx context.Context, accounts []common.Address, block *Block, } } -func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client *network.ChainClient) (*big.Int, error) { +func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client *chain.Client) (*big.Int, error) { from := big.NewInt(0) to := initialTo goal := uint64(20) @@ -641,7 +641,7 @@ func findFirstRange(c context.Context, account common.Address, initialTo *big.In return from, nil } -func findFirstRanges(c context.Context, accounts []common.Address, initialTo *big.Int, client *network.ChainClient) (map[common.Address]*big.Int, error) { +func findFirstRanges(c context.Context, accounts []common.Address, initialTo *big.Int, client *chain.Client) (map[common.Address]*big.Int, error) { res := map[common.Address]*big.Int{} for _, address := range accounts { diff --git a/services/wallet/transfer/controller.go b/services/wallet/transfer/controller.go index 4727820a5..416115558 100644 --- a/services/wallet/transfer/controller.go +++ b/services/wallet/transfer/controller.go @@ -10,34 +10,35 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/multiaccounts/accounts" + "github.com/status-im/status-go/rpc" "github.com/status-im/status-go/services/wallet/async" - "github.com/status-im/status-go/services/wallet/network" + "github.com/status-im/status-go/services/wallet/chain" ) type Controller struct { - db *Database - networkManager *network.Manager - signals *SignalsTransmitter - block *Block - reactor *Reactor - accountFeed *event.Feed - TransferFeed *event.Feed - group *async.Group + db *Database + rpcClient *rpc.Client + signals *SignalsTransmitter + block *Block + reactor *Reactor + accountFeed *event.Feed + TransferFeed *event.Feed + group *async.Group } -func NewTransferController(db *sql.DB, networkManager *network.Manager, accountFeed *event.Feed) *Controller { +func NewTransferController(db *sql.DB, rpcClient *rpc.Client, accountFeed *event.Feed) *Controller { transferFeed := &event.Feed{} signals := &SignalsTransmitter{ publisher: transferFeed, } block := &Block{db} return &Controller{ - db: NewDB(db), - block: block, - networkManager: networkManager, - signals: signals, - accountFeed: accountFeed, - TransferFeed: transferFeed, + db: NewDB(db), + block: block, + rpcClient: rpcClient, + signals: signals, + accountFeed: accountFeed, + TransferFeed: transferFeed, } } @@ -60,7 +61,7 @@ func (c *Controller) Stop() { } func (c *Controller) SetInitialBlocksRange(chainIDs []uint64) error { - chainClients, err := c.networkManager.GetChainClients(chainIDs) + chainClients, err := chain.NewClients(c.rpcClient, chainIDs) if err != nil { return err } @@ -91,7 +92,7 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add return err } - chainClients, err := c.networkManager.GetChainClients(chainIDs) + chainClients, err := chain.NewClients(c.rpcClient, chainIDs) if err != nil { return err } @@ -121,7 +122,7 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add // watchAccountsChanges subsribes to a feed and watches for changes in accounts list. If there are new or removed accounts // reactor will be restarted. -func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor *Reactor, chainClients []*network.ChainClient, initial []common.Address) error { +func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor *Reactor, chainClients []*chain.Client, initial []common.Address) error { accounts := make(chan []accounts.Account, 1) // it may block if the rate of updates will be significantly higher sub := accountFeed.Subscribe(accounts) defer sub.Unsubscribe() @@ -182,7 +183,7 @@ func (c *Controller) GetTransfersByAddress(ctx context.Context, chainID uint64, } transfersCount := big.NewInt(int64(len(rst))) - chainClient, err := c.networkManager.GetChainClient(chainID) + chainClient, err := chain.NewClient(c.rpcClient, chainID) if err != nil { return nil, err } diff --git a/services/wallet/transfer/downloader.go b/services/wallet/transfer/downloader.go index 1495a91ac..3202ca02d 100644 --- a/services/wallet/transfer/downloader.go +++ b/services/wallet/transfer/downloader.go @@ -12,7 +12,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" - "github.com/status-im/status-go/services/wallet/network" + "github.com/status-im/status-go/services/wallet/chain" ) // Type type of the asset that was transferred. @@ -51,7 +51,7 @@ type Transfer struct { // ETHDownloader downloads regular eth transfers. type ETHDownloader struct { - chainClient *network.ChainClient + chainClient *chain.Client accounts []common.Address signer types.Signer db *Database @@ -146,7 +146,7 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc } // NewERC20TransfersDownloader returns new instance. -func NewERC20TransfersDownloader(client *network.ChainClient, accounts []common.Address, signer types.Signer) *ERC20TransfersDownloader { +func NewERC20TransfersDownloader(client *chain.Client, accounts []common.Address, signer types.Signer) *ERC20TransfersDownloader { signature := crypto.Keccak256Hash([]byte(erc20TransferEventSignature)) return &ERC20TransfersDownloader{ client: client, @@ -158,7 +158,7 @@ func NewERC20TransfersDownloader(client *network.ChainClient, accounts []common. // ERC20TransfersDownloader is a downloader for erc20 tokens transfers. type ERC20TransfersDownloader struct { - client *network.ChainClient + client *chain.Client accounts []common.Address // hash of the Transfer event signature diff --git a/services/wallet/transfer/reactor.go b/services/wallet/transfer/reactor.go index 527cbc672..76a8ab196 100644 --- a/services/wallet/transfer/reactor.go +++ b/services/wallet/transfer/reactor.go @@ -10,7 +10,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/status-im/status-go/services/wallet/async" - "github.com/status-im/status-go/services/wallet/network" + "github.com/status-im/status-go/services/wallet/chain" ) var ( @@ -40,7 +40,7 @@ type Reactor struct { group *async.Group } -func (r *Reactor) newControlCommand(chainClient *network.ChainClient, accounts []common.Address) *controlCommand { +func (r *Reactor) newControlCommand(chainClient *chain.Client, accounts []common.Address) *controlCommand { signer := types.NewLondonSigner(chainClient.ToBigInt()) ctl := &controlCommand{ db: r.db, @@ -62,7 +62,7 @@ func (r *Reactor) newControlCommand(chainClient *network.ChainClient, accounts [ } // Start runs reactor loop in background. -func (r *Reactor) start(chainClients []*network.ChainClient, accounts []common.Address) error { +func (r *Reactor) start(chainClients []*chain.Client, accounts []common.Address) error { r.mu.Lock() defer r.mu.Unlock() @@ -89,7 +89,7 @@ func (r *Reactor) stop() { r.group = nil } -func (r *Reactor) restart(chainClients []*network.ChainClient, accounts []common.Address) error { +func (r *Reactor) restart(chainClients []*chain.Client, accounts []common.Address) error { r.stop() return r.start(chainClients, accounts) } diff --git a/transactions/rpc_wrapper.go b/transactions/rpc_wrapper.go index eb626b71d..9144aa020 100644 --- a/transactions/rpc_wrapper.go +++ b/transactions/rpc_wrapper.go @@ -27,7 +27,7 @@ func newRPCWrapper(client *rpc.Client) *rpcWrapper { // This is the nonce that should be used for the next transaction. func (w *rpcWrapper) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { var result hexutil.Uint64 - err := w.rpcClient.CallContext(ctx, &result, "eth_getTransactionCount", account, "pending") + err := w.rpcClient.CallContext(ctx, &result, w.rpcClient.UpstreamChainID, "eth_getTransactionCount", account, "pending") return uint64(result), err } @@ -35,7 +35,7 @@ func (w *rpcWrapper) PendingNonceAt(ctx context.Context, account common.Address) // execution of a transaction. func (w *rpcWrapper) SuggestGasPrice(ctx context.Context) (*big.Int, error) { var hex hexutil.Big - if err := w.rpcClient.CallContext(ctx, &hex, "eth_gasPrice"); err != nil { + if err := w.rpcClient.CallContext(ctx, &hex, w.rpcClient.UpstreamChainID, "eth_gasPrice"); err != nil { return nil, err } return (*big.Int)(&hex), nil @@ -47,7 +47,7 @@ func (w *rpcWrapper) SuggestGasPrice(ctx context.Context) (*big.Int, error) { // but it should provide a basis for setting a reasonable default. func (w *rpcWrapper) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { var hex hexutil.Uint64 - err := w.rpcClient.CallContext(ctx, &hex, "eth_estimateGas", toCallArg(msg)) + err := w.rpcClient.CallContext(ctx, &hex, w.rpcClient.UpstreamChainID, "eth_estimateGas", toCallArg(msg)) if err != nil { return 0, err } @@ -63,7 +63,7 @@ func (w *rpcWrapper) SendTransaction(ctx context.Context, tx *gethtypes.Transact if err != nil { return err } - return w.rpcClient.CallContext(ctx, nil, "eth_sendRawTransaction", types.EncodeHex(data)) + return w.rpcClient.CallContext(ctx, nil, w.rpcClient.UpstreamChainID, "eth_sendRawTransaction", types.EncodeHex(data)) } func toCallArg(msg ethereum.CallMsg) interface{} { diff --git a/transactions/transactor_test.go b/transactions/transactor_test.go index ed7c9fd66..8af341e53 100644 --- a/transactions/transactor_test.go +++ b/transactions/transactor_test.go @@ -49,7 +49,7 @@ func (s *TransactorSuite) SetupTest() { s.server, s.txServiceMock = fake.NewTestServer(s.txServiceMockCtrl) s.client = gethrpc.DialInProc(s.server) - rpcClient, _ := rpc.NewClient(s.client, params.UpstreamRPCConfig{}) + rpcClient, _ := rpc.NewClient(s.client, 1, params.UpstreamRPCConfig{}, nil, nil) // expected by simulated backend chainID := gethparams.AllEthashProtocolChanges.ChainID.Uint64() nodeConfig, err := utils.MakeTestNodeConfigWithDataDir("", "/tmp", chainID)