Implement block filters API for the RPC mode.

Some operations (like deploying contracts) require filter APIs to work.
Since these operations aren't supported on Infura anymore, and we don't
run LES, a separate implemenation of filters is required.

Signed-off-by: Igor Mandrigin <i@mandrigin.ru>
This commit is contained in:
Igor Mandrigin 2018-05-04 12:53:41 +02:00 committed by Igor Mandrigin
parent 6e1fc99365
commit 31cf2297d2
8 changed files with 660 additions and 10 deletions

View File

@ -8,6 +8,7 @@ import (
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
gethnode "github.com/ethereum/go-ethereum/node"
fcmlib "github.com/NaySoftware/go-fcm"
@ -19,6 +20,7 @@ import (
"github.com/status-im/status-go/geth/rpc"
"github.com/status-im/status-go/geth/transactions"
"github.com/status-im/status-go/services/personal"
"github.com/status-im/status-go/services/rpcfilters"
"github.com/status-im/status-go/sign"
"github.com/status-im/status-go/signal"
)
@ -122,6 +124,12 @@ func (b *StatusBackend) StartNode(config *params.NodeConfig) error {
return nil
}
func (b *StatusBackend) rpcFiltersService() gethnode.ServiceConstructor {
return func(*gethnode.ServiceContext) (gethnode.Service, error) {
return rpcfilters.New(b.statusNode), nil
}
}
func (b *StatusBackend) startNode(config *params.NodeConfig) (err error) {
defer func() {
if r := recover(); r != nil {
@ -129,7 +137,10 @@ func (b *StatusBackend) startNode(config *params.NodeConfig) (err error) {
}
}()
if err = b.statusNode.Start(config); err != nil {
services := []gethnode.ServiceConstructor{}
services = appendIf(config.UpstreamConfig.Enabled, services, b.rpcFiltersService())
if err = b.statusNode.Start(config, services...); err != nil {
return
}
signal.SendNodeStarted()
@ -423,3 +434,10 @@ func (b *StatusBackend) NotifyUsers(message string, payload fcmlib.NotificationP
return err
}
func appendIf(condition bool, services []gethnode.ServiceConstructor, service gethnode.ServiceConstructor) []gethnode.ServiceConstructor {
if !condition {
return services
}
return append(services, service)
}

View File

@ -35,9 +35,10 @@ func (r *router) routeRemote(method string) bool {
// remoteMethods contains methods that should be routed to
// the upstream node; the rest is considered to be routed to
// the local node.
// A list of supported methods: https://infura.io/docs/#supported-json-rpc-methods
// TODO(tiabc): Write a test on each of these methods to ensure they're all routed to the proper node and ensure they really work
// TODO(tiabc: as we already caught https://github.com/status-im/status-go/issues/350 as the result of missing such test.
// A list of supported methods:
// curl --include \
// --header "Content-Type: application/json" \
// --header "Accept: application/json" 'https://api.infura.io/v1/jsonrpc/ropsten/methods'
// Although it's tempting to only list methods coming to the local node as there're fewer of them
// but it's deceptive: we want to ensure that only known requests leave our zone of responsibility.
// Also, we want new requests in newer Geth versions not to be accidentally routed to the upstream.
@ -76,12 +77,7 @@ var remoteMethods = [...]string{
//"eth_compileLLL", // goes to the local because there's no need to send it anywhere
//"eth_compileSolidity", // goes to the local because there's no need to send it anywhere
//"eth_compileSerpent", // goes to the local because there's no need to send it anywhere
"eth_newFilter",
"eth_newBlockFilter",
"eth_newPendingTransactionFilter",
"eth_uninstallFilter",
"eth_getFilterChanges",
"eth_getFilterLogs",
"eth_getLogs",
"eth_getWork",
"eth_submitWork",

123
services/rpcfilters/api.go Normal file
View File

@ -0,0 +1,123 @@
package rpcfilters
import (
"errors"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/pborman/uuid"
)
type filter struct {
hashes []common.Hash
mu sync.Mutex
done chan struct{}
}
// AddHash adds a hash to the filter
func (f *filter) AddHash(hash common.Hash) {
f.mu.Lock()
defer f.mu.Unlock()
f.hashes = append(f.hashes, hash)
}
// PopHashes returns all the hashes stored in the filter and clears the filter contents
func (f *filter) PopHashes() []common.Hash {
f.mu.Lock()
defer f.mu.Unlock()
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes)
}
func newFilter() *filter {
return &filter{
done: make(chan struct{}),
}
}
// PublicAPI represents filter API that is exported to `eth` namespace
type PublicAPI struct {
filters map[rpc.ID]*filter
filtersMu sync.Mutex
event *latestBlockChangedEvent
}
// NewPublicAPI returns a reference to the PublicAPI object
func NewPublicAPI(event *latestBlockChangedEvent) *PublicAPI {
return &PublicAPI{
filters: make(map[rpc.ID]*filter),
event: event,
}
}
// NewBlockFilter is an implemenation of `eth_newBlockFilter` API
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter
func (api *PublicAPI) NewBlockFilter() rpc.ID {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
f := newFilter()
id := rpc.ID(uuid.New())
api.filters[id] = f
go func() {
id, s := api.event.Subscribe()
defer api.event.Unsubscribe(id)
for {
select {
case hash := <-s:
f.AddHash(hash)
case <-f.done:
return
}
}
}()
return id
}
// UninstallFilter is an implemenation of `eth_uninstallFilter` API
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter
func (api *PublicAPI) UninstallFilter(id rpc.ID) bool {
api.filtersMu.Lock()
f, found := api.filters[id]
if found {
delete(api.filters, id)
}
api.filtersMu.Unlock()
if found {
close(f.done)
}
return found
}
// GetFilterChanges returns the hashes for the filter with the given id since
// last time it was called. This can be used for polling.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
func (api *PublicAPI) GetFilterChanges(id rpc.ID) ([]common.Hash, error) {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
if f, found := api.filters[id]; found {
return f.PopHashes(), nil
}
return []common.Hash{}, errors.New("filter not found")
}
// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
// otherwise the given hashes array is returned.
func returnHashes(hashes []common.Hash) []common.Hash {
if hashes == nil {
return []common.Hash{}
}
return hashes
}

View File

@ -0,0 +1,121 @@
package rpcfilters
import (
"errors"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
const (
defaultTickerPeriod = 3 * time.Second
)
// latestBlockChangedEvent represents an event that one can subscribe to
type latestBlockChangedEvent struct {
sxMu sync.Mutex
sx map[int]chan common.Hash
latestBlockMu sync.Mutex
previousLatestBlock blockInfo
provider latestBlockProvider
quit chan struct{}
tickerPeriod time.Duration
}
func (e *latestBlockChangedEvent) Start() error {
if e.quit != nil {
return errors.New("latest block changed event is already started")
}
e.quit = make(chan struct{})
go func() {
ticker := time.NewTicker(e.tickerPeriod)
for {
select {
case <-ticker.C:
if e.numberOfSubscriptions() == 0 {
continue
}
latestBlock, err := e.provider.GetLatestBlock()
if err != nil {
log.Error("error while receiving latest block", "error", err)
continue
}
e.processLatestBlock(latestBlock)
case <-e.quit:
return
}
}
}()
return nil
}
func (e *latestBlockChangedEvent) numberOfSubscriptions() int {
e.sxMu.Lock()
defer e.sxMu.Unlock()
return len(e.sx)
}
func (e *latestBlockChangedEvent) processLatestBlock(latestBlock blockInfo) {
e.latestBlockMu.Lock()
defer e.latestBlockMu.Unlock()
// if we received the same or an older block than we already have, ignore it.
if latestBlock.Number().Cmp(e.previousLatestBlock.Number()) <= 0 {
return
}
e.previousLatestBlock = latestBlock
e.sxMu.Lock()
defer e.sxMu.Unlock()
for _, channel := range e.sx {
channel <- e.previousLatestBlock.Hash
}
}
func (e *latestBlockChangedEvent) Stop() {
if e.quit == nil {
return
}
select {
case <-e.quit:
return
default:
close(e.quit)
}
}
func (e *latestBlockChangedEvent) Subscribe() (int, chan common.Hash) {
e.sxMu.Lock()
defer e.sxMu.Unlock()
channel := make(chan common.Hash)
id := len(e.sx)
e.sx[id] = channel
return id, channel
}
func (e *latestBlockChangedEvent) Unsubscribe(id int) {
e.sxMu.Lock()
defer e.sxMu.Unlock()
delete(e.sx, id)
}
func newLatestBlockChangedEvent(provider latestBlockProvider) *latestBlockChangedEvent {
return &latestBlockChangedEvent{
sx: make(map[int]chan common.Hash),
provider: provider,
tickerPeriod: defaultTickerPeriod,
}
}

View File

@ -0,0 +1,165 @@
package rpcfilters
import (
"math/big"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/stretchr/testify/assert"
)
type latestBlockProviderTest struct {
BlockFunc func() (blockInfo, error)
}
func (p latestBlockProviderTest) GetLatestBlock() (blockInfo, error) {
return p.BlockFunc()
}
func TestEventSubscribe(t *testing.T) {
counter := 0
hashes := []common.Hash{common.HexToHash("0xAA"), common.HexToHash("0xBB"), common.HexToHash("0xCC")}
f := func() (blockInfo, error) {
counter++
number := big.NewInt(int64(counter))
if counter > len(hashes) {
counter = len(hashes)
}
return blockInfo{hashes[counter-1], hexutil.Bytes(number.Bytes())}, nil
}
testEventSubscribe(t, f, hashes)
}
func TestZeroSubsciptionsOptimization(t *testing.T) {
counter := 0
hash := common.HexToHash("0xFF")
f := func() (blockInfo, error) {
counter++
number := big.NewInt(1)
return blockInfo{hash, hexutil.Bytes(number.Bytes())}, nil
}
event := newLatestBlockChangedEvent(latestBlockProviderTest{f})
event.tickerPeriod = time.Millisecond
assert.NoError(t, event.Start())
defer event.Stop()
// let the ticker to call ~10 times
time.Sleep(10 * time.Millisecond)
// check that our provider function wasn't called when there are no subscribers to it
assert.Equal(t, 0, counter)
// subscribing an event, checking that it works
id, channel := event.Subscribe()
timeout := time.After(1 * time.Second)
select {
case receivedHash := <-channel:
assert.Equal(t, hash, receivedHash)
case <-timeout:
assert.Fail(t, "timeout")
}
event.Unsubscribe(id)
// check that our function was called multiple times
assert.True(t, counter > 0)
counterValue := counter
// let the ticker to call ~10 times
time.Sleep(10 * time.Millisecond)
// check that our provider function wasn't called when there are no subscribers to it
assert.Equal(t, counterValue, counter)
}
func TestMultipleSubscribe(t *testing.T) {
hash := common.HexToHash("0xFF")
f := func() (blockInfo, error) {
number := big.NewInt(1)
return blockInfo{hash, hexutil.Bytes(number.Bytes())}, nil
}
event := newLatestBlockChangedEvent(latestBlockProviderTest{f})
event.tickerPeriod = time.Millisecond
wg := sync.WaitGroup{}
testFunc := func() {
testEvent(t, event, []common.Hash{hash})
wg.Done()
}
numberOfSubscriptions := 3
wg.Add(numberOfSubscriptions)
for i := 0; i < numberOfSubscriptions; i++ {
go testFunc()
}
assert.NoError(t, event.Start())
defer event.Stop()
wg.Wait()
assert.Equal(t, 0, len(event.sx))
}
func testEventSubscribe(t *testing.T, f func() (blockInfo, error), expectedHashes []common.Hash) {
event := newLatestBlockChangedEvent(latestBlockProviderTest{f})
event.tickerPeriod = time.Millisecond
assert.NoError(t, event.Start())
defer event.Stop()
testEvent(t, event, expectedHashes)
}
func testEvent(t *testing.T, event *latestBlockChangedEvent, expectedHashes []common.Hash) {
id, channel := event.Subscribe()
timeout := time.After(1 * time.Second)
for _, hash := range expectedHashes {
select {
case receivedHash := <-channel:
assert.Equal(t, hash, receivedHash)
case <-timeout:
assert.Fail(t, "timeout")
}
}
event.Unsubscribe(id)
}
func TestEventReceivedBlocksOutOfOrders(t *testing.T) {
// We are sending blocks out of order (simulating load balancing on RPC
// nodes). We should still receive them in order and not have the event
// fired for out-of-order events.
expectedHashes := []common.Hash{common.HexToHash("0xAA"), common.HexToHash("0xBB"), common.HexToHash("0xCC")}
sentHashes := []common.Hash{common.HexToHash("0xAA"), common.HexToHash("0xBB"), common.HexToHash("0xAA"), common.HexToHash("0xCC")}
sentBlockNumbers := []int64{1, 2, 1, 3}
counter := 0
f := func() (blockInfo, error) {
counter++
number := big.NewInt(sentBlockNumbers[counter-1])
if counter > len(sentHashes) {
counter = len(sentHashes)
}
return blockInfo{sentHashes[counter-1], hexutil.Bytes(number.Bytes())}, nil
}
testEventSubscribe(t, f, expectedHashes)
}

View File

@ -0,0 +1,57 @@
package rpcfilters
import (
"errors"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/status-im/status-go/geth/rpc"
)
type rpcProvider interface {
RPCClient() *rpc.Client
}
// blockInfo contains the hash and the number of the latest block
type blockInfo struct {
Hash common.Hash `json:"hash"`
NumberBytes hexutil.Bytes `json:"number"`
}
// Number returns a big.Int representation of the encoded block number.
func (i blockInfo) Number() *big.Int {
number := big.NewInt(0)
number.SetBytes(i.NumberBytes)
return number
}
// latestBlockProvider provides the latest block info from the blockchain
type latestBlockProvider interface {
GetLatestBlock() (blockInfo, error)
}
// latestBlockProviderRPC is an implementation of latestBlockProvider interface
// that requests a block using an RPC client provided
type latestBlockProviderRPC struct {
rpc rpcProvider
}
// GetLatestBlock returns the block info
func (p *latestBlockProviderRPC) GetLatestBlock() (blockInfo, error) {
rpcClient := p.rpc.RPCClient()
if rpcClient == nil {
return blockInfo{}, errors.New("no active RPC client: is the node running?")
}
var result blockInfo
err := rpcClient.Call(&result, "eth_getBlockByNumber", "latest", false)
if err != nil {
return blockInfo{}, err
}
return result, nil
}

View File

@ -0,0 +1,50 @@
package rpcfilters
import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
)
// Make sure that Service implements node.Service interface.
var _ node.Service = (*Service)(nil)
// Service represents out own implementation of personal sign operations.
type Service struct {
latestBlockChangedEvent *latestBlockChangedEvent
}
// New returns a new Service.
func New(rpc rpcProvider) *Service {
provider := &latestBlockProviderRPC{rpc}
event := newLatestBlockChangedEvent(provider)
return &Service{event}
}
// Protocols returns a new protocols list. In this case, there are none.
func (s *Service) Protocols() []p2p.Protocol {
return []p2p.Protocol{}
}
// APIs returns a list of new APIs.
func (s *Service) APIs() []rpc.API {
return []rpc.API{
{
Namespace: "eth",
Version: "1.0",
Service: NewPublicAPI(s.latestBlockChangedEvent),
Public: true,
},
}
}
// Start is run when a service is started.
func (s *Service) Start(server *p2p.Server) error {
return s.latestBlockChangedEvent.Start()
}
// Stop is run when a service is stopped.
func (s *Service) Stop() error {
s.latestBlockChangedEvent.Stop()
return nil
}

View File

@ -0,0 +1,120 @@
package services
import (
"encoding/json"
"fmt"
"testing"
"time"
"github.com/status-im/status-go/geth/params"
"github.com/stretchr/testify/suite"
. "github.com/status-im/status-go/t/utils"
)
func TestFiltersAPISuite(t *testing.T) {
s := new(FiltersAPISuite)
s.upstream = false
suite.Run(t, s)
}
func TestFiltersAPISuiteUpstream(t *testing.T) {
s := new(FiltersAPISuite)
s.upstream = true
if s.upstream && GetNetworkID() == params.StatusChainNetworkID {
t.Skip()
return
}
suite.Run(t, s)
}
type FiltersAPISuite struct {
BaseJSONRPCSuite
upstream bool
}
func (s *FiltersAPISuite) TestFilters() {
err := s.SetupTest(s.upstream, false)
s.NoError(err)
defer func() {
err := s.Backend.StopNode()
s.NoError(err)
}()
basicCall := `{"jsonrpc":"2.0","method":"eth_newBlockFilter","params":[],"id":67}`
response := s.Backend.CallRPC(basicCall)
filterID := s.filterIDFromRPCResponse(response)
// we don't check new blocks on private network, because no one mines them
if GetNetworkID() != params.StatusChainNetworkID {
timeout := time.After(time.Minute)
newBlocksChannel := s.getFirstFilterChange(filterID)
select {
case hash := <-newBlocksChannel:
s.True(len(hash) > 0, "received hash isn't empty")
case <-timeout:
s.Fail("timeout while waiting for filter results")
}
}
basicCall = fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_uninstallFilter","params":["%s"],"id":67}`, filterID)
response = s.Backend.CallRPC(basicCall)
result := s.boolFromRPCResponse(response)
s.True(result, "filter expected to be removed successfully")
}
func (s *FiltersAPISuite) getFirstFilterChange(filterID string) chan string {
result := make(chan string)
go func() {
timeout := time.Now().Add(time.Minute)
for time.Now().Before(timeout) {
basicCall := fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_getFilterChanges","params":["%s"],"id":67}`, filterID)
response := s.Backend.CallRPC(basicCall)
filterChanges := s.arrayFromRPCResponse(response)
if len(filterChanges) > 0 {
result <- filterChanges[0]
return
}
time.Sleep(10 * time.Millisecond)
}
}()
return result
}
func (s *FiltersAPISuite) filterIDFromRPCResponse(response string) string {
var r struct {
Result string `json:"result"`
}
s.NoError(json.Unmarshal([]byte(response), &r))
return r.Result
}
func (s *FiltersAPISuite) arrayFromRPCResponse(response string) []string {
var r struct {
Result []string `json:"result"`
}
s.NoError(json.Unmarshal([]byte(response), &r))
return r.Result
}
func (s *FiltersAPISuite) boolFromRPCResponse(response string) bool {
var r struct {
Result bool `json:"result"`
}
s.NoError(json.Unmarshal([]byte(response), &r))
return r.Result
}