Add local implementation of the newFilter call using remote getLogs call (#1235)

* Implement subscriptions and filtering

* Add e2e test with log filter polling logs from EVM with clique backend

* Apply review comments

* Move devnode to t/devtests to avoid cycle in imports
This commit is contained in:
Dmitry Shulyak 2018-10-23 08:11:11 +03:00 committed by GitHub
parent a75f9c34cf
commit 29b55bd445
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1364 additions and 58 deletions

View File

@ -1,58 +1,102 @@
package rpcfilters package rpcfilters
import ( import (
"context"
"errors" "errors"
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/common" ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/pborman/uuid" "github.com/pborman/uuid"
) )
type filter struct { const (
hashes []common.Hash defaultFilterLivenessPeriod = 5 * time.Minute
mu sync.Mutex defaultLogsPeriod = 3 * time.Second
done chan struct{} defaultLogsQueryTimeout = 10 * time.Second
} )
// AddHash adds a hash to the filter type filter interface {
func (f *filter) AddHash(hash common.Hash) { add(interface{}) error
f.mu.Lock() pop() interface{}
defer f.mu.Unlock() stop()
f.hashes = append(f.hashes, hash) deadline() *time.Timer
}
// PopHashes returns all the hashes stored in the filter and clears the filter contents
func (f *filter) PopHashes() []common.Hash {
f.mu.Lock()
defer f.mu.Unlock()
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes)
}
func newFilter() *filter {
return &filter{
done: make(chan struct{}),
}
} }
// PublicAPI represents filter API that is exported to `eth` namespace // PublicAPI represents filter API that is exported to `eth` namespace
type PublicAPI struct { type PublicAPI struct {
filters map[rpc.ID]*filter
filtersMu sync.Mutex filtersMu sync.Mutex
filters map[rpc.ID]filter
// filterLivenessLoop defines how often timeout loop is executed
filterLivenessLoop time.Duration
// filter liveness increased by this period when changes are requested
filterLivenessPeriod time.Duration
client func() ContextCaller
latestBlockChangedEvent *latestBlockChangedEvent latestBlockChangedEvent *latestBlockChangedEvent
transactionSentToUpstreamEvent *transactionSentToUpstreamEvent transactionSentToUpstreamEvent *transactionSentToUpstreamEvent
} }
// NewPublicAPI returns a reference to the PublicAPI object // NewPublicAPI returns a reference to the PublicAPI object
func NewPublicAPI(latestBlockChangedEvent *latestBlockChangedEvent, func NewPublicAPI(s *Service) *PublicAPI {
transactionSentToUpstreamEvent *transactionSentToUpstreamEvent) *PublicAPI { api := &PublicAPI{
return &PublicAPI{ filters: make(map[rpc.ID]filter),
filters: make(map[rpc.ID]*filter), latestBlockChangedEvent: s.latestBlockChangedEvent,
latestBlockChangedEvent: latestBlockChangedEvent, transactionSentToUpstreamEvent: s.transactionSentToUpstreamEvent,
transactionSentToUpstreamEvent: transactionSentToUpstreamEvent, client: func() ContextCaller { return s.rpc.RPCClient() },
filterLivenessLoop: defaultFilterLivenessPeriod,
filterLivenessPeriod: defaultFilterLivenessPeriod + 10*time.Second,
} }
go api.timeoutLoop(s.quit)
return api
}
func (api *PublicAPI) timeoutLoop(quit chan struct{}) {
for {
select {
case <-quit:
return
case <-time.After(api.filterLivenessLoop):
api.filtersMu.Lock()
for id, f := range api.filters {
deadline := f.deadline()
if deadline == nil {
continue
}
select {
case <-deadline.C:
delete(api.filters, id)
f.stop()
default:
continue
}
}
api.filtersMu.Unlock()
}
}
}
func (api *PublicAPI) NewFilter(crit filters.FilterCriteria) (rpc.ID, error) {
id := rpc.ID(uuid.New())
ctx, cancel := context.WithCancel(context.Background())
f := &logsFilter{
id: id,
crit: ethereum.FilterQuery(crit),
done: make(chan struct{}),
timer: time.NewTimer(api.filterLivenessPeriod),
ctx: ctx,
cancel: cancel,
}
api.filtersMu.Lock()
api.filters[id] = f
api.filtersMu.Unlock()
go pollLogs(api.client(), f, defaultLogsQueryTimeout, defaultLogsPeriod)
return id, nil
} }
// NewBlockFilter is an implemenation of `eth_newBlockFilter` API // NewBlockFilter is an implemenation of `eth_newBlockFilter` API
@ -61,7 +105,7 @@ func (api *PublicAPI) NewBlockFilter() rpc.ID {
api.filtersMu.Lock() api.filtersMu.Lock()
defer api.filtersMu.Unlock() defer api.filtersMu.Unlock()
f := newFilter() f := newHashFilter()
id := rpc.ID(uuid.New()) id := rpc.ID(uuid.New())
api.filters[id] = f api.filters[id] = f
@ -73,7 +117,9 @@ func (api *PublicAPI) NewBlockFilter() rpc.ID {
for { for {
select { select {
case hash := <-s: case hash := <-s:
f.AddHash(hash) if err := f.add(hash); err != nil {
log.Error("error adding value to filter", "hash", hash, "error", err)
}
case <-f.done: case <-f.done:
return return
} }
@ -90,7 +136,7 @@ func (api *PublicAPI) NewPendingTransactionFilter() rpc.ID {
api.filtersMu.Lock() api.filtersMu.Lock()
defer api.filtersMu.Unlock() defer api.filtersMu.Unlock()
f := newFilter() f := newHashFilter()
id := rpc.ID(uuid.New()) id := rpc.ID(uuid.New())
api.filters[id] = f api.filters[id] = f
@ -102,7 +148,9 @@ func (api *PublicAPI) NewPendingTransactionFilter() rpc.ID {
for { for {
select { select {
case hash := <-s: case hash := <-s:
f.AddHash(hash) if err := f.add(hash); err != nil {
log.Error("error adding value to filter", "hash", hash, "error", err)
}
case <-f.done: case <-f.done:
return return
} }
@ -125,7 +173,7 @@ func (api *PublicAPI) UninstallFilter(id rpc.ID) bool {
api.filtersMu.Unlock() api.filtersMu.Unlock()
if found { if found {
close(f.done) f.stop()
} }
return found return found
@ -135,22 +183,26 @@ func (api *PublicAPI) UninstallFilter(id rpc.ID) bool {
// last time it was called. This can be used for polling. // last time it was called. This can be used for polling.
// //
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
func (api *PublicAPI) GetFilterChanges(id rpc.ID) ([]common.Hash, error) { func (api *PublicAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
api.filtersMu.Lock() api.filtersMu.Lock()
defer api.filtersMu.Unlock() defer api.filtersMu.Unlock()
if f, found := api.filters[id]; found { if f, found := api.filters[id]; found {
return f.PopHashes(), nil deadline := f.deadline()
if deadline != nil {
if !deadline.Stop() {
// timer expired but filter is not yet removed in timeout loop
// receive timer value and reset timer
// see https://golang.org/pkg/time/#Timer.Reset
<-deadline.C
} }
deadline.Reset(api.filterLivenessPeriod)
return []common.Hash{}, errors.New("filter not found")
}
// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
// otherwise the given hashes array is returned.
func returnHashes(hashes []common.Hash) []common.Hash {
if hashes == nil {
return []common.Hash{}
} }
return hashes rst := f.pop()
if rst == nil {
return []interface{}{}, nil
}
return rst, nil
}
return []interface{}{}, errors.New("filter not found")
} }

View File

@ -0,0 +1,76 @@
package rpcfilters
import (
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestFilterLiveness(t *testing.T) {
api := &PublicAPI{
filters: make(map[rpc.ID]filter),
filterLivenessLoop: 10 * time.Millisecond,
filterLivenessPeriod: 15 * time.Millisecond,
client: func() ContextCaller { return &callTracker{} },
}
id, err := api.NewFilter(filters.FilterCriteria{})
require.NoError(t, err)
quit := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func() {
api.timeoutLoop(quit)
wg.Done()
}()
tick := time.Tick(10 * time.Millisecond)
after := time.After(100 * time.Millisecond)
func() {
for {
select {
case <-after:
assert.FailNow(t, "filter wasn't removed")
close(quit)
return
case <-tick:
api.filtersMu.Lock()
_, exist := api.filters[id]
api.filtersMu.Unlock()
if !exist {
close(quit)
return
}
}
}
}()
wg.Wait()
}
func TestGetFilterChangesResetsTimer(t *testing.T) {
api := &PublicAPI{
filters: make(map[rpc.ID]filter),
filterLivenessLoop: 10 * time.Millisecond,
filterLivenessPeriod: 15 * time.Millisecond,
client: func() ContextCaller { return &callTracker{} },
}
id, err := api.NewFilter(filters.FilterCriteria{})
require.NoError(t, err)
api.filtersMu.Lock()
f := api.filters[id]
require.True(t, f.deadline().Stop())
fake := make(chan time.Time, 1)
fake <- time.Time{}
f.deadline().C = fake
api.filtersMu.Unlock()
require.False(t, f.deadline().Stop())
// GetFilterChanges will Reset deadline
_, err = api.GetFilterChanges(id)
require.NoError(t, err)
require.True(t, f.deadline().Stop())
}

View File

@ -0,0 +1,56 @@
package rpcfilters
import (
"errors"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
)
type hashFilter struct {
hashes []common.Hash
mu sync.Mutex
done chan struct{}
timer *time.Timer
}
// add adds a hash to the hashFilter
func (f *hashFilter) add(data interface{}) error {
hash, ok := data.(common.Hash)
if !ok {
return errors.New("provided data is not a common.Hash")
}
f.mu.Lock()
defer f.mu.Unlock()
f.hashes = append(f.hashes, hash)
return nil
}
// pop returns all the hashes stored in the hashFilter and clears the hashFilter contents
func (f *hashFilter) pop() interface{} {
f.mu.Lock()
defer f.mu.Unlock()
hashes := f.hashes
f.hashes = nil
return hashes
}
func (f *hashFilter) stop() {
select {
case <-f.done:
return
default:
close(f.done)
}
}
func (f *hashFilter) deadline() *time.Timer {
return f.timer
}
func newHashFilter() *hashFilter {
return &hashFilter{
done: make(chan struct{}),
}
}

View File

@ -0,0 +1,85 @@
package rpcfilters
import (
"context"
"math/big"
"time"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)
// ContextCaller provides CallContext method as ethereums rpc.Client.
type ContextCaller interface {
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
}
func pollLogs(client ContextCaller, f *logsFilter, timeout, period time.Duration) {
adjusted := false
query := func() {
ctx, cancel := context.WithTimeout(f.ctx, timeout)
logs, err := getLogs(ctx, client, f.crit)
cancel()
if err != nil {
log.Error("failed to get logs", "criteria", f.crit, "error", err)
} else if !adjusted {
adjustFromBlock(&f.crit)
adjusted = true
}
if err := f.add(logs); err != nil {
log.Error("error adding logs", "logs", logs, "error", err)
}
}
query()
latest := time.NewTicker(period)
defer latest.Stop()
for {
select {
case <-latest.C:
query()
case <-f.done:
log.Debug("filter was stopped", "ID", f.id, "crit", f.crit)
return
}
}
}
// adjustFromBlock adjusts crit.FromBlock to the latest to avoid querying same logs multiple times.
func adjustFromBlock(crit *ethereum.FilterQuery) {
latest := big.NewInt(rpc.LatestBlockNumber.Int64())
// don't adjust if filter is not interested in newer blocks
if crit.ToBlock != nil && crit.ToBlock.Cmp(latest) == 1 {
return
}
// don't adjust if from block is already pending
if crit.FromBlock != nil && crit.FromBlock.Cmp(latest) == -1 {
return
}
crit.FromBlock = latest
}
func getLogs(ctx context.Context, client ContextCaller, crit ethereum.FilterQuery) (rst []types.Log, err error) {
return rst, client.CallContext(ctx, &rst, "eth_getLogs", toFilterArg(crit))
}
func toFilterArg(q ethereum.FilterQuery) interface{} {
arg := map[string]interface{}{
"fromBlock": toBlockNumArg(q.FromBlock),
"toBlock": toBlockNumArg(q.ToBlock),
"address": q.Addresses,
"topics": q.Topics,
}
return arg
}
func toBlockNumArg(number *big.Int) string {
if number == nil || number.Int64() == rpc.LatestBlockNumber.Int64() {
return "latest"
} else if number.Int64() == rpc.PendingBlockNumber.Int64() {
return "pending"
}
return hexutil.EncodeBig(number)
}

View File

@ -0,0 +1,145 @@
package rpcfilters
import (
"context"
"errors"
"math/big"
"sync"
"testing"
"time"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type callTracker struct {
mu sync.Mutex
calls int
reply [][]types.Log
criterias []map[string]interface{}
}
func (c *callTracker) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
c.mu.Lock()
defer c.mu.Unlock()
c.calls++
if len(args) != 1 {
return errors.New("unexpected length of args")
}
crit := args[0].(map[string]interface{})
c.criterias = append(c.criterias, crit)
select {
case <-ctx.Done():
return errors.New("context canceled")
default:
}
if c.calls <= len(c.reply) {
rst := result.(*[]types.Log)
*rst = c.reply[c.calls-1]
}
return nil
}
func runLogsFetcherTest(t *testing.T, f *logsFilter) *callTracker {
c := callTracker{reply: [][]types.Log{
make([]types.Log, 2),
}}
var wg sync.WaitGroup
wg.Add(1)
go func() {
pollLogs(&c, f, time.Second, 100*time.Millisecond)
wg.Done()
}()
tick := time.Tick(10 * time.Millisecond)
after := time.After(time.Second)
func() {
for {
select {
case <-after:
f.stop()
assert.FailNow(t, "failed waiting for requests")
return
case <-tick:
c.mu.Lock()
num := c.calls
c.mu.Unlock()
if num >= 2 {
f.stop()
return
}
}
}
}()
wg.Wait()
require.Len(t, c.criterias, 2)
return &c
}
func TestLogsFetcherAdjusted(t *testing.T) {
f := &logsFilter{
ctx: context.TODO(),
crit: ethereum.FilterQuery{
FromBlock: big.NewInt(10),
},
done: make(chan struct{}),
}
c := runLogsFetcherTest(t, f)
require.Equal(t, hexutil.EncodeBig(big.NewInt(10)), c.criterias[0]["fromBlock"])
require.Equal(t, c.criterias[1]["fromBlock"], "latest")
}
func TestLogsFetcherCanceledContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
f := &logsFilter{
ctx: ctx,
crit: ethereum.FilterQuery{
FromBlock: big.NewInt(10),
},
done: make(chan struct{}),
}
cancel()
c := runLogsFetcherTest(t, f)
require.Equal(t, hexutil.EncodeBig(big.NewInt(10)), c.criterias[0]["fromBlock"])
require.Equal(t, hexutil.EncodeBig(big.NewInt(10)), c.criterias[1]["fromBlock"])
}
func TestAdjustFromBlock(t *testing.T) {
type testCase struct {
description string
initial ethereum.FilterQuery
result ethereum.FilterQuery
}
for _, tc := range []testCase{
{
"ToBlockHigherThenLatest",
ethereum.FilterQuery{ToBlock: big.NewInt(10)},
ethereum.FilterQuery{ToBlock: big.NewInt(10)},
},
{
"FromBlockIsPending",
ethereum.FilterQuery{FromBlock: big.NewInt(-2)},
ethereum.FilterQuery{FromBlock: big.NewInt(-2)},
},
{
"FromBlockIsOlderThenLatest",
ethereum.FilterQuery{FromBlock: big.NewInt(10)},
ethereum.FilterQuery{FromBlock: big.NewInt(-1)},
},
{
"NotInterestedInLatestBlocks",
ethereum.FilterQuery{FromBlock: big.NewInt(10), ToBlock: big.NewInt(15)},
ethereum.FilterQuery{FromBlock: big.NewInt(10), ToBlock: big.NewInt(15)},
},
} {
tc := tc
t.Run(tc.description, func(t *testing.T) {
t.Parallel()
adjustFromBlock(&tc.initial)
require.Equal(t, tc.result, tc.initial)
})
}
}

View File

@ -0,0 +1,132 @@
package rpcfilters
import (
"context"
"errors"
"sync"
"time"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)
type logsFilter struct {
mu sync.Mutex
logs []types.Log
lastSeenBlockNumber uint64
lastSeenBlockHash common.Hash
id rpc.ID
crit ethereum.FilterQuery
timer *time.Timer
ctx context.Context
cancel context.CancelFunc
done chan struct{}
}
func (f *logsFilter) add(data interface{}) error {
logs, ok := data.([]types.Log)
if !ok {
return errors.New("provided value is not a []types.Log")
}
filtered, num, hash := filterLogs(logs, f.crit, f.lastSeenBlockNumber, f.lastSeenBlockHash)
f.mu.Lock()
f.lastSeenBlockNumber = num
f.lastSeenBlockHash = hash
f.logs = append(f.logs, filtered...)
f.mu.Unlock()
return nil
}
func (f *logsFilter) pop() interface{} {
f.mu.Lock()
defer f.mu.Unlock()
rst := f.logs
f.logs = nil
return rst
}
func (f *logsFilter) stop() {
select {
case <-f.done:
return
default:
close(f.done)
if f.cancel != nil {
f.cancel()
}
}
}
func (f *logsFilter) deadline() *time.Timer {
return f.timer
}
func includes(addresses []common.Address, a common.Address) bool {
for _, addr := range addresses {
if addr == a {
return true
}
}
return false
}
// filterLogs creates a slice of logs matching the given criteria.
func filterLogs(logs []types.Log, crit ethereum.FilterQuery, blockNum uint64, blockHash common.Hash) (
ret []types.Log, num uint64, hash common.Hash) {
num = blockNum
hash = blockHash
for _, log := range logs {
// skip logs from seen blocks
// find highest block number that we didnt see before
if log.BlockNumber >= num {
num = log.BlockNumber
hash = log.BlockHash
}
if matchLog(log, crit, blockNum, blockHash) {
ret = append(ret, log)
}
}
return
}
func matchLog(log types.Log, crit ethereum.FilterQuery, blockNum uint64, blockHash common.Hash) bool {
// skip logs from seen blocks
if log.BlockNumber < blockNum {
return false
} else if log.BlockNumber == blockNum && log.BlockHash == blockHash {
return false
}
if crit.FromBlock != nil && crit.FromBlock.Int64() >= 0 && crit.FromBlock.Uint64() > log.BlockNumber {
return false
}
if crit.ToBlock != nil && crit.ToBlock.Int64() >= 0 && crit.ToBlock.Uint64() < log.BlockNumber {
return false
}
if len(crit.Addresses) > 0 && !includes(crit.Addresses, log.Address) {
return false
}
if len(crit.Topics) > len(log.Topics) {
return false
}
return matchTopics(log, crit.Topics)
}
func matchTopics(log types.Log, topics [][]common.Hash) bool {
for i, sub := range topics {
match := len(sub) == 0 // empty rule set == wildcard
for _, topic := range sub {
if log.Topics[i] == topic {
match = true
break
}
}
if !match {
return false
}
}
return true
}

View File

@ -0,0 +1,125 @@
package rpcfilters
import (
"math/big"
"testing"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)
func TestFilterLogs(t *testing.T) {
logs := []types.Log{
{
BlockNumber: 1,
BlockHash: common.Hash{1, 1},
Address: common.Address{1, 1, 1},
Topics: []common.Hash{
{1},
{1, 1},
},
},
{
BlockNumber: 2,
BlockHash: common.Hash{2, 2},
Address: common.Address{2, 2, 2},
Topics: []common.Hash{
{1},
{2, 2},
},
},
}
type testCase struct {
description string
blockNum uint64
blockHash common.Hash
crit ethereum.FilterQuery
expectedLogs []types.Log
expectedBlock uint64
expectedHash common.Hash
}
for _, tc := range []testCase{
{
description: "All",
crit: ethereum.FilterQuery{},
expectedLogs: []types.Log{logs[0], logs[1]},
expectedBlock: logs[1].BlockNumber,
expectedHash: logs[1].BlockHash,
},
{
description: "LimitedByBlock",
crit: ethereum.FilterQuery{ToBlock: big.NewInt(1)},
expectedLogs: []types.Log{logs[0]},
expectedBlock: logs[1].BlockNumber,
expectedHash: logs[1].BlockHash,
},
{
description: "LimitedByAddress",
crit: ethereum.FilterQuery{Addresses: []common.Address{logs[1].Address}},
expectedLogs: []types.Log{logs[1]},
expectedBlock: logs[1].BlockNumber,
expectedHash: logs[1].BlockHash,
},
{
description: "LimitedByAddress",
crit: ethereum.FilterQuery{Addresses: []common.Address{logs[1].Address}},
expectedLogs: []types.Log{logs[1]},
expectedBlock: logs[1].BlockNumber,
expectedHash: logs[1].BlockHash,
},
{
description: "MoreTopicsThanInLogs",
crit: ethereum.FilterQuery{Topics: make([][]common.Hash, 3)},
expectedBlock: logs[1].BlockNumber,
expectedHash: logs[1].BlockHash,
},
{
description: "Wildcard",
crit: ethereum.FilterQuery{Topics: make([][]common.Hash, 1)},
expectedLogs: []types.Log{logs[0], logs[1]},
expectedBlock: logs[1].BlockNumber,
expectedHash: logs[1].BlockHash,
},
{
description: "LimitedBySecondTopic",
crit: ethereum.FilterQuery{Topics: [][]common.Hash{
{}, logs[1].Topics}},
expectedLogs: []types.Log{logs[1]},
expectedBlock: logs[1].BlockNumber,
expectedHash: logs[1].BlockHash,
},
{
blockNum: logs[1].BlockNumber,
blockHash: logs[1].BlockHash,
description: "LimitedBySeenBlock",
crit: ethereum.FilterQuery{},
expectedBlock: logs[1].BlockNumber,
expectedHash: logs[1].BlockHash,
},
{
blockNum: logs[1].BlockNumber,
blockHash: common.Hash{7, 7, 7},
description: "SeenBlockDifferenthash",
crit: ethereum.FilterQuery{},
expectedLogs: []types.Log{logs[1]},
expectedBlock: logs[1].BlockNumber,
expectedHash: logs[1].BlockHash,
},
} {
tc := tc
t.Run(tc.description, func(t *testing.T) {
t.Parallel()
rst, num, hash := filterLogs(logs, tc.crit, tc.blockNum, tc.blockHash)
require.Equal(t, tc.expectedLogs, rst)
require.Equal(t, tc.expectedBlock, num)
require.Equal(t, tc.expectedHash, hash)
})
}
}

View File

@ -14,6 +14,9 @@ var _ node.Service = (*Service)(nil)
type Service struct { type Service struct {
latestBlockChangedEvent *latestBlockChangedEvent latestBlockChangedEvent *latestBlockChangedEvent
transactionSentToUpstreamEvent *transactionSentToUpstreamEvent transactionSentToUpstreamEvent *transactionSentToUpstreamEvent
rpc rpcProvider
quit chan struct{}
} }
// New returns a new Service. // New returns a new Service.
@ -22,8 +25,9 @@ func New(rpc rpcProvider) *Service {
latestBlockChangedEvent := newLatestBlockChangedEvent(provider) latestBlockChangedEvent := newLatestBlockChangedEvent(provider)
transactionSentToUpstreamEvent := newTransactionSentToUpstreamEvent() transactionSentToUpstreamEvent := newTransactionSentToUpstreamEvent()
return &Service{ return &Service{
latestBlockChangedEvent, latestBlockChangedEvent: latestBlockChangedEvent,
transactionSentToUpstreamEvent, transactionSentToUpstreamEvent: transactionSentToUpstreamEvent,
rpc: rpc,
} }
} }
@ -38,9 +42,7 @@ func (s *Service) APIs() []rpc.API {
{ {
Namespace: "eth", Namespace: "eth",
Version: "1.0", Version: "1.0",
Service: NewPublicAPI( Service: NewPublicAPI(s),
s.latestBlockChangedEvent,
s.transactionSentToUpstreamEvent),
Public: true, Public: true,
}, },
} }
@ -48,6 +50,7 @@ func (s *Service) APIs() []rpc.API {
// Start is run when a service is started. // Start is run when a service is started.
func (s *Service) Start(server *p2p.Server) error { func (s *Service) Start(server *p2p.Server) error {
s.quit = make(chan struct{})
err := s.transactionSentToUpstreamEvent.Start() err := s.transactionSentToUpstreamEvent.Start()
if err != nil { if err != nil {
return err return err
@ -57,6 +60,7 @@ func (s *Service) Start(server *p2p.Server) error {
// Stop is run when a service is stopped. // Stop is run when a service is stopped.
func (s *Service) Stop() error { func (s *Service) Stop() error {
close(s.quit)
s.transactionSentToUpstreamEvent.Stop() s.transactionSentToUpstreamEvent.Stop()
s.latestBlockChangedEvent.Stop() s.latestBlockChangedEvent.Stop()
return nil return nil

149
t/devtests/devnode.go Normal file
View File

@ -0,0 +1,149 @@
package devtests
import (
"crypto/ecdsa"
"io/ioutil"
"math/big"
"os"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/api"
"github.com/status-im/status-go/params"
statusrpc "github.com/status-im/status-go/rpc"
"github.com/stretchr/testify/suite"
)
// NewDevNode returns node with clieque engine and prefunded accounts.
func NewDevNode(faucet common.Address) (*node.Node, error) {
cfg := node.DefaultConfig
ipc, err := ioutil.TempFile("", "devnode-ipc-")
if err != nil {
return nil, err
}
cfg.IPCPath = ipc.Name()
cfg.HTTPModules = []string{"eth"}
cfg.DataDir = ""
cfg.P2P.MaxPeers = 0
cfg.P2P.ListenAddr = ":0"
cfg.P2P.NoDiscovery = true
cfg.P2P.DiscoveryV5 = false
stack, err := node.New(&cfg)
if err != nil {
return nil, err
}
ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)
// ensure that etherbase is added to an account manager
etherbase, err := crypto.GenerateKey()
if err != nil {
return nil, err
}
acc, err := ks.ImportECDSA(etherbase, "")
if err != nil {
return nil, err
}
err = ks.Unlock(acc, "")
if err != nil {
return nil, err
}
ethcfg := eth.DefaultConfig
ethcfg.NetworkId = 1337
// 0 - mine only if transaction pending
ethcfg.Genesis = core.DeveloperGenesisBlock(0, faucet)
extra := make([]byte, 32) // extraVanity
extra = append(extra, acc.Address[:]...)
extra = append(extra, make([]byte, 65)...) // extraSeal
ethcfg.Genesis.ExtraData = extra
ethcfg.MinerGasPrice = big.NewInt(1)
ethcfg.Etherbase = acc.Address
return stack, stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return eth.New(ctx, &ethcfg)
})
}
// StartWithMiner starts node with eth service and a miner.
func StartWithMiner(stack *node.Node) error {
err := stack.Start()
if err != nil {
return err
}
var ethereum *eth.Ethereum
err = stack.Service(&ethereum)
if err != nil {
return err
}
ethereum.TxPool().SetGasPrice(big.NewInt(1))
return ethereum.StartMining(0)
}
// DevNodeSuite provides convenient wrapper for starting node with clique backend for mining.
type DevNodeSuite struct {
suite.Suite
Remote *rpc.Client
Eth *ethclient.Client
Local *statusrpc.Client
DevAccount *ecdsa.PrivateKey
DevAccountAddress common.Address
dir string
backend *api.StatusBackend
miner *node.Node
}
// SetupTest creates clique node and status node with an rpc connection to a clique node.
func (s *DevNodeSuite) SetupTest() {
account, err := crypto.GenerateKey()
s.Require().NoError(err)
s.DevAccount = account
s.DevAccountAddress = crypto.PubkeyToAddress(account.PublicKey)
s.miner, err = NewDevNode(s.DevAccountAddress)
s.Require().NoError(err)
s.Require().NoError(StartWithMiner(s.miner))
s.dir, err = ioutil.TempDir("", "devtests-")
s.Require().NoError(err)
config, err := params.NewNodeConfig(
s.dir,
1337,
)
s.Require().NoError(err)
config.WhisperConfig.Enabled = false
config.LightEthConfig.Enabled = false
config.UpstreamConfig.Enabled = true
config.UpstreamConfig.URL = s.miner.IPCEndpoint()
s.backend = api.NewStatusBackend()
s.Require().NoError(s.backend.StartNode(config))
s.Remote, err = s.miner.Attach()
s.Require().NoError(err)
s.Eth = ethclient.NewClient(s.Remote)
s.Local = s.backend.StatusNode().RPCClient()
}
// TearDownTest stops status node and clique node.
func (s *DevNodeSuite) TearDownTest() {
if s.miner != nil {
s.Require().NoError(s.miner.Stop())
s.miner = nil
}
if s.backend != nil {
s.Require().NoError(s.backend.StopNode())
s.backend = nil
}
if len(s.dir) != 0 {
os.RemoveAll(s.dir)
s.dir = ""
}
}

View File

@ -0,0 +1,366 @@
// Code generated - DO NOT EDIT.
// This file is a generated binding and any manual changes will be lost.
package eventer
import (
"math/big"
"strings"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
)
// EventerABI is the input ABI used to generate the binding from.
const EventerABI = "[{\"constant\":true,\"inputs\":[],\"name\":\"crt\",\"outputs\":[{\"name\":\"\",\"type\":\"int256\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"},{\"constant\":false,\"inputs\":[{\"name\":\"topic\",\"type\":\"bytes32\"}],\"name\":\"emit\",\"outputs\":[],\"payable\":false,\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[],\"payable\":false,\"stateMutability\":\"nonpayable\",\"type\":\"constructor\"},{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"name\":\"topic\",\"type\":\"bytes32\"},{\"indexed\":true,\"name\":\"crt\",\"type\":\"int256\"}],\"name\":\"Message\",\"type\":\"event\"}]"
// EventerBin is the compiled bytecode used for deploying new contracts.
const EventerBin = `0x608060405234801561001057600080fd5b5060f28061001f6000396000f30060806040526004361060485763ffffffff7c01000000000000000000000000000000000000000000000000000000006000350416630868aad68114604d578063eee17820146071575b600080fd5b348015605857600080fd5b50605f6088565b60408051918252519081900360200190f35b348015607c57600080fd5b506086600435608e565b005b60005481565b60008054600101808255604051909183917fc3dab353f3a8451adb4c3071c9df72eebc7e900383c3295d66fe939bba21e1c99190a3505600a165627a7a72305820121895a4c6e091225d36a33443bf84f57560aa732841830ddf282def477ba9570029`
// DeployEventer deploys a new Ethereum contract, binding an instance of Eventer to it.
func DeployEventer(auth *bind.TransactOpts, backend bind.ContractBackend) (common.Address, *types.Transaction, *Eventer, error) {
parsed, err := abi.JSON(strings.NewReader(EventerABI))
if err != nil {
return common.Address{}, nil, nil, err
}
address, tx, contract, err := bind.DeployContract(auth, parsed, common.FromHex(EventerBin), backend)
if err != nil {
return common.Address{}, nil, nil, err
}
return address, tx, &Eventer{EventerCaller: EventerCaller{contract: contract}, EventerTransactor: EventerTransactor{contract: contract}, EventerFilterer: EventerFilterer{contract: contract}}, nil
}
// Eventer is an auto generated Go binding around an Ethereum contract.
type Eventer struct {
EventerCaller // Read-only binding to the contract
EventerTransactor // Write-only binding to the contract
EventerFilterer // Log filterer for contract events
}
// EventerCaller is an auto generated read-only Go binding around an Ethereum contract.
type EventerCaller struct {
contract *bind.BoundContract // Generic contract wrapper for the low level calls
}
// EventerTransactor is an auto generated write-only Go binding around an Ethereum contract.
type EventerTransactor struct {
contract *bind.BoundContract // Generic contract wrapper for the low level calls
}
// EventerFilterer is an auto generated log filtering Go binding around an Ethereum contract events.
type EventerFilterer struct {
contract *bind.BoundContract // Generic contract wrapper for the low level calls
}
// EventerSession is an auto generated Go binding around an Ethereum contract,
// with pre-set call and transact options.
type EventerSession struct {
Contract *Eventer // Generic contract binding to set the session for
CallOpts bind.CallOpts // Call options to use throughout this session
TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session
}
// EventerCallerSession is an auto generated read-only Go binding around an Ethereum contract,
// with pre-set call options.
type EventerCallerSession struct {
Contract *EventerCaller // Generic contract caller binding to set the session for
CallOpts bind.CallOpts // Call options to use throughout this session
}
// EventerTransactorSession is an auto generated write-only Go binding around an Ethereum contract,
// with pre-set transact options.
type EventerTransactorSession struct {
Contract *EventerTransactor // Generic contract transactor binding to set the session for
TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session
}
// EventerRaw is an auto generated low-level Go binding around an Ethereum contract.
type EventerRaw struct {
Contract *Eventer // Generic contract binding to access the raw methods on
}
// EventerCallerRaw is an auto generated low-level read-only Go binding around an Ethereum contract.
type EventerCallerRaw struct {
Contract *EventerCaller // Generic read-only contract binding to access the raw methods on
}
// EventerTransactorRaw is an auto generated low-level write-only Go binding around an Ethereum contract.
type EventerTransactorRaw struct {
Contract *EventerTransactor // Generic write-only contract binding to access the raw methods on
}
// NewEventer creates a new instance of Eventer, bound to a specific deployed contract.
func NewEventer(address common.Address, backend bind.ContractBackend) (*Eventer, error) {
contract, err := bindEventer(address, backend, backend, backend)
if err != nil {
return nil, err
}
return &Eventer{EventerCaller: EventerCaller{contract: contract}, EventerTransactor: EventerTransactor{contract: contract}, EventerFilterer: EventerFilterer{contract: contract}}, nil
}
// NewEventerCaller creates a new read-only instance of Eventer, bound to a specific deployed contract.
func NewEventerCaller(address common.Address, caller bind.ContractCaller) (*EventerCaller, error) {
contract, err := bindEventer(address, caller, nil, nil)
if err != nil {
return nil, err
}
return &EventerCaller{contract: contract}, nil
}
// NewEventerTransactor creates a new write-only instance of Eventer, bound to a specific deployed contract.
func NewEventerTransactor(address common.Address, transactor bind.ContractTransactor) (*EventerTransactor, error) {
contract, err := bindEventer(address, nil, transactor, nil)
if err != nil {
return nil, err
}
return &EventerTransactor{contract: contract}, nil
}
// NewEventerFilterer creates a new log filterer instance of Eventer, bound to a specific deployed contract.
func NewEventerFilterer(address common.Address, filterer bind.ContractFilterer) (*EventerFilterer, error) {
contract, err := bindEventer(address, nil, nil, filterer)
if err != nil {
return nil, err
}
return &EventerFilterer{contract: contract}, nil
}
// bindEventer binds a generic wrapper to an already deployed contract.
func bindEventer(address common.Address, caller bind.ContractCaller, transactor bind.ContractTransactor, filterer bind.ContractFilterer) (*bind.BoundContract, error) {
parsed, err := abi.JSON(strings.NewReader(EventerABI))
if err != nil {
return nil, err
}
return bind.NewBoundContract(address, parsed, caller, transactor, filterer), nil
}
// Call invokes the (constant) contract method with params as input values and
// sets the output to result. The result type might be a single field for simple
// returns, a slice of interfaces for anonymous returns and a struct for named
// returns.
func (_Eventer *EventerRaw) Call(opts *bind.CallOpts, result interface{}, method string, params ...interface{}) error {
return _Eventer.Contract.EventerCaller.contract.Call(opts, result, method, params...)
}
// Transfer initiates a plain transaction to move funds to the contract, calling
// its default method if one is available.
func (_Eventer *EventerRaw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) {
return _Eventer.Contract.EventerTransactor.contract.Transfer(opts)
}
// Transact invokes the (paid) contract method with params as input values.
func (_Eventer *EventerRaw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) {
return _Eventer.Contract.EventerTransactor.contract.Transact(opts, method, params...)
}
// Call invokes the (constant) contract method with params as input values and
// sets the output to result. The result type might be a single field for simple
// returns, a slice of interfaces for anonymous returns and a struct for named
// returns.
func (_Eventer *EventerCallerRaw) Call(opts *bind.CallOpts, result interface{}, method string, params ...interface{}) error {
return _Eventer.Contract.contract.Call(opts, result, method, params...)
}
// Transfer initiates a plain transaction to move funds to the contract, calling
// its default method if one is available.
func (_Eventer *EventerTransactorRaw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) {
return _Eventer.Contract.contract.Transfer(opts)
}
// Transact invokes the (paid) contract method with params as input values.
func (_Eventer *EventerTransactorRaw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) {
return _Eventer.Contract.contract.Transact(opts, method, params...)
}
// Crt is a free data retrieval call binding the contract method 0x0868aad6.
//
// Solidity: function crt() constant returns(int256)
func (_Eventer *EventerCaller) Crt(opts *bind.CallOpts) (*big.Int, error) {
var (
ret0 = new(*big.Int)
)
out := ret0
err := _Eventer.contract.Call(opts, out, "crt")
return *ret0, err
}
// Crt is a free data retrieval call binding the contract method 0x0868aad6.
//
// Solidity: function crt() constant returns(int256)
func (_Eventer *EventerSession) Crt() (*big.Int, error) {
return _Eventer.Contract.Crt(&_Eventer.CallOpts)
}
// Crt is a free data retrieval call binding the contract method 0x0868aad6.
//
// Solidity: function crt() constant returns(int256)
func (_Eventer *EventerCallerSession) Crt() (*big.Int, error) {
return _Eventer.Contract.Crt(&_Eventer.CallOpts)
}
// Emit is a paid mutator transaction binding the contract method 0xeee17820.
//
// Solidity: function emit(topic bytes32) returns()
func (_Eventer *EventerTransactor) Emit(opts *bind.TransactOpts, topic [32]byte) (*types.Transaction, error) {
return _Eventer.contract.Transact(opts, "emit", topic)
}
// Emit is a paid mutator transaction binding the contract method 0xeee17820.
//
// Solidity: function emit(topic bytes32) returns()
func (_Eventer *EventerSession) Emit(topic [32]byte) (*types.Transaction, error) {
return _Eventer.Contract.Emit(&_Eventer.TransactOpts, topic)
}
// Emit is a paid mutator transaction binding the contract method 0xeee17820.
//
// Solidity: function emit(topic bytes32) returns()
func (_Eventer *EventerTransactorSession) Emit(topic [32]byte) (*types.Transaction, error) {
return _Eventer.Contract.Emit(&_Eventer.TransactOpts, topic)
}
// EventerMessageIterator is returned from FilterMessage and is used to iterate over the raw logs and unpacked data for Message events raised by the Eventer contract.
type EventerMessageIterator struct {
Event *EventerMessage // Event containing the contract specifics and raw log
contract *bind.BoundContract // Generic contract to use for unpacking event data
event string // Event name to use for unpacking event data
logs chan types.Log // Log channel receiving the found contract events
sub ethereum.Subscription // Subscription for errors, completion and termination
done bool // Whether the subscription completed delivering logs
fail error // Occurred error to stop iteration
}
// Next advances the iterator to the subsequent event, returning whether there
// are any more events found. In case of a retrieval or parsing error, false is
// returned and Error() can be queried for the exact failure.
func (it *EventerMessageIterator) Next() bool {
// If the iterator failed, stop iterating
if it.fail != nil {
return false
}
// If the iterator completed, deliver directly whatever's available
if it.done {
select {
case log := <-it.logs:
it.Event = new(EventerMessage)
if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil {
it.fail = err
return false
}
it.Event.Raw = log
return true
default:
return false
}
}
// Iterator still in progress, wait for either a data or an error event
select {
case log := <-it.logs:
it.Event = new(EventerMessage)
if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil {
it.fail = err
return false
}
it.Event.Raw = log
return true
case err := <-it.sub.Err():
it.done = true
it.fail = err
return it.Next()
}
}
// Error returns any retrieval or parsing error occurred during filtering.
func (it *EventerMessageIterator) Error() error {
return it.fail
}
// Close terminates the iteration process, releasing any pending underlying
// resources.
func (it *EventerMessageIterator) Close() error {
it.sub.Unsubscribe()
return nil
}
// EventerMessage represents a Message event raised by the Eventer contract.
type EventerMessage struct {
Topic [32]byte
Crt *big.Int
Raw types.Log // Blockchain specific contextual infos
}
// FilterMessage is a free log retrieval operation binding the contract event 0xc3dab353f3a8451adb4c3071c9df72eebc7e900383c3295d66fe939bba21e1c9.
//
// Solidity: e Message(topic indexed bytes32, crt indexed int256)
func (_Eventer *EventerFilterer) FilterMessage(opts *bind.FilterOpts, topic [][32]byte, crt []*big.Int) (*EventerMessageIterator, error) {
var topicRule []interface{}
for _, topicItem := range topic {
topicRule = append(topicRule, topicItem)
}
var crtRule []interface{}
for _, crtItem := range crt {
crtRule = append(crtRule, crtItem)
}
logs, sub, err := _Eventer.contract.FilterLogs(opts, "Message", topicRule, crtRule)
if err != nil {
return nil, err
}
return &EventerMessageIterator{contract: _Eventer.contract, event: "Message", logs: logs, sub: sub}, nil
}
// WatchMessage is a free log subscription operation binding the contract event 0xc3dab353f3a8451adb4c3071c9df72eebc7e900383c3295d66fe939bba21e1c9.
//
// Solidity: e Message(topic indexed bytes32, crt indexed int256)
func (_Eventer *EventerFilterer) WatchMessage(opts *bind.WatchOpts, sink chan<- *EventerMessage, topic [][32]byte, crt []*big.Int) (event.Subscription, error) {
var topicRule []interface{}
for _, topicItem := range topic {
topicRule = append(topicRule, topicItem)
}
var crtRule []interface{}
for _, crtItem := range crt {
crtRule = append(crtRule, crtItem)
}
logs, sub, err := _Eventer.contract.WatchLogs(opts, "Message", topicRule, crtRule)
if err != nil {
return nil, err
}
return event.NewSubscription(func(quit <-chan struct{}) error {
defer sub.Unsubscribe()
for {
select {
case log := <-logs:
// New log arrived, parse the event and forward to the user
event := new(EventerMessage)
if err := _Eventer.contract.UnpackLog(event, "Message", log); err != nil {
return err
}
event.Raw = log
select {
case sink <- event:
case err := <-sub.Err():
return err
case <-quit:
return nil
}
case err := <-sub.Err():
return err
case <-quit:
return nil
}
}
}), nil
}

View File

@ -0,0 +1,18 @@
pragma solidity ^0.4.21;
contract Eventer {
int public crt;
event Message(
bytes32 indexed topic,
int indexed crt
);
constructor() public {}
function emit(bytes32 topic) public {
crt += 1;
emit Message(topic, crt);
}
}

View File

@ -0,0 +1,79 @@
package devtests
import (
"context"
"errors"
"testing"
"time"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/status-im/status-go/t/devtests/eventer"
"github.com/status-im/status-go/t/utils"
"github.com/stretchr/testify/suite"
)
func TestLogsSuite(t *testing.T) {
suite.Run(t, new(LogsSuite))
}
type LogsSuite struct {
DevNodeSuite
}
func (s *LogsSuite) testEmit(event *eventer.Eventer, opts *bind.TransactOpts, id string, topic [32]byte, expect int) {
tx, err := event.Emit(opts, topic)
s.NoError(err)
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = bind.WaitMined(timeout, s.Eth, tx)
s.NoError(err)
s.NoError(utils.Eventually(func() error {
var logs []types.Log
err := s.Local.Call(&logs, "eth_getFilterChanges", id)
if err != nil {
return err
}
if len(logs) == 0 {
return errors.New("no logs")
}
if len(logs) > expect {
return errors.New("more logs than expected")
}
return nil
}, 10*time.Second, time.Second))
}
func (s *LogsSuite) TestLogsNewFilter() {
var (
fid, sid, tid string
ftopic, stopic = [32]byte{1}, [32]byte{2}
)
s.Require().NoError(s.Local.Call(&fid, "eth_newFilter", map[string]interface{}{
"topics": [][]common.Hash{
{},
{common.BytesToHash(ftopic[:])},
},
}))
s.Require().NoError(s.Local.Call(&sid, "eth_newFilter", map[string]interface{}{
"topics": [][]common.Hash{
{},
{common.BytesToHash(stopic[:])},
},
}))
s.Require().NoError(s.Local.Call(&tid, "eth_newFilter", map[string]interface{}{}))
opts := bind.NewKeyedTransactor(s.DevAccount)
_, tx, event, err := eventer.DeployEventer(opts, s.Eth)
s.Require().NoError(err)
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
addr, err := bind.WaitDeployed(timeout, s.Eth, tx)
s.Require().NoError(err)
s.Require().NotEmpty(addr)
s.testEmit(event, opts, fid, ftopic, 1)
s.testEmit(event, opts, sid, stopic, 1)
s.testEmit(event, opts, fid, ftopic, 1)
}

View File

@ -414,3 +414,22 @@ func copyFile(src, dst string) error {
} }
return nil return nil
} }
// Eventually will raise error if condition won't be met during the given timeout.
func Eventually(f func() error, timeout, period time.Duration) (err error) {
timer := time.NewTimer(timeout)
defer timer.Stop()
ticker := time.NewTicker(period)
defer ticker.Stop()
for {
select {
case <-timer.C:
return
case <-ticker.C:
err = f()
if err == nil {
return nil
}
}
}
}