diff --git a/services/rpcfilters/api.go b/services/rpcfilters/api.go index 5352de7f5..aa7ff591d 100644 --- a/services/rpcfilters/api.go +++ b/services/rpcfilters/api.go @@ -1,58 +1,102 @@ package rpcfilters import ( + "context" "errors" "sync" + "time" - "github.com/ethereum/go-ethereum/common" + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/eth/filters" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" "github.com/pborman/uuid" ) -type filter struct { - hashes []common.Hash - mu sync.Mutex - done chan struct{} -} +const ( + defaultFilterLivenessPeriod = 5 * time.Minute + defaultLogsPeriod = 3 * time.Second + defaultLogsQueryTimeout = 10 * time.Second +) -// AddHash adds a hash to the filter -func (f *filter) AddHash(hash common.Hash) { - f.mu.Lock() - defer f.mu.Unlock() - f.hashes = append(f.hashes, hash) -} - -// PopHashes returns all the hashes stored in the filter and clears the filter contents -func (f *filter) PopHashes() []common.Hash { - f.mu.Lock() - defer f.mu.Unlock() - hashes := f.hashes - f.hashes = nil - return returnHashes(hashes) -} - -func newFilter() *filter { - return &filter{ - done: make(chan struct{}), - } +type filter interface { + add(interface{}) error + pop() interface{} + stop() + deadline() *time.Timer } // PublicAPI represents filter API that is exported to `eth` namespace type PublicAPI struct { - filters map[rpc.ID]*filter - filtersMu sync.Mutex + filtersMu sync.Mutex + filters map[rpc.ID]filter + + // filterLivenessLoop defines how often timeout loop is executed + filterLivenessLoop time.Duration + // filter liveness increased by this period when changes are requested + filterLivenessPeriod time.Duration + + client func() ContextCaller + latestBlockChangedEvent *latestBlockChangedEvent transactionSentToUpstreamEvent *transactionSentToUpstreamEvent } // NewPublicAPI returns a reference to the PublicAPI object -func NewPublicAPI(latestBlockChangedEvent *latestBlockChangedEvent, - transactionSentToUpstreamEvent *transactionSentToUpstreamEvent) *PublicAPI { - return &PublicAPI{ - filters: make(map[rpc.ID]*filter), - latestBlockChangedEvent: latestBlockChangedEvent, - transactionSentToUpstreamEvent: transactionSentToUpstreamEvent, +func NewPublicAPI(s *Service) *PublicAPI { + api := &PublicAPI{ + filters: make(map[rpc.ID]filter), + latestBlockChangedEvent: s.latestBlockChangedEvent, + transactionSentToUpstreamEvent: s.transactionSentToUpstreamEvent, + client: func() ContextCaller { return s.rpc.RPCClient() }, + filterLivenessLoop: defaultFilterLivenessPeriod, + filterLivenessPeriod: defaultFilterLivenessPeriod + 10*time.Second, } + go api.timeoutLoop(s.quit) + return api +} + +func (api *PublicAPI) timeoutLoop(quit chan struct{}) { + for { + select { + case <-quit: + return + case <-time.After(api.filterLivenessLoop): + api.filtersMu.Lock() + for id, f := range api.filters { + deadline := f.deadline() + if deadline == nil { + continue + } + select { + case <-deadline.C: + delete(api.filters, id) + f.stop() + default: + continue + } + } + api.filtersMu.Unlock() + } + } +} + +func (api *PublicAPI) NewFilter(crit filters.FilterCriteria) (rpc.ID, error) { + id := rpc.ID(uuid.New()) + ctx, cancel := context.WithCancel(context.Background()) + f := &logsFilter{ + id: id, + crit: ethereum.FilterQuery(crit), + done: make(chan struct{}), + timer: time.NewTimer(api.filterLivenessPeriod), + ctx: ctx, + cancel: cancel, + } + api.filtersMu.Lock() + api.filters[id] = f + api.filtersMu.Unlock() + go pollLogs(api.client(), f, defaultLogsQueryTimeout, defaultLogsPeriod) + return id, nil } // NewBlockFilter is an implemenation of `eth_newBlockFilter` API @@ -61,7 +105,7 @@ func (api *PublicAPI) NewBlockFilter() rpc.ID { api.filtersMu.Lock() defer api.filtersMu.Unlock() - f := newFilter() + f := newHashFilter() id := rpc.ID(uuid.New()) api.filters[id] = f @@ -73,7 +117,9 @@ func (api *PublicAPI) NewBlockFilter() rpc.ID { for { select { case hash := <-s: - f.AddHash(hash) + if err := f.add(hash); err != nil { + log.Error("error adding value to filter", "hash", hash, "error", err) + } case <-f.done: return } @@ -90,7 +136,7 @@ func (api *PublicAPI) NewPendingTransactionFilter() rpc.ID { api.filtersMu.Lock() defer api.filtersMu.Unlock() - f := newFilter() + f := newHashFilter() id := rpc.ID(uuid.New()) api.filters[id] = f @@ -102,7 +148,9 @@ func (api *PublicAPI) NewPendingTransactionFilter() rpc.ID { for { select { case hash := <-s: - f.AddHash(hash) + if err := f.add(hash); err != nil { + log.Error("error adding value to filter", "hash", hash, "error", err) + } case <-f.done: return } @@ -125,7 +173,7 @@ func (api *PublicAPI) UninstallFilter(id rpc.ID) bool { api.filtersMu.Unlock() if found { - close(f.done) + f.stop() } return found @@ -135,22 +183,26 @@ func (api *PublicAPI) UninstallFilter(id rpc.ID) bool { // last time it was called. This can be used for polling. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges -func (api *PublicAPI) GetFilterChanges(id rpc.ID) ([]common.Hash, error) { +func (api *PublicAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { api.filtersMu.Lock() defer api.filtersMu.Unlock() if f, found := api.filters[id]; found { - return f.PopHashes(), nil + deadline := f.deadline() + if deadline != nil { + if !deadline.Stop() { + // timer expired but filter is not yet removed in timeout loop + // receive timer value and reset timer + // see https://golang.org/pkg/time/#Timer.Reset + <-deadline.C + } + deadline.Reset(api.filterLivenessPeriod) + } + rst := f.pop() + if rst == nil { + return []interface{}{}, nil + } + return rst, nil } - - return []common.Hash{}, errors.New("filter not found") -} - -// returnHashes is a helper that will return an empty hash array case the given hash array is nil, -// otherwise the given hashes array is returned. -func returnHashes(hashes []common.Hash) []common.Hash { - if hashes == nil { - return []common.Hash{} - } - return hashes + return []interface{}{}, errors.New("filter not found") } diff --git a/services/rpcfilters/api_test.go b/services/rpcfilters/api_test.go new file mode 100644 index 000000000..193a9f74c --- /dev/null +++ b/services/rpcfilters/api_test.go @@ -0,0 +1,76 @@ +package rpcfilters + +import ( + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/eth/filters" + "github.com/ethereum/go-ethereum/rpc" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestFilterLiveness(t *testing.T) { + api := &PublicAPI{ + filters: make(map[rpc.ID]filter), + filterLivenessLoop: 10 * time.Millisecond, + filterLivenessPeriod: 15 * time.Millisecond, + client: func() ContextCaller { return &callTracker{} }, + } + id, err := api.NewFilter(filters.FilterCriteria{}) + require.NoError(t, err) + quit := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + go func() { + api.timeoutLoop(quit) + wg.Done() + }() + tick := time.Tick(10 * time.Millisecond) + after := time.After(100 * time.Millisecond) + func() { + for { + select { + case <-after: + assert.FailNow(t, "filter wasn't removed") + close(quit) + return + case <-tick: + api.filtersMu.Lock() + _, exist := api.filters[id] + api.filtersMu.Unlock() + if !exist { + close(quit) + return + } + } + } + }() + wg.Wait() +} + +func TestGetFilterChangesResetsTimer(t *testing.T) { + api := &PublicAPI{ + filters: make(map[rpc.ID]filter), + filterLivenessLoop: 10 * time.Millisecond, + filterLivenessPeriod: 15 * time.Millisecond, + client: func() ContextCaller { return &callTracker{} }, + } + id, err := api.NewFilter(filters.FilterCriteria{}) + require.NoError(t, err) + + api.filtersMu.Lock() + f := api.filters[id] + require.True(t, f.deadline().Stop()) + fake := make(chan time.Time, 1) + fake <- time.Time{} + f.deadline().C = fake + api.filtersMu.Unlock() + + require.False(t, f.deadline().Stop()) + // GetFilterChanges will Reset deadline + _, err = api.GetFilterChanges(id) + require.NoError(t, err) + require.True(t, f.deadline().Stop()) +} diff --git a/services/rpcfilters/hash_filter.go b/services/rpcfilters/hash_filter.go new file mode 100644 index 000000000..de0ce0607 --- /dev/null +++ b/services/rpcfilters/hash_filter.go @@ -0,0 +1,56 @@ +package rpcfilters + +import ( + "errors" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" +) + +type hashFilter struct { + hashes []common.Hash + mu sync.Mutex + done chan struct{} + timer *time.Timer +} + +// add adds a hash to the hashFilter +func (f *hashFilter) add(data interface{}) error { + hash, ok := data.(common.Hash) + if !ok { + return errors.New("provided data is not a common.Hash") + } + f.mu.Lock() + defer f.mu.Unlock() + f.hashes = append(f.hashes, hash) + return nil +} + +// pop returns all the hashes stored in the hashFilter and clears the hashFilter contents +func (f *hashFilter) pop() interface{} { + f.mu.Lock() + defer f.mu.Unlock() + hashes := f.hashes + f.hashes = nil + return hashes +} + +func (f *hashFilter) stop() { + select { + case <-f.done: + return + default: + close(f.done) + } +} + +func (f *hashFilter) deadline() *time.Timer { + return f.timer +} + +func newHashFilter() *hashFilter { + return &hashFilter{ + done: make(chan struct{}), + } +} diff --git a/services/rpcfilters/latest_logs.go b/services/rpcfilters/latest_logs.go new file mode 100644 index 000000000..521892d98 --- /dev/null +++ b/services/rpcfilters/latest_logs.go @@ -0,0 +1,85 @@ +package rpcfilters + +import ( + "context" + "math/big" + "time" + + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" +) + +// ContextCaller provides CallContext method as ethereums rpc.Client. +type ContextCaller interface { + CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error +} + +func pollLogs(client ContextCaller, f *logsFilter, timeout, period time.Duration) { + adjusted := false + query := func() { + ctx, cancel := context.WithTimeout(f.ctx, timeout) + logs, err := getLogs(ctx, client, f.crit) + cancel() + if err != nil { + log.Error("failed to get logs", "criteria", f.crit, "error", err) + } else if !adjusted { + adjustFromBlock(&f.crit) + adjusted = true + } + if err := f.add(logs); err != nil { + log.Error("error adding logs", "logs", logs, "error", err) + } + } + query() + latest := time.NewTicker(period) + defer latest.Stop() + for { + select { + case <-latest.C: + query() + case <-f.done: + log.Debug("filter was stopped", "ID", f.id, "crit", f.crit) + return + } + } +} + +// adjustFromBlock adjusts crit.FromBlock to the latest to avoid querying same logs multiple times. +func adjustFromBlock(crit *ethereum.FilterQuery) { + latest := big.NewInt(rpc.LatestBlockNumber.Int64()) + // don't adjust if filter is not interested in newer blocks + if crit.ToBlock != nil && crit.ToBlock.Cmp(latest) == 1 { + return + } + // don't adjust if from block is already pending + if crit.FromBlock != nil && crit.FromBlock.Cmp(latest) == -1 { + return + } + crit.FromBlock = latest +} + +func getLogs(ctx context.Context, client ContextCaller, crit ethereum.FilterQuery) (rst []types.Log, err error) { + return rst, client.CallContext(ctx, &rst, "eth_getLogs", toFilterArg(crit)) +} + +func toFilterArg(q ethereum.FilterQuery) interface{} { + arg := map[string]interface{}{ + "fromBlock": toBlockNumArg(q.FromBlock), + "toBlock": toBlockNumArg(q.ToBlock), + "address": q.Addresses, + "topics": q.Topics, + } + return arg +} + +func toBlockNumArg(number *big.Int) string { + if number == nil || number.Int64() == rpc.LatestBlockNumber.Int64() { + return "latest" + } else if number.Int64() == rpc.PendingBlockNumber.Int64() { + return "pending" + } + return hexutil.EncodeBig(number) +} diff --git a/services/rpcfilters/latest_logs_test.go b/services/rpcfilters/latest_logs_test.go new file mode 100644 index 000000000..6e41b9627 --- /dev/null +++ b/services/rpcfilters/latest_logs_test.go @@ -0,0 +1,145 @@ +package rpcfilters + +import ( + "context" + "errors" + "math/big" + "sync" + "testing" + "time" + + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type callTracker struct { + mu sync.Mutex + calls int + reply [][]types.Log + criterias []map[string]interface{} +} + +func (c *callTracker) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { + c.mu.Lock() + defer c.mu.Unlock() + c.calls++ + if len(args) != 1 { + return errors.New("unexpected length of args") + } + crit := args[0].(map[string]interface{}) + c.criterias = append(c.criterias, crit) + select { + case <-ctx.Done(): + return errors.New("context canceled") + default: + } + if c.calls <= len(c.reply) { + rst := result.(*[]types.Log) + *rst = c.reply[c.calls-1] + } + return nil +} + +func runLogsFetcherTest(t *testing.T, f *logsFilter) *callTracker { + c := callTracker{reply: [][]types.Log{ + make([]types.Log, 2), + }} + var wg sync.WaitGroup + wg.Add(1) + go func() { + pollLogs(&c, f, time.Second, 100*time.Millisecond) + wg.Done() + }() + tick := time.Tick(10 * time.Millisecond) + after := time.After(time.Second) + func() { + for { + select { + case <-after: + f.stop() + assert.FailNow(t, "failed waiting for requests") + return + case <-tick: + c.mu.Lock() + num := c.calls + c.mu.Unlock() + if num >= 2 { + f.stop() + return + } + } + } + }() + wg.Wait() + require.Len(t, c.criterias, 2) + return &c +} + +func TestLogsFetcherAdjusted(t *testing.T) { + f := &logsFilter{ + ctx: context.TODO(), + crit: ethereum.FilterQuery{ + FromBlock: big.NewInt(10), + }, + done: make(chan struct{}), + } + c := runLogsFetcherTest(t, f) + require.Equal(t, hexutil.EncodeBig(big.NewInt(10)), c.criterias[0]["fromBlock"]) + require.Equal(t, c.criterias[1]["fromBlock"], "latest") +} + +func TestLogsFetcherCanceledContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + f := &logsFilter{ + ctx: ctx, + crit: ethereum.FilterQuery{ + FromBlock: big.NewInt(10), + }, + done: make(chan struct{}), + } + cancel() + c := runLogsFetcherTest(t, f) + require.Equal(t, hexutil.EncodeBig(big.NewInt(10)), c.criterias[0]["fromBlock"]) + require.Equal(t, hexutil.EncodeBig(big.NewInt(10)), c.criterias[1]["fromBlock"]) +} + +func TestAdjustFromBlock(t *testing.T) { + type testCase struct { + description string + initial ethereum.FilterQuery + result ethereum.FilterQuery + } + + for _, tc := range []testCase{ + { + "ToBlockHigherThenLatest", + ethereum.FilterQuery{ToBlock: big.NewInt(10)}, + ethereum.FilterQuery{ToBlock: big.NewInt(10)}, + }, + { + "FromBlockIsPending", + ethereum.FilterQuery{FromBlock: big.NewInt(-2)}, + ethereum.FilterQuery{FromBlock: big.NewInt(-2)}, + }, + { + "FromBlockIsOlderThenLatest", + ethereum.FilterQuery{FromBlock: big.NewInt(10)}, + ethereum.FilterQuery{FromBlock: big.NewInt(-1)}, + }, + { + "NotInterestedInLatestBlocks", + ethereum.FilterQuery{FromBlock: big.NewInt(10), ToBlock: big.NewInt(15)}, + ethereum.FilterQuery{FromBlock: big.NewInt(10), ToBlock: big.NewInt(15)}, + }, + } { + tc := tc + t.Run(tc.description, func(t *testing.T) { + t.Parallel() + adjustFromBlock(&tc.initial) + require.Equal(t, tc.result, tc.initial) + }) + } +} diff --git a/services/rpcfilters/logs_filter.go b/services/rpcfilters/logs_filter.go new file mode 100644 index 000000000..2734010fd --- /dev/null +++ b/services/rpcfilters/logs_filter.go @@ -0,0 +1,132 @@ +package rpcfilters + +import ( + "context" + "errors" + "sync" + "time" + + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" +) + +type logsFilter struct { + mu sync.Mutex + logs []types.Log + lastSeenBlockNumber uint64 + lastSeenBlockHash common.Hash + + id rpc.ID + crit ethereum.FilterQuery + timer *time.Timer + + ctx context.Context + cancel context.CancelFunc + done chan struct{} +} + +func (f *logsFilter) add(data interface{}) error { + logs, ok := data.([]types.Log) + if !ok { + return errors.New("provided value is not a []types.Log") + } + filtered, num, hash := filterLogs(logs, f.crit, f.lastSeenBlockNumber, f.lastSeenBlockHash) + f.mu.Lock() + f.lastSeenBlockNumber = num + f.lastSeenBlockHash = hash + f.logs = append(f.logs, filtered...) + f.mu.Unlock() + return nil +} + +func (f *logsFilter) pop() interface{} { + f.mu.Lock() + defer f.mu.Unlock() + rst := f.logs + f.logs = nil + return rst +} + +func (f *logsFilter) stop() { + select { + case <-f.done: + return + default: + close(f.done) + if f.cancel != nil { + f.cancel() + } + } +} + +func (f *logsFilter) deadline() *time.Timer { + return f.timer +} + +func includes(addresses []common.Address, a common.Address) bool { + for _, addr := range addresses { + if addr == a { + return true + } + } + return false +} + +// filterLogs creates a slice of logs matching the given criteria. +func filterLogs(logs []types.Log, crit ethereum.FilterQuery, blockNum uint64, blockHash common.Hash) ( + ret []types.Log, num uint64, hash common.Hash) { + num = blockNum + hash = blockHash + for _, log := range logs { + // skip logs from seen blocks + // find highest block number that we didnt see before + if log.BlockNumber >= num { + num = log.BlockNumber + hash = log.BlockHash + } + if matchLog(log, crit, blockNum, blockHash) { + ret = append(ret, log) + } + } + return +} + +func matchLog(log types.Log, crit ethereum.FilterQuery, blockNum uint64, blockHash common.Hash) bool { + // skip logs from seen blocks + if log.BlockNumber < blockNum { + return false + } else if log.BlockNumber == blockNum && log.BlockHash == blockHash { + return false + } + if crit.FromBlock != nil && crit.FromBlock.Int64() >= 0 && crit.FromBlock.Uint64() > log.BlockNumber { + return false + } + if crit.ToBlock != nil && crit.ToBlock.Int64() >= 0 && crit.ToBlock.Uint64() < log.BlockNumber { + return false + } + if len(crit.Addresses) > 0 && !includes(crit.Addresses, log.Address) { + return false + } + if len(crit.Topics) > len(log.Topics) { + return false + } + return matchTopics(log, crit.Topics) +} + +func matchTopics(log types.Log, topics [][]common.Hash) bool { + for i, sub := range topics { + match := len(sub) == 0 // empty rule set == wildcard + for _, topic := range sub { + if log.Topics[i] == topic { + match = true + break + } + } + if !match { + return false + } + } + return true +} diff --git a/services/rpcfilters/logs_filter_test.go b/services/rpcfilters/logs_filter_test.go new file mode 100644 index 000000000..469269106 --- /dev/null +++ b/services/rpcfilters/logs_filter_test.go @@ -0,0 +1,125 @@ +package rpcfilters + +import ( + "math/big" + "testing" + + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" +) + +func TestFilterLogs(t *testing.T) { + logs := []types.Log{ + { + BlockNumber: 1, + BlockHash: common.Hash{1, 1}, + Address: common.Address{1, 1, 1}, + Topics: []common.Hash{ + {1}, + {1, 1}, + }, + }, + { + BlockNumber: 2, + BlockHash: common.Hash{2, 2}, + Address: common.Address{2, 2, 2}, + Topics: []common.Hash{ + {1}, + {2, 2}, + }, + }, + } + + type testCase struct { + description string + + blockNum uint64 + blockHash common.Hash + crit ethereum.FilterQuery + + expectedLogs []types.Log + expectedBlock uint64 + expectedHash common.Hash + } + + for _, tc := range []testCase{ + { + description: "All", + crit: ethereum.FilterQuery{}, + expectedLogs: []types.Log{logs[0], logs[1]}, + expectedBlock: logs[1].BlockNumber, + expectedHash: logs[1].BlockHash, + }, + { + description: "LimitedByBlock", + crit: ethereum.FilterQuery{ToBlock: big.NewInt(1)}, + expectedLogs: []types.Log{logs[0]}, + expectedBlock: logs[1].BlockNumber, + expectedHash: logs[1].BlockHash, + }, + { + description: "LimitedByAddress", + crit: ethereum.FilterQuery{Addresses: []common.Address{logs[1].Address}}, + expectedLogs: []types.Log{logs[1]}, + expectedBlock: logs[1].BlockNumber, + expectedHash: logs[1].BlockHash, + }, + { + description: "LimitedByAddress", + crit: ethereum.FilterQuery{Addresses: []common.Address{logs[1].Address}}, + expectedLogs: []types.Log{logs[1]}, + expectedBlock: logs[1].BlockNumber, + expectedHash: logs[1].BlockHash, + }, + { + description: "MoreTopicsThanInLogs", + crit: ethereum.FilterQuery{Topics: make([][]common.Hash, 3)}, + expectedBlock: logs[1].BlockNumber, + expectedHash: logs[1].BlockHash, + }, + { + description: "Wildcard", + crit: ethereum.FilterQuery{Topics: make([][]common.Hash, 1)}, + expectedLogs: []types.Log{logs[0], logs[1]}, + expectedBlock: logs[1].BlockNumber, + expectedHash: logs[1].BlockHash, + }, + { + description: "LimitedBySecondTopic", + crit: ethereum.FilterQuery{Topics: [][]common.Hash{ + {}, logs[1].Topics}}, + expectedLogs: []types.Log{logs[1]}, + expectedBlock: logs[1].BlockNumber, + expectedHash: logs[1].BlockHash, + }, + { + blockNum: logs[1].BlockNumber, + blockHash: logs[1].BlockHash, + description: "LimitedBySeenBlock", + crit: ethereum.FilterQuery{}, + expectedBlock: logs[1].BlockNumber, + expectedHash: logs[1].BlockHash, + }, + { + blockNum: logs[1].BlockNumber, + blockHash: common.Hash{7, 7, 7}, + description: "SeenBlockDifferenthash", + crit: ethereum.FilterQuery{}, + expectedLogs: []types.Log{logs[1]}, + expectedBlock: logs[1].BlockNumber, + expectedHash: logs[1].BlockHash, + }, + } { + tc := tc + t.Run(tc.description, func(t *testing.T) { + t.Parallel() + rst, num, hash := filterLogs(logs, tc.crit, tc.blockNum, tc.blockHash) + require.Equal(t, tc.expectedLogs, rst) + require.Equal(t, tc.expectedBlock, num) + require.Equal(t, tc.expectedHash, hash) + }) + } + +} diff --git a/services/rpcfilters/service.go b/services/rpcfilters/service.go index a0ffe503b..1318111ed 100644 --- a/services/rpcfilters/service.go +++ b/services/rpcfilters/service.go @@ -14,6 +14,9 @@ var _ node.Service = (*Service)(nil) type Service struct { latestBlockChangedEvent *latestBlockChangedEvent transactionSentToUpstreamEvent *transactionSentToUpstreamEvent + rpc rpcProvider + + quit chan struct{} } // New returns a new Service. @@ -22,8 +25,9 @@ func New(rpc rpcProvider) *Service { latestBlockChangedEvent := newLatestBlockChangedEvent(provider) transactionSentToUpstreamEvent := newTransactionSentToUpstreamEvent() return &Service{ - latestBlockChangedEvent, - transactionSentToUpstreamEvent, + latestBlockChangedEvent: latestBlockChangedEvent, + transactionSentToUpstreamEvent: transactionSentToUpstreamEvent, + rpc: rpc, } } @@ -38,16 +42,15 @@ func (s *Service) APIs() []rpc.API { { Namespace: "eth", Version: "1.0", - Service: NewPublicAPI( - s.latestBlockChangedEvent, - s.transactionSentToUpstreamEvent), - Public: true, + Service: NewPublicAPI(s), + Public: true, }, } } // Start is run when a service is started. func (s *Service) Start(server *p2p.Server) error { + s.quit = make(chan struct{}) err := s.transactionSentToUpstreamEvent.Start() if err != nil { return err @@ -57,6 +60,7 @@ func (s *Service) Start(server *p2p.Server) error { // Stop is run when a service is stopped. func (s *Service) Stop() error { + close(s.quit) s.transactionSentToUpstreamEvent.Stop() s.latestBlockChangedEvent.Stop() return nil diff --git a/t/devtests/devnode.go b/t/devtests/devnode.go new file mode 100644 index 000000000..fcee535f2 --- /dev/null +++ b/t/devtests/devnode.go @@ -0,0 +1,149 @@ +package devtests + +import ( + "crypto/ecdsa" + "io/ioutil" + "math/big" + "os" + + "github.com/ethereum/go-ethereum/accounts/keystore" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/rpc" + "github.com/status-im/status-go/api" + "github.com/status-im/status-go/params" + statusrpc "github.com/status-im/status-go/rpc" + "github.com/stretchr/testify/suite" +) + +// NewDevNode returns node with clieque engine and prefunded accounts. +func NewDevNode(faucet common.Address) (*node.Node, error) { + cfg := node.DefaultConfig + ipc, err := ioutil.TempFile("", "devnode-ipc-") + if err != nil { + return nil, err + } + cfg.IPCPath = ipc.Name() + cfg.HTTPModules = []string{"eth"} + cfg.DataDir = "" + cfg.P2P.MaxPeers = 0 + cfg.P2P.ListenAddr = ":0" + cfg.P2P.NoDiscovery = true + cfg.P2P.DiscoveryV5 = false + + stack, err := node.New(&cfg) + if err != nil { + return nil, err + } + + ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore) + // ensure that etherbase is added to an account manager + etherbase, err := crypto.GenerateKey() + if err != nil { + return nil, err + } + acc, err := ks.ImportECDSA(etherbase, "") + if err != nil { + return nil, err + } + err = ks.Unlock(acc, "") + if err != nil { + return nil, err + } + + ethcfg := eth.DefaultConfig + ethcfg.NetworkId = 1337 + // 0 - mine only if transaction pending + ethcfg.Genesis = core.DeveloperGenesisBlock(0, faucet) + extra := make([]byte, 32) // extraVanity + extra = append(extra, acc.Address[:]...) + extra = append(extra, make([]byte, 65)...) // extraSeal + ethcfg.Genesis.ExtraData = extra + ethcfg.MinerGasPrice = big.NewInt(1) + ethcfg.Etherbase = acc.Address + + return stack, stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { + return eth.New(ctx, ðcfg) + }) + +} + +// StartWithMiner starts node with eth service and a miner. +func StartWithMiner(stack *node.Node) error { + err := stack.Start() + if err != nil { + return err + } + var ethereum *eth.Ethereum + err = stack.Service(ðereum) + if err != nil { + return err + } + ethereum.TxPool().SetGasPrice(big.NewInt(1)) + return ethereum.StartMining(0) +} + +// DevNodeSuite provides convenient wrapper for starting node with clique backend for mining. +type DevNodeSuite struct { + suite.Suite + + Remote *rpc.Client + Eth *ethclient.Client + Local *statusrpc.Client + DevAccount *ecdsa.PrivateKey + DevAccountAddress common.Address + + dir string + backend *api.StatusBackend + miner *node.Node +} + +// SetupTest creates clique node and status node with an rpc connection to a clique node. +func (s *DevNodeSuite) SetupTest() { + account, err := crypto.GenerateKey() + s.Require().NoError(err) + s.DevAccount = account + s.DevAccountAddress = crypto.PubkeyToAddress(account.PublicKey) + s.miner, err = NewDevNode(s.DevAccountAddress) + s.Require().NoError(err) + s.Require().NoError(StartWithMiner(s.miner)) + + s.dir, err = ioutil.TempDir("", "devtests-") + s.Require().NoError(err) + config, err := params.NewNodeConfig( + s.dir, + 1337, + ) + s.Require().NoError(err) + config.WhisperConfig.Enabled = false + config.LightEthConfig.Enabled = false + config.UpstreamConfig.Enabled = true + config.UpstreamConfig.URL = s.miner.IPCEndpoint() + s.backend = api.NewStatusBackend() + s.Require().NoError(s.backend.StartNode(config)) + + s.Remote, err = s.miner.Attach() + s.Require().NoError(err) + s.Eth = ethclient.NewClient(s.Remote) + s.Local = s.backend.StatusNode().RPCClient() +} + +// TearDownTest stops status node and clique node. +func (s *DevNodeSuite) TearDownTest() { + if s.miner != nil { + s.Require().NoError(s.miner.Stop()) + s.miner = nil + } + if s.backend != nil { + s.Require().NoError(s.backend.StopNode()) + s.backend = nil + } + if len(s.dir) != 0 { + os.RemoveAll(s.dir) + s.dir = "" + } +} diff --git a/t/devtests/eventer/eventer.go b/t/devtests/eventer/eventer.go new file mode 100644 index 000000000..5f88267a3 --- /dev/null +++ b/t/devtests/eventer/eventer.go @@ -0,0 +1,366 @@ +// Code generated - DO NOT EDIT. +// This file is a generated binding and any manual changes will be lost. + +package eventer + +import ( + "math/big" + "strings" + + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" +) + +// EventerABI is the input ABI used to generate the binding from. +const EventerABI = "[{\"constant\":true,\"inputs\":[],\"name\":\"crt\",\"outputs\":[{\"name\":\"\",\"type\":\"int256\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"},{\"constant\":false,\"inputs\":[{\"name\":\"topic\",\"type\":\"bytes32\"}],\"name\":\"emit\",\"outputs\":[],\"payable\":false,\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[],\"payable\":false,\"stateMutability\":\"nonpayable\",\"type\":\"constructor\"},{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"name\":\"topic\",\"type\":\"bytes32\"},{\"indexed\":true,\"name\":\"crt\",\"type\":\"int256\"}],\"name\":\"Message\",\"type\":\"event\"}]" + +// EventerBin is the compiled bytecode used for deploying new contracts. +const EventerBin = `0x608060405234801561001057600080fd5b5060f28061001f6000396000f30060806040526004361060485763ffffffff7c01000000000000000000000000000000000000000000000000000000006000350416630868aad68114604d578063eee17820146071575b600080fd5b348015605857600080fd5b50605f6088565b60408051918252519081900360200190f35b348015607c57600080fd5b506086600435608e565b005b60005481565b60008054600101808255604051909183917fc3dab353f3a8451adb4c3071c9df72eebc7e900383c3295d66fe939bba21e1c99190a3505600a165627a7a72305820121895a4c6e091225d36a33443bf84f57560aa732841830ddf282def477ba9570029` + +// DeployEventer deploys a new Ethereum contract, binding an instance of Eventer to it. +func DeployEventer(auth *bind.TransactOpts, backend bind.ContractBackend) (common.Address, *types.Transaction, *Eventer, error) { + parsed, err := abi.JSON(strings.NewReader(EventerABI)) + if err != nil { + return common.Address{}, nil, nil, err + } + address, tx, contract, err := bind.DeployContract(auth, parsed, common.FromHex(EventerBin), backend) + if err != nil { + return common.Address{}, nil, nil, err + } + return address, tx, &Eventer{EventerCaller: EventerCaller{contract: contract}, EventerTransactor: EventerTransactor{contract: contract}, EventerFilterer: EventerFilterer{contract: contract}}, nil +} + +// Eventer is an auto generated Go binding around an Ethereum contract. +type Eventer struct { + EventerCaller // Read-only binding to the contract + EventerTransactor // Write-only binding to the contract + EventerFilterer // Log filterer for contract events +} + +// EventerCaller is an auto generated read-only Go binding around an Ethereum contract. +type EventerCaller struct { + contract *bind.BoundContract // Generic contract wrapper for the low level calls +} + +// EventerTransactor is an auto generated write-only Go binding around an Ethereum contract. +type EventerTransactor struct { + contract *bind.BoundContract // Generic contract wrapper for the low level calls +} + +// EventerFilterer is an auto generated log filtering Go binding around an Ethereum contract events. +type EventerFilterer struct { + contract *bind.BoundContract // Generic contract wrapper for the low level calls +} + +// EventerSession is an auto generated Go binding around an Ethereum contract, +// with pre-set call and transact options. +type EventerSession struct { + Contract *Eventer // Generic contract binding to set the session for + CallOpts bind.CallOpts // Call options to use throughout this session + TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session +} + +// EventerCallerSession is an auto generated read-only Go binding around an Ethereum contract, +// with pre-set call options. +type EventerCallerSession struct { + Contract *EventerCaller // Generic contract caller binding to set the session for + CallOpts bind.CallOpts // Call options to use throughout this session +} + +// EventerTransactorSession is an auto generated write-only Go binding around an Ethereum contract, +// with pre-set transact options. +type EventerTransactorSession struct { + Contract *EventerTransactor // Generic contract transactor binding to set the session for + TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session +} + +// EventerRaw is an auto generated low-level Go binding around an Ethereum contract. +type EventerRaw struct { + Contract *Eventer // Generic contract binding to access the raw methods on +} + +// EventerCallerRaw is an auto generated low-level read-only Go binding around an Ethereum contract. +type EventerCallerRaw struct { + Contract *EventerCaller // Generic read-only contract binding to access the raw methods on +} + +// EventerTransactorRaw is an auto generated low-level write-only Go binding around an Ethereum contract. +type EventerTransactorRaw struct { + Contract *EventerTransactor // Generic write-only contract binding to access the raw methods on +} + +// NewEventer creates a new instance of Eventer, bound to a specific deployed contract. +func NewEventer(address common.Address, backend bind.ContractBackend) (*Eventer, error) { + contract, err := bindEventer(address, backend, backend, backend) + if err != nil { + return nil, err + } + return &Eventer{EventerCaller: EventerCaller{contract: contract}, EventerTransactor: EventerTransactor{contract: contract}, EventerFilterer: EventerFilterer{contract: contract}}, nil +} + +// NewEventerCaller creates a new read-only instance of Eventer, bound to a specific deployed contract. +func NewEventerCaller(address common.Address, caller bind.ContractCaller) (*EventerCaller, error) { + contract, err := bindEventer(address, caller, nil, nil) + if err != nil { + return nil, err + } + return &EventerCaller{contract: contract}, nil +} + +// NewEventerTransactor creates a new write-only instance of Eventer, bound to a specific deployed contract. +func NewEventerTransactor(address common.Address, transactor bind.ContractTransactor) (*EventerTransactor, error) { + contract, err := bindEventer(address, nil, transactor, nil) + if err != nil { + return nil, err + } + return &EventerTransactor{contract: contract}, nil +} + +// NewEventerFilterer creates a new log filterer instance of Eventer, bound to a specific deployed contract. +func NewEventerFilterer(address common.Address, filterer bind.ContractFilterer) (*EventerFilterer, error) { + contract, err := bindEventer(address, nil, nil, filterer) + if err != nil { + return nil, err + } + return &EventerFilterer{contract: contract}, nil +} + +// bindEventer binds a generic wrapper to an already deployed contract. +func bindEventer(address common.Address, caller bind.ContractCaller, transactor bind.ContractTransactor, filterer bind.ContractFilterer) (*bind.BoundContract, error) { + parsed, err := abi.JSON(strings.NewReader(EventerABI)) + if err != nil { + return nil, err + } + return bind.NewBoundContract(address, parsed, caller, transactor, filterer), nil +} + +// Call invokes the (constant) contract method with params as input values and +// sets the output to result. The result type might be a single field for simple +// returns, a slice of interfaces for anonymous returns and a struct for named +// returns. +func (_Eventer *EventerRaw) Call(opts *bind.CallOpts, result interface{}, method string, params ...interface{}) error { + return _Eventer.Contract.EventerCaller.contract.Call(opts, result, method, params...) +} + +// Transfer initiates a plain transaction to move funds to the contract, calling +// its default method if one is available. +func (_Eventer *EventerRaw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) { + return _Eventer.Contract.EventerTransactor.contract.Transfer(opts) +} + +// Transact invokes the (paid) contract method with params as input values. +func (_Eventer *EventerRaw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) { + return _Eventer.Contract.EventerTransactor.contract.Transact(opts, method, params...) +} + +// Call invokes the (constant) contract method with params as input values and +// sets the output to result. The result type might be a single field for simple +// returns, a slice of interfaces for anonymous returns and a struct for named +// returns. +func (_Eventer *EventerCallerRaw) Call(opts *bind.CallOpts, result interface{}, method string, params ...interface{}) error { + return _Eventer.Contract.contract.Call(opts, result, method, params...) +} + +// Transfer initiates a plain transaction to move funds to the contract, calling +// its default method if one is available. +func (_Eventer *EventerTransactorRaw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) { + return _Eventer.Contract.contract.Transfer(opts) +} + +// Transact invokes the (paid) contract method with params as input values. +func (_Eventer *EventerTransactorRaw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) { + return _Eventer.Contract.contract.Transact(opts, method, params...) +} + +// Crt is a free data retrieval call binding the contract method 0x0868aad6. +// +// Solidity: function crt() constant returns(int256) +func (_Eventer *EventerCaller) Crt(opts *bind.CallOpts) (*big.Int, error) { + var ( + ret0 = new(*big.Int) + ) + out := ret0 + err := _Eventer.contract.Call(opts, out, "crt") + return *ret0, err +} + +// Crt is a free data retrieval call binding the contract method 0x0868aad6. +// +// Solidity: function crt() constant returns(int256) +func (_Eventer *EventerSession) Crt() (*big.Int, error) { + return _Eventer.Contract.Crt(&_Eventer.CallOpts) +} + +// Crt is a free data retrieval call binding the contract method 0x0868aad6. +// +// Solidity: function crt() constant returns(int256) +func (_Eventer *EventerCallerSession) Crt() (*big.Int, error) { + return _Eventer.Contract.Crt(&_Eventer.CallOpts) +} + +// Emit is a paid mutator transaction binding the contract method 0xeee17820. +// +// Solidity: function emit(topic bytes32) returns() +func (_Eventer *EventerTransactor) Emit(opts *bind.TransactOpts, topic [32]byte) (*types.Transaction, error) { + return _Eventer.contract.Transact(opts, "emit", topic) +} + +// Emit is a paid mutator transaction binding the contract method 0xeee17820. +// +// Solidity: function emit(topic bytes32) returns() +func (_Eventer *EventerSession) Emit(topic [32]byte) (*types.Transaction, error) { + return _Eventer.Contract.Emit(&_Eventer.TransactOpts, topic) +} + +// Emit is a paid mutator transaction binding the contract method 0xeee17820. +// +// Solidity: function emit(topic bytes32) returns() +func (_Eventer *EventerTransactorSession) Emit(topic [32]byte) (*types.Transaction, error) { + return _Eventer.Contract.Emit(&_Eventer.TransactOpts, topic) +} + +// EventerMessageIterator is returned from FilterMessage and is used to iterate over the raw logs and unpacked data for Message events raised by the Eventer contract. +type EventerMessageIterator struct { + Event *EventerMessage // Event containing the contract specifics and raw log + + contract *bind.BoundContract // Generic contract to use for unpacking event data + event string // Event name to use for unpacking event data + + logs chan types.Log // Log channel receiving the found contract events + sub ethereum.Subscription // Subscription for errors, completion and termination + done bool // Whether the subscription completed delivering logs + fail error // Occurred error to stop iteration +} + +// Next advances the iterator to the subsequent event, returning whether there +// are any more events found. In case of a retrieval or parsing error, false is +// returned and Error() can be queried for the exact failure. +func (it *EventerMessageIterator) Next() bool { + // If the iterator failed, stop iterating + if it.fail != nil { + return false + } + // If the iterator completed, deliver directly whatever's available + if it.done { + select { + case log := <-it.logs: + it.Event = new(EventerMessage) + if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil { + it.fail = err + return false + } + it.Event.Raw = log + return true + + default: + return false + } + } + // Iterator still in progress, wait for either a data or an error event + select { + case log := <-it.logs: + it.Event = new(EventerMessage) + if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil { + it.fail = err + return false + } + it.Event.Raw = log + return true + + case err := <-it.sub.Err(): + it.done = true + it.fail = err + return it.Next() + } +} + +// Error returns any retrieval or parsing error occurred during filtering. +func (it *EventerMessageIterator) Error() error { + return it.fail +} + +// Close terminates the iteration process, releasing any pending underlying +// resources. +func (it *EventerMessageIterator) Close() error { + it.sub.Unsubscribe() + return nil +} + +// EventerMessage represents a Message event raised by the Eventer contract. +type EventerMessage struct { + Topic [32]byte + Crt *big.Int + Raw types.Log // Blockchain specific contextual infos +} + +// FilterMessage is a free log retrieval operation binding the contract event 0xc3dab353f3a8451adb4c3071c9df72eebc7e900383c3295d66fe939bba21e1c9. +// +// Solidity: e Message(topic indexed bytes32, crt indexed int256) +func (_Eventer *EventerFilterer) FilterMessage(opts *bind.FilterOpts, topic [][32]byte, crt []*big.Int) (*EventerMessageIterator, error) { + + var topicRule []interface{} + for _, topicItem := range topic { + topicRule = append(topicRule, topicItem) + } + var crtRule []interface{} + for _, crtItem := range crt { + crtRule = append(crtRule, crtItem) + } + + logs, sub, err := _Eventer.contract.FilterLogs(opts, "Message", topicRule, crtRule) + if err != nil { + return nil, err + } + return &EventerMessageIterator{contract: _Eventer.contract, event: "Message", logs: logs, sub: sub}, nil +} + +// WatchMessage is a free log subscription operation binding the contract event 0xc3dab353f3a8451adb4c3071c9df72eebc7e900383c3295d66fe939bba21e1c9. +// +// Solidity: e Message(topic indexed bytes32, crt indexed int256) +func (_Eventer *EventerFilterer) WatchMessage(opts *bind.WatchOpts, sink chan<- *EventerMessage, topic [][32]byte, crt []*big.Int) (event.Subscription, error) { + + var topicRule []interface{} + for _, topicItem := range topic { + topicRule = append(topicRule, topicItem) + } + var crtRule []interface{} + for _, crtItem := range crt { + crtRule = append(crtRule, crtItem) + } + + logs, sub, err := _Eventer.contract.WatchLogs(opts, "Message", topicRule, crtRule) + if err != nil { + return nil, err + } + return event.NewSubscription(func(quit <-chan struct{}) error { + defer sub.Unsubscribe() + for { + select { + case log := <-logs: + // New log arrived, parse the event and forward to the user + event := new(EventerMessage) + if err := _Eventer.contract.UnpackLog(event, "Message", log); err != nil { + return err + } + event.Raw = log + + select { + case sink <- event: + case err := <-sub.Err(): + return err + case <-quit: + return nil + } + case err := <-sub.Err(): + return err + case <-quit: + return nil + } + } + }), nil +} + diff --git a/t/devtests/eventer/eventer.sol b/t/devtests/eventer/eventer.sol new file mode 100644 index 000000000..b5ba49cd9 --- /dev/null +++ b/t/devtests/eventer/eventer.sol @@ -0,0 +1,18 @@ +pragma solidity ^0.4.21; + +contract Eventer { + + int public crt; + + event Message( + bytes32 indexed topic, + int indexed crt + ); + + constructor() public {} + + function emit(bytes32 topic) public { + crt += 1; + emit Message(topic, crt); + } +} diff --git a/t/devtests/filters_test.go b/t/devtests/filters_test.go new file mode 100644 index 000000000..4720e4578 --- /dev/null +++ b/t/devtests/filters_test.go @@ -0,0 +1,79 @@ +package devtests + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/status-im/status-go/t/devtests/eventer" + "github.com/status-im/status-go/t/utils" + "github.com/stretchr/testify/suite" +) + +func TestLogsSuite(t *testing.T) { + suite.Run(t, new(LogsSuite)) +} + +type LogsSuite struct { + DevNodeSuite +} + +func (s *LogsSuite) testEmit(event *eventer.Eventer, opts *bind.TransactOpts, id string, topic [32]byte, expect int) { + tx, err := event.Emit(opts, topic) + s.NoError(err) + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = bind.WaitMined(timeout, s.Eth, tx) + s.NoError(err) + + s.NoError(utils.Eventually(func() error { + var logs []types.Log + err := s.Local.Call(&logs, "eth_getFilterChanges", id) + if err != nil { + return err + } + if len(logs) == 0 { + return errors.New("no logs") + } + if len(logs) > expect { + return errors.New("more logs than expected") + } + return nil + }, 10*time.Second, time.Second)) +} + +func (s *LogsSuite) TestLogsNewFilter() { + var ( + fid, sid, tid string + ftopic, stopic = [32]byte{1}, [32]byte{2} + ) + s.Require().NoError(s.Local.Call(&fid, "eth_newFilter", map[string]interface{}{ + "topics": [][]common.Hash{ + {}, + {common.BytesToHash(ftopic[:])}, + }, + })) + s.Require().NoError(s.Local.Call(&sid, "eth_newFilter", map[string]interface{}{ + "topics": [][]common.Hash{ + {}, + {common.BytesToHash(stopic[:])}, + }, + })) + s.Require().NoError(s.Local.Call(&tid, "eth_newFilter", map[string]interface{}{})) + + opts := bind.NewKeyedTransactor(s.DevAccount) + _, tx, event, err := eventer.DeployEventer(opts, s.Eth) + s.Require().NoError(err) + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + addr, err := bind.WaitDeployed(timeout, s.Eth, tx) + s.Require().NoError(err) + s.Require().NotEmpty(addr) + s.testEmit(event, opts, fid, ftopic, 1) + s.testEmit(event, opts, sid, stopic, 1) + s.testEmit(event, opts, fid, ftopic, 1) +} diff --git a/t/utils/utils.go b/t/utils/utils.go index 6ce596ff6..54dcab2f3 100644 --- a/t/utils/utils.go +++ b/t/utils/utils.go @@ -414,3 +414,22 @@ func copyFile(src, dst string) error { } return nil } + +// Eventually will raise error if condition won't be met during the given timeout. +func Eventually(f func() error, timeout, period time.Duration) (err error) { + timer := time.NewTimer(timeout) + defer timer.Stop() + ticker := time.NewTicker(period) + defer ticker.Stop() + for { + select { + case <-timer.C: + return + case <-ticker.C: + err = f() + if err == nil { + return nil + } + } + } +}