Implement eth_getFilterLogs (#1265)

Queries logs from remote server using original filter criteria.
This commit is contained in:
Dmitry Shulyak 2018-11-06 07:41:36 +01:00 committed by GitHub
parent db786ef1d2
commit 3fe5b25ff2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 94 additions and 10 deletions

View File

@ -3,10 +3,12 @@ package rpcfilters
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"sync" "sync"
"time" "time"
ethereum "github.com/ethereum/go-ethereum" ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
@ -19,6 +21,10 @@ const (
defaultLogsQueryTimeout = 10 * time.Second defaultLogsQueryTimeout = 10 * time.Second
) )
var (
errFilterNotFound = errors.New("filter not found")
)
type filter interface { type filter interface {
add(interface{}) error add(interface{}) error
pop() interface{} pop() interface{}
@ -86,13 +92,14 @@ func (api *PublicAPI) NewFilter(crit filters.FilterCriteria) (rpc.ID, error) {
id := rpc.ID(uuid.New()) id := rpc.ID(uuid.New())
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
f := &logsFilter{ f := &logsFilter{
id: id, id: id,
crit: ethereum.FilterQuery(crit), crit: ethereum.FilterQuery(crit),
done: make(chan struct{}), originalCrit: ethereum.FilterQuery(crit),
timer: time.NewTimer(api.filterLivenessPeriod), done: make(chan struct{}),
ctx: ctx, timer: time.NewTimer(api.filterLivenessPeriod),
cancel: cancel, ctx: ctx,
logsCache: newCache(defaultCacheSize), cancel: cancel,
logsCache: newCache(defaultCacheSize),
} }
api.filtersMu.Lock() api.filtersMu.Lock()
api.filters[id] = f api.filters[id] = f
@ -181,6 +188,27 @@ func (api *PublicAPI) UninstallFilter(id rpc.ID) bool {
return found return found
} }
// GetFilterLogs returns the logs for the filter with the given id.
// If the filter could not be found an empty array of logs is returned.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
func (api *PublicAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]types.Log, error) {
api.filtersMu.Lock()
f, exist := api.filters[id]
api.filtersMu.Unlock()
if !exist {
return []types.Log{}, errFilterNotFound
}
logs, ok := f.(*logsFilter)
if !ok {
return []types.Log{}, fmt.Errorf("filter with ID %v is not of logs type", id)
}
ctx, cancel := context.WithTimeout(ctx, defaultLogsQueryTimeout)
defer cancel()
rst, err := getLogs(ctx, api.client(), logs.originalCrit)
return rst, err
}
// GetFilterChanges returns the hashes for the filter with the given id since // GetFilterChanges returns the hashes for the filter with the given id since
// last time it was called. This can be used for polling. // last time it was called. This can be used for polling.
// //
@ -206,5 +234,5 @@ func (api *PublicAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
} }
return rst, nil return rst, nil
} }
return []interface{}{}, errors.New("filter not found") return []interface{}{}, errFilterNotFound
} }

View File

@ -1,10 +1,13 @@
package rpcfilters package rpcfilters
import ( import (
"context"
"math/big"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -74,3 +77,23 @@ func TestGetFilterChangesResetsTimer(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.True(t, f.deadline().Stop()) require.True(t, f.deadline().Stop())
} }
func TestGetFilterLogs(t *testing.T) {
tracker := new(callTracker)
api := &PublicAPI{
filters: make(map[rpc.ID]filter),
client: func() ContextCaller { return tracker },
}
block := big.NewInt(10)
id, err := api.NewFilter(filters.FilterCriteria{
FromBlock: block,
})
require.NoError(t, err)
logs, err := api.GetFilterLogs(context.TODO(), id)
require.NoError(t, err)
require.Empty(t, logs)
require.Len(t, tracker.criterias, 1)
rst, err := hexutil.DecodeBig(tracker.criterias[0]["fromBlock"].(string))
require.NoError(t, err)
require.Equal(t, block, rst)
}

View File

@ -54,6 +54,9 @@ func toFilterArg(q ethereum.FilterQuery) interface{} {
"address": q.Addresses, "address": q.Addresses,
"topics": q.Topics, "topics": q.Topics,
} }
if q.FromBlock == nil {
arg["fromBlock"] = "0x0"
}
return arg return arg
} }

View File

@ -16,7 +16,9 @@ import (
type logsFilter struct { type logsFilter struct {
mu sync.RWMutex mu sync.RWMutex
logs []types.Log logs []types.Log
crit ethereum.FilterQuery crit ethereum.FilterQuery // will be modified and different from original
originalCrit ethereum.FilterQuery // not modified version of the criteria
logsCache *cache logsCache *cache

View File

@ -22,14 +22,17 @@ type LogsSuite struct {
DevNodeSuite DevNodeSuite
} }
func (s *LogsSuite) testEmit(event *eventer.Eventer, opts *bind.TransactOpts, id string, topic [32]byte, expect int) { func (s *LogsSuite) emitEvent(event *eventer.Eventer, opts *bind.TransactOpts, topic [32]byte) {
tx, err := event.Emit(opts, topic) tx, err := event.Emit(opts, topic)
s.NoError(err) s.NoError(err)
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
_, err = bind.WaitMined(timeout, s.Eth, tx) _, err = bind.WaitMined(timeout, s.Eth, tx)
s.NoError(err) s.NoError(err)
}
func (s *LogsSuite) testEmit(event *eventer.Eventer, opts *bind.TransactOpts, id string, topic [32]byte, expect int) {
s.emitEvent(event, opts, topic)
s.NoError(utils.Eventually(func() error { s.NoError(utils.Eventually(func() error {
var logs []types.Log var logs []types.Log
err := s.Local.Call(&logs, "eth_getFilterChanges", id) err := s.Local.Call(&logs, "eth_getFilterChanges", id)
@ -77,3 +80,28 @@ func (s *LogsSuite) TestLogsNewFilter() {
s.testEmit(event, opts, sid, stopic, 1) s.testEmit(event, opts, sid, stopic, 1)
s.testEmit(event, opts, fid, ftopic, 1) s.testEmit(event, opts, fid, ftopic, 1)
} }
func (s *LogsSuite) TestGetFilterLogs() {
var (
id string
)
s.Require().NoError(s.Local.Call(&id, "eth_newFilter", map[string]interface{}{}))
opts := bind.NewKeyedTransactor(s.DevAccount)
_, tx, event, err := eventer.DeployEventer(opts, s.Eth)
s.Require().NoError(err)
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
addr, err := bind.WaitDeployed(timeout, s.Eth, tx)
s.Require().NoError(err)
s.Require().NotEmpty(addr)
s.emitEvent(event, opts, [32]byte{})
var logs []types.Log
s.Require().NoError(s.Local.Call(&logs, "eth_getFilterLogs", id))
s.Require().Len(logs, 1)
s.emitEvent(event, opts, [32]byte{})
logs = nil
s.Require().NoError(s.Local.Call(&logs, "eth_getFilterLogs", id))
s.Require().Len(logs, 2)
}