mirror of
https://github.com/status-im/status-go.git
synced 2025-02-16 08:50:09 +00:00
Implement cache for recording removed logs due to reorg (#1241)
Adjust criteria if replaced logs were received and add more tests Improve validation Changes after review
This commit is contained in:
parent
22eb53ea8c
commit
539fa01d48
@ -48,9 +48,10 @@ func NewPublicAPI(s *Service) *PublicAPI {
|
||||
filters: make(map[rpc.ID]filter),
|
||||
latestBlockChangedEvent: s.latestBlockChangedEvent,
|
||||
transactionSentToUpstreamEvent: s.transactionSentToUpstreamEvent,
|
||||
client: func() ContextCaller { return s.rpc.RPCClient() },
|
||||
filterLivenessLoop: defaultFilterLivenessPeriod,
|
||||
filterLivenessPeriod: defaultFilterLivenessPeriod + 10*time.Second,
|
||||
|
||||
client: func() ContextCaller { return s.rpc.RPCClient() },
|
||||
filterLivenessLoop: defaultFilterLivenessPeriod,
|
||||
filterLivenessPeriod: defaultFilterLivenessPeriod + 10*time.Second,
|
||||
}
|
||||
go api.timeoutLoop(s.quit)
|
||||
return api
|
||||
@ -85,12 +86,13 @@ func (api *PublicAPI) NewFilter(crit filters.FilterCriteria) (rpc.ID, error) {
|
||||
id := rpc.ID(uuid.New())
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
f := &logsFilter{
|
||||
id: id,
|
||||
crit: ethereum.FilterQuery(crit),
|
||||
done: make(chan struct{}),
|
||||
timer: time.NewTimer(api.filterLivenessPeriod),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
id: id,
|
||||
crit: ethereum.FilterQuery(crit),
|
||||
done: make(chan struct{}),
|
||||
timer: time.NewTimer(api.filterLivenessPeriod),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
logsCache: newCache(defaultCacheSize),
|
||||
}
|
||||
api.filtersMu.Lock()
|
||||
api.filters[id] = f
|
||||
|
@ -18,19 +18,16 @@ type ContextCaller interface {
|
||||
}
|
||||
|
||||
func pollLogs(client ContextCaller, f *logsFilter, timeout, period time.Duration) {
|
||||
adjusted := false
|
||||
query := func() {
|
||||
ctx, cancel := context.WithTimeout(f.ctx, timeout)
|
||||
logs, err := getLogs(ctx, client, f.crit)
|
||||
cancel()
|
||||
defer cancel()
|
||||
logs, err := getLogs(ctx, client, f.criteria())
|
||||
if err != nil {
|
||||
log.Error("failed to get logs", "criteria", f.crit, "error", err)
|
||||
} else if !adjusted {
|
||||
adjustFromBlock(&f.crit)
|
||||
adjusted = true
|
||||
log.Error("Error fetch logs", "criteria", f.crit, "error", err)
|
||||
return
|
||||
}
|
||||
if err := f.add(logs); err != nil {
|
||||
log.Error("error adding logs", "logs", logs, "error", err)
|
||||
log.Error("Error adding logs", "logs", logs, "error", err)
|
||||
}
|
||||
}
|
||||
query()
|
||||
@ -41,26 +38,11 @@ func pollLogs(client ContextCaller, f *logsFilter, timeout, period time.Duration
|
||||
case <-latest.C:
|
||||
query()
|
||||
case <-f.done:
|
||||
log.Debug("filter was stopped", "ID", f.id, "crit", f.crit)
|
||||
log.Debug("Filter was stopped", "ID", f.id, "crit", f.crit)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// adjustFromBlock adjusts crit.FromBlock to the latest to avoid querying same logs multiple times.
|
||||
func adjustFromBlock(crit *ethereum.FilterQuery) {
|
||||
latest := big.NewInt(rpc.LatestBlockNumber.Int64())
|
||||
// don't adjust if filter is not interested in newer blocks
|
||||
if crit.ToBlock != nil && crit.ToBlock.Cmp(latest) == 1 {
|
||||
return
|
||||
}
|
||||
// don't adjust if from block is already pending
|
||||
if crit.FromBlock != nil && crit.FromBlock.Cmp(latest) == -1 {
|
||||
return
|
||||
}
|
||||
crit.FromBlock = latest
|
||||
}
|
||||
|
||||
func getLogs(ctx context.Context, client ContextCaller, crit ethereum.FilterQuery) (rst []types.Log, err error) {
|
||||
return rst, client.CallContext(ctx, &rst, "eth_getLogs", toFilterArg(crit))
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
ethereum "github.com/ethereum/go-ethereum"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@ -43,10 +44,8 @@ func (c *callTracker) CallContext(ctx context.Context, result interface{}, metho
|
||||
return nil
|
||||
}
|
||||
|
||||
func runLogsFetcherTest(t *testing.T, f *logsFilter) *callTracker {
|
||||
c := callTracker{reply: [][]types.Log{
|
||||
make([]types.Log, 2),
|
||||
}}
|
||||
func runLogsFetcherTest(t *testing.T, f *logsFilter, replies [][]types.Log, queries int) *callTracker {
|
||||
c := callTracker{reply: replies}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
@ -66,7 +65,7 @@ func runLogsFetcherTest(t *testing.T, f *logsFilter) *callTracker {
|
||||
c.mu.Lock()
|
||||
num := c.calls
|
||||
c.mu.Unlock()
|
||||
if num >= 2 {
|
||||
if num >= queries {
|
||||
f.stop()
|
||||
return
|
||||
}
|
||||
@ -74,7 +73,7 @@ func runLogsFetcherTest(t *testing.T, f *logsFilter) *callTracker {
|
||||
}
|
||||
}()
|
||||
wg.Wait()
|
||||
require.Len(t, c.criterias, 2)
|
||||
require.Len(t, c.criterias, queries)
|
||||
return &c
|
||||
}
|
||||
|
||||
@ -84,13 +83,38 @@ func TestLogsFetcherAdjusted(t *testing.T) {
|
||||
crit: ethereum.FilterQuery{
|
||||
FromBlock: big.NewInt(10),
|
||||
},
|
||||
done: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
logsCache: newCache(defaultCacheSize),
|
||||
}
|
||||
c := runLogsFetcherTest(t, f)
|
||||
logs := []types.Log{
|
||||
{BlockNumber: 11}, {BlockNumber: 12},
|
||||
}
|
||||
c := runLogsFetcherTest(t, f, [][]types.Log{logs}, 2)
|
||||
require.Equal(t, hexutil.EncodeBig(big.NewInt(10)), c.criterias[0]["fromBlock"])
|
||||
require.Equal(t, c.criterias[1]["fromBlock"], "latest")
|
||||
}
|
||||
|
||||
func TestAdjustedDueToReorg(t *testing.T) {
|
||||
f := &logsFilter{
|
||||
ctx: context.TODO(),
|
||||
crit: ethereum.FilterQuery{
|
||||
FromBlock: big.NewInt(10),
|
||||
},
|
||||
done: make(chan struct{}),
|
||||
logsCache: newCache(defaultCacheSize),
|
||||
}
|
||||
logs := []types.Log{
|
||||
{BlockNumber: 11, BlockHash: common.Hash{1}}, {BlockNumber: 12, BlockHash: common.Hash{2}},
|
||||
}
|
||||
reorg := []types.Log{
|
||||
{BlockNumber: 12, BlockHash: common.Hash{2, 2}},
|
||||
}
|
||||
c := runLogsFetcherTest(t, f, [][]types.Log{logs, reorg}, 3)
|
||||
require.Equal(t, hexutil.EncodeBig(big.NewInt(10)), c.criterias[0]["fromBlock"])
|
||||
require.Equal(t, "latest", c.criterias[1]["fromBlock"])
|
||||
require.Equal(t, hexutil.EncodeBig(big.NewInt(11)), c.criterias[2]["fromBlock"])
|
||||
}
|
||||
|
||||
func TestLogsFetcherCanceledContext(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
f := &logsFilter{
|
||||
@ -98,48 +122,11 @@ func TestLogsFetcherCanceledContext(t *testing.T) {
|
||||
crit: ethereum.FilterQuery{
|
||||
FromBlock: big.NewInt(10),
|
||||
},
|
||||
done: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
logsCache: newCache(defaultCacheSize),
|
||||
}
|
||||
cancel()
|
||||
c := runLogsFetcherTest(t, f)
|
||||
c := runLogsFetcherTest(t, f, [][]types.Log{make([]types.Log, 2)}, 2)
|
||||
require.Equal(t, hexutil.EncodeBig(big.NewInt(10)), c.criterias[0]["fromBlock"])
|
||||
require.Equal(t, hexutil.EncodeBig(big.NewInt(10)), c.criterias[1]["fromBlock"])
|
||||
}
|
||||
|
||||
func TestAdjustFromBlock(t *testing.T) {
|
||||
type testCase struct {
|
||||
description string
|
||||
initial ethereum.FilterQuery
|
||||
result ethereum.FilterQuery
|
||||
}
|
||||
|
||||
for _, tc := range []testCase{
|
||||
{
|
||||
"ToBlockHigherThenLatest",
|
||||
ethereum.FilterQuery{ToBlock: big.NewInt(10)},
|
||||
ethereum.FilterQuery{ToBlock: big.NewInt(10)},
|
||||
},
|
||||
{
|
||||
"FromBlockIsPending",
|
||||
ethereum.FilterQuery{FromBlock: big.NewInt(-2)},
|
||||
ethereum.FilterQuery{FromBlock: big.NewInt(-2)},
|
||||
},
|
||||
{
|
||||
"FromBlockIsOlderThenLatest",
|
||||
ethereum.FilterQuery{FromBlock: big.NewInt(10)},
|
||||
ethereum.FilterQuery{FromBlock: big.NewInt(-1)},
|
||||
},
|
||||
{
|
||||
"NotInterestedInLatestBlocks",
|
||||
ethereum.FilterQuery{FromBlock: big.NewInt(10), ToBlock: big.NewInt(15)},
|
||||
ethereum.FilterQuery{FromBlock: big.NewInt(10), ToBlock: big.NewInt(15)},
|
||||
},
|
||||
} {
|
||||
tc := tc
|
||||
t.Run(tc.description, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
adjustFromBlock(&tc.initial)
|
||||
require.Equal(t, tc.result, tc.initial)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
146
services/rpcfilters/logs_cache.go
Normal file
146
services/rpcfilters/logs_cache.go
Normal file
@ -0,0 +1,146 @@
|
||||
package rpcfilters
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultCacheSize = 20
|
||||
)
|
||||
|
||||
type cacheRecord struct {
|
||||
block uint64
|
||||
hash common.Hash
|
||||
logs []types.Log
|
||||
}
|
||||
|
||||
func newCache(size int) *cache {
|
||||
return &cache{
|
||||
records: make([]cacheRecord, 0, size),
|
||||
size: size,
|
||||
}
|
||||
}
|
||||
|
||||
type cache struct {
|
||||
mu sync.RWMutex
|
||||
size int // length of the records
|
||||
records []cacheRecord
|
||||
}
|
||||
|
||||
// add inserts logs into cache and returns added and replaced logs.
|
||||
// replaced logs with will be returned with Removed=true.
|
||||
func (c *cache) add(logs []types.Log) (added, replaced []types.Log, err error) {
|
||||
if len(logs) == 0 {
|
||||
return nil, nil, nil
|
||||
}
|
||||
aggregated := aggregateLogs(logs, c.size) // size doesn't change
|
||||
if len(aggregated) == 0 {
|
||||
return nil, nil, nil
|
||||
}
|
||||
if err := checkLogsAreInOrder(aggregated); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
// find common block. e.g. [3,4] and [1,2,3,4] = 3
|
||||
last := 0
|
||||
if len(c.records) > 0 {
|
||||
last = len(c.records) - 1
|
||||
for aggregated[0].block < c.records[last].block && last > 0 {
|
||||
last--
|
||||
}
|
||||
}
|
||||
c.records, added, replaced = merge(last, c.records, aggregated)
|
||||
if lth := len(c.records); lth > c.size {
|
||||
copy(c.records, c.records[lth-c.size:])
|
||||
}
|
||||
return added, replaced, nil
|
||||
}
|
||||
|
||||
func (c *cache) earliestBlockNum() uint64 {
|
||||
if len(c.records) == 0 {
|
||||
return 0
|
||||
}
|
||||
return c.records[0].block
|
||||
}
|
||||
|
||||
func checkLogsAreInOrder(records []cacheRecord) error {
|
||||
for prev, i := 0, 1; i < len(records); i++ {
|
||||
if records[prev].block == records[i].block-1 {
|
||||
prev = i
|
||||
} else {
|
||||
return fmt.Errorf(
|
||||
"logs must be delivered straight in order. gaps between blocks '%d' and '%d'",
|
||||
records[prev].block, records[i].block,
|
||||
)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// merge merges received records into old slice starting at provided position, example:
|
||||
// [1, 2, 3]
|
||||
// [2, 3, 4]
|
||||
// [1, 2, 3, 4]
|
||||
// if hash doesn't match previously received hash - such block was removed due to reorg
|
||||
// logs that were a part of that block will be returned with Removed set to true
|
||||
func merge(last int, old, received []cacheRecord) ([]cacheRecord, []types.Log, []types.Log) {
|
||||
var (
|
||||
added, replaced []types.Log
|
||||
block uint64
|
||||
hash common.Hash
|
||||
)
|
||||
for i := range received {
|
||||
record := received[i]
|
||||
if last < len(old) {
|
||||
block = old[last].block
|
||||
hash = old[last].hash
|
||||
}
|
||||
if record.block > block {
|
||||
// simply add new records
|
||||
added = append(added, record.logs...)
|
||||
old = append(old, record)
|
||||
} else if record.hash != hash && record.block == block {
|
||||
// record hash is not equal to previous record hash at the same height
|
||||
// replace record in hash and add logs as replaced
|
||||
replaced = append(replaced, old[last].logs...)
|
||||
added = append(added, record.logs...)
|
||||
old[last] = record
|
||||
}
|
||||
last++
|
||||
}
|
||||
return old, added, replaced
|
||||
}
|
||||
|
||||
// aggregateLogs creates at most requested amount of cacheRecords from provided logs.
|
||||
// cacheRecords will be sorted in ascending order, starting from lowest block to highest.
|
||||
func aggregateLogs(logs []types.Log, limit int) []cacheRecord {
|
||||
// sort in reverse order, so that iteration will start from latest blocks
|
||||
sort.Slice(logs, func(i, j int) bool {
|
||||
return logs[i].BlockNumber > logs[j].BlockNumber
|
||||
})
|
||||
rst := make([]cacheRecord, limit)
|
||||
pos, start := len(rst)-1, 0
|
||||
var hash common.Hash
|
||||
for i := range logs {
|
||||
log := logs[i]
|
||||
if (hash != common.Hash{}) && hash != log.BlockHash {
|
||||
rst[pos].logs = logs[start:i]
|
||||
start = i
|
||||
if pos-1 < 0 {
|
||||
break
|
||||
}
|
||||
pos--
|
||||
}
|
||||
rst[pos].logs = logs[start:]
|
||||
rst[pos].block = log.BlockNumber
|
||||
rst[pos].hash = log.BlockHash
|
||||
hash = log.BlockHash
|
||||
}
|
||||
return rst[pos:]
|
||||
}
|
120
services/rpcfilters/logs_cache_test.go
Normal file
120
services/rpcfilters/logs_cache_test.go
Normal file
@ -0,0 +1,120 @@
|
||||
package rpcfilters
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestAggregateLogs(t *testing.T) {
|
||||
logs := []types.Log{}
|
||||
for i := 1; i <= 15; i++ {
|
||||
logs = append(logs,
|
||||
types.Log{BlockNumber: uint64(i), BlockHash: common.Hash{byte(i)}},
|
||||
types.Log{BlockNumber: uint64(i), BlockHash: common.Hash{byte(i)}})
|
||||
}
|
||||
aggregated := aggregateLogs(logs, 10)
|
||||
start := 15 - len(aggregated) + 1
|
||||
for _, record := range aggregated {
|
||||
assert.Equal(t, start, int(record.block)) // numbers are small
|
||||
assert.Len(t, record.logs, 2)
|
||||
start++
|
||||
}
|
||||
}
|
||||
|
||||
func TestAggregateLessThenFull(t *testing.T) {
|
||||
logs := []types.Log{}
|
||||
for i := 1; i <= 3; i++ {
|
||||
logs = append(logs,
|
||||
types.Log{BlockNumber: uint64(i), BlockHash: common.Hash{byte(i)}})
|
||||
}
|
||||
aggregated := aggregateLogs(logs, 10)
|
||||
start := 1
|
||||
for _, record := range aggregated {
|
||||
assert.Equal(t, start, int(record.block)) // numbers are small
|
||||
assert.Len(t, record.logs, 1)
|
||||
start++
|
||||
}
|
||||
}
|
||||
|
||||
func TestMerge(t *testing.T) {
|
||||
step1Logs := []types.Log{
|
||||
{BlockNumber: 1, BlockHash: common.Hash{1}},
|
||||
{BlockNumber: 2, BlockHash: common.Hash{2}},
|
||||
{BlockNumber: 3, BlockHash: common.Hash{3}},
|
||||
}
|
||||
step2Logs := []types.Log{
|
||||
{BlockNumber: 2, BlockHash: common.Hash{2}},
|
||||
{BlockNumber: 3, BlockHash: common.Hash{3}},
|
||||
{BlockNumber: 4, BlockHash: common.Hash{4}},
|
||||
}
|
||||
reorg := []types.Log{
|
||||
{BlockNumber: 2, BlockHash: common.Hash{2, 2}},
|
||||
{BlockNumber: 3, BlockHash: common.Hash{3, 3}},
|
||||
{BlockNumber: 4, BlockHash: common.Hash{4, 4}},
|
||||
{BlockNumber: 5, BlockHash: common.Hash{5, 4}},
|
||||
}
|
||||
|
||||
limit := 7
|
||||
cache := make([]cacheRecord, 0, limit)
|
||||
cache, added, replaced := merge(0, cache, aggregateLogs(step1Logs, limit))
|
||||
require.Len(t, added, 3)
|
||||
require.Empty(t, replaced)
|
||||
require.Equal(t, 3, int(cache[2].block))
|
||||
cache, added, replaced = merge(1, cache, aggregateLogs(step2Logs, limit))
|
||||
require.Len(t, added, 1)
|
||||
require.Empty(t, replaced)
|
||||
require.Equal(t, 4, int(cache[3].block))
|
||||
_, added, replaced = merge(1, cache, aggregateLogs(reorg, limit))
|
||||
require.Len(t, added, 4)
|
||||
require.Len(t, replaced, 3)
|
||||
}
|
||||
|
||||
func TestMergeFull(t *testing.T) {
|
||||
old := []cacheRecord{
|
||||
{block: 1, hash: common.Hash{1}},
|
||||
{block: 2, hash: common.Hash{2}},
|
||||
{block: 3, hash: common.Hash{3}},
|
||||
}
|
||||
new := []cacheRecord{
|
||||
{block: 4, hash: common.Hash{4}},
|
||||
{block: 5, hash: common.Hash{5}},
|
||||
}
|
||||
old, _, _ = merge(0, old, new)
|
||||
require.Len(t, old, 5)
|
||||
require.Equal(t, int(old[2].block), 3)
|
||||
require.Equal(t, int(old[3].block), 4)
|
||||
require.Equal(t, int(old[4].block), 5)
|
||||
}
|
||||
|
||||
func TestAddLogs(t *testing.T) {
|
||||
c := newCache(7)
|
||||
step1Logs := []types.Log{
|
||||
{BlockNumber: 1, BlockHash: common.Hash{1}},
|
||||
{BlockNumber: 2, BlockHash: common.Hash{2}},
|
||||
{BlockNumber: 3, BlockHash: common.Hash{3}},
|
||||
}
|
||||
step2Logs := []types.Log{
|
||||
{BlockNumber: 2, BlockHash: common.Hash{2}},
|
||||
{BlockNumber: 3, BlockHash: common.Hash{3}},
|
||||
{BlockNumber: 4, BlockHash: common.Hash{4}},
|
||||
}
|
||||
added, replaced, err := c.add(step1Logs)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, added, 3)
|
||||
require.Empty(t, replaced)
|
||||
added, replaced, err = c.add(step2Logs)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, added, 1)
|
||||
require.Empty(t, replaced)
|
||||
}
|
||||
|
||||
func TestAddLogsNotInOrder(t *testing.T) {
|
||||
c := newCache(7)
|
||||
logs := []types.Log{{BlockNumber: 1, BlockHash: common.Hash{1}}, {BlockNumber: 3, BlockHash: common.Hash{3}}}
|
||||
_, _, err := c.add(logs)
|
||||
require.EqualError(t, err, "logs must be delivered straight in order. gaps between blocks '1' and '3'")
|
||||
}
|
@ -2,7 +2,8 @@ package rpcfilters
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -13,13 +14,13 @@ import (
|
||||
)
|
||||
|
||||
type logsFilter struct {
|
||||
mu sync.Mutex
|
||||
logs []types.Log
|
||||
lastSeenBlockNumber uint64
|
||||
lastSeenBlockHash common.Hash
|
||||
mu sync.RWMutex
|
||||
logs []types.Log
|
||||
crit ethereum.FilterQuery
|
||||
|
||||
logsCache *cache
|
||||
|
||||
id rpc.ID
|
||||
crit ethereum.FilterQuery
|
||||
timer *time.Timer
|
||||
|
||||
ctx context.Context
|
||||
@ -27,17 +28,43 @@ type logsFilter struct {
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func (f *logsFilter) criteria() ethereum.FilterQuery {
|
||||
f.mu.RLock()
|
||||
defer f.mu.RUnlock()
|
||||
return f.crit
|
||||
}
|
||||
|
||||
func (f *logsFilter) add(data interface{}) error {
|
||||
logs, ok := data.([]types.Log)
|
||||
if !ok {
|
||||
return errors.New("provided value is not a []types.Log")
|
||||
return fmt.Errorf("can't cast %v to types.Log", data)
|
||||
}
|
||||
filtered := filterLogs(logs, f.crit)
|
||||
if len(filtered) > 0 {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
added, replaced, err := f.logsCache.add(filtered)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, log := range replaced {
|
||||
log.Removed = true
|
||||
f.logs = append(f.logs, log)
|
||||
}
|
||||
if len(added) > 0 {
|
||||
f.logs = append(f.logs, added...)
|
||||
}
|
||||
// if there was no replaced logs - keep polling only latest logs
|
||||
if len(replaced) == 0 {
|
||||
adjustFromBlock(&f.crit)
|
||||
} else {
|
||||
// otherwise poll earliest known block in cache
|
||||
earliest := f.logsCache.earliestBlockNum()
|
||||
if earliest != 0 {
|
||||
f.crit.FromBlock = new(big.Int).SetUint64(earliest)
|
||||
}
|
||||
}
|
||||
}
|
||||
filtered, num, hash := filterLogs(logs, f.crit, f.lastSeenBlockNumber, f.lastSeenBlockHash)
|
||||
f.mu.Lock()
|
||||
f.lastSeenBlockNumber = num
|
||||
f.lastSeenBlockHash = hash
|
||||
f.logs = append(f.logs, filtered...)
|
||||
f.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -65,6 +92,20 @@ func (f *logsFilter) deadline() *time.Timer {
|
||||
return f.timer
|
||||
}
|
||||
|
||||
// adjustFromBlock adjusts crit.FromBlock to latest to avoid querying same logs.
|
||||
func adjustFromBlock(crit *ethereum.FilterQuery) {
|
||||
latest := big.NewInt(rpc.LatestBlockNumber.Int64())
|
||||
// don't adjust if filter is not interested in newer blocks
|
||||
if crit.ToBlock != nil && crit.ToBlock.Cmp(latest) == 1 {
|
||||
return
|
||||
}
|
||||
// don't adjust if from block is already pending
|
||||
if crit.FromBlock != nil && crit.FromBlock.Cmp(latest) == -1 {
|
||||
return
|
||||
}
|
||||
crit.FromBlock = latest
|
||||
}
|
||||
|
||||
func includes(addresses []common.Address, a common.Address) bool {
|
||||
for _, addr := range addresses {
|
||||
if addr == a {
|
||||
@ -75,31 +116,17 @@ func includes(addresses []common.Address, a common.Address) bool {
|
||||
}
|
||||
|
||||
// filterLogs creates a slice of logs matching the given criteria.
|
||||
func filterLogs(logs []types.Log, crit ethereum.FilterQuery, blockNum uint64, blockHash common.Hash) (
|
||||
ret []types.Log, num uint64, hash common.Hash) {
|
||||
num = blockNum
|
||||
hash = blockHash
|
||||
func filterLogs(logs []types.Log, crit ethereum.FilterQuery) (
|
||||
ret []types.Log) {
|
||||
for _, log := range logs {
|
||||
// skip logs from seen blocks
|
||||
// find highest block number that we didnt see before
|
||||
if log.BlockNumber >= num {
|
||||
num = log.BlockNumber
|
||||
hash = log.BlockHash
|
||||
}
|
||||
if matchLog(log, crit, blockNum, blockHash) {
|
||||
if matchLog(log, crit) {
|
||||
ret = append(ret, log)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func matchLog(log types.Log, crit ethereum.FilterQuery, blockNum uint64, blockHash common.Hash) bool {
|
||||
// skip logs from seen blocks
|
||||
if log.BlockNumber < blockNum {
|
||||
return false
|
||||
} else if log.BlockNumber == blockNum && log.BlockHash == blockHash {
|
||||
return false
|
||||
}
|
||||
func matchLog(log types.Log, crit ethereum.FilterQuery) bool {
|
||||
if crit.FromBlock != nil && crit.FromBlock.Int64() >= 0 && crit.FromBlock.Uint64() > log.BlockNumber {
|
||||
return false
|
||||
}
|
||||
|
@ -33,93 +33,90 @@ func TestFilterLogs(t *testing.T) {
|
||||
}
|
||||
|
||||
type testCase struct {
|
||||
description string
|
||||
|
||||
blockNum uint64
|
||||
blockHash common.Hash
|
||||
crit ethereum.FilterQuery
|
||||
|
||||
expectedLogs []types.Log
|
||||
expectedBlock uint64
|
||||
expectedHash common.Hash
|
||||
description string
|
||||
crit ethereum.FilterQuery
|
||||
expectedLogs []types.Log
|
||||
}
|
||||
|
||||
for _, tc := range []testCase{
|
||||
{
|
||||
description: "All",
|
||||
crit: ethereum.FilterQuery{},
|
||||
expectedLogs: []types.Log{logs[0], logs[1]},
|
||||
expectedBlock: logs[1].BlockNumber,
|
||||
expectedHash: logs[1].BlockHash,
|
||||
description: "All",
|
||||
crit: ethereum.FilterQuery{},
|
||||
expectedLogs: []types.Log{logs[0], logs[1]},
|
||||
},
|
||||
{
|
||||
description: "LimitedByBlock",
|
||||
crit: ethereum.FilterQuery{ToBlock: big.NewInt(1)},
|
||||
expectedLogs: []types.Log{logs[0]},
|
||||
expectedBlock: logs[1].BlockNumber,
|
||||
expectedHash: logs[1].BlockHash,
|
||||
description: "LimitedByBlock",
|
||||
crit: ethereum.FilterQuery{ToBlock: big.NewInt(1)},
|
||||
expectedLogs: []types.Log{logs[0]},
|
||||
},
|
||||
{
|
||||
description: "LimitedByAddress",
|
||||
crit: ethereum.FilterQuery{Addresses: []common.Address{logs[1].Address}},
|
||||
expectedLogs: []types.Log{logs[1]},
|
||||
expectedBlock: logs[1].BlockNumber,
|
||||
expectedHash: logs[1].BlockHash,
|
||||
description: "LimitedByAddress",
|
||||
crit: ethereum.FilterQuery{Addresses: []common.Address{logs[1].Address}},
|
||||
expectedLogs: []types.Log{logs[1]},
|
||||
},
|
||||
{
|
||||
description: "LimitedByAddress",
|
||||
crit: ethereum.FilterQuery{Addresses: []common.Address{logs[1].Address}},
|
||||
expectedLogs: []types.Log{logs[1]},
|
||||
expectedBlock: logs[1].BlockNumber,
|
||||
expectedHash: logs[1].BlockHash,
|
||||
description: "LimitedByAddress",
|
||||
crit: ethereum.FilterQuery{Addresses: []common.Address{logs[1].Address}},
|
||||
expectedLogs: []types.Log{logs[1]},
|
||||
},
|
||||
{
|
||||
description: "MoreTopicsThanInLogs",
|
||||
crit: ethereum.FilterQuery{Topics: make([][]common.Hash, 3)},
|
||||
expectedBlock: logs[1].BlockNumber,
|
||||
expectedHash: logs[1].BlockHash,
|
||||
description: "MoreTopicsThanInLogs",
|
||||
crit: ethereum.FilterQuery{Topics: make([][]common.Hash, 3)},
|
||||
},
|
||||
{
|
||||
description: "Wildcard",
|
||||
crit: ethereum.FilterQuery{Topics: make([][]common.Hash, 1)},
|
||||
expectedLogs: []types.Log{logs[0], logs[1]},
|
||||
expectedBlock: logs[1].BlockNumber,
|
||||
expectedHash: logs[1].BlockHash,
|
||||
description: "Wildcard",
|
||||
crit: ethereum.FilterQuery{Topics: make([][]common.Hash, 1)},
|
||||
expectedLogs: []types.Log{logs[0], logs[1]},
|
||||
},
|
||||
{
|
||||
description: "LimitedBySecondTopic",
|
||||
crit: ethereum.FilterQuery{Topics: [][]common.Hash{
|
||||
{}, logs[1].Topics}},
|
||||
expectedLogs: []types.Log{logs[1]},
|
||||
expectedBlock: logs[1].BlockNumber,
|
||||
expectedHash: logs[1].BlockHash,
|
||||
},
|
||||
{
|
||||
blockNum: logs[1].BlockNumber,
|
||||
blockHash: logs[1].BlockHash,
|
||||
description: "LimitedBySeenBlock",
|
||||
crit: ethereum.FilterQuery{},
|
||||
expectedBlock: logs[1].BlockNumber,
|
||||
expectedHash: logs[1].BlockHash,
|
||||
},
|
||||
{
|
||||
blockNum: logs[1].BlockNumber,
|
||||
blockHash: common.Hash{7, 7, 7},
|
||||
description: "SeenBlockDifferenthash",
|
||||
crit: ethereum.FilterQuery{},
|
||||
expectedLogs: []types.Log{logs[1]},
|
||||
expectedBlock: logs[1].BlockNumber,
|
||||
expectedHash: logs[1].BlockHash,
|
||||
description: "LimitedBySecondTopic",
|
||||
crit: ethereum.FilterQuery{Topics: [][]common.Hash{{}, logs[1].Topics}},
|
||||
expectedLogs: []types.Log{logs[1]},
|
||||
},
|
||||
} {
|
||||
tc := tc
|
||||
t.Run(tc.description, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
rst, num, hash := filterLogs(logs, tc.crit, tc.blockNum, tc.blockHash)
|
||||
rst := filterLogs(logs, tc.crit)
|
||||
require.Equal(t, tc.expectedLogs, rst)
|
||||
require.Equal(t, tc.expectedBlock, num)
|
||||
require.Equal(t, tc.expectedHash, hash)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestAdjustFromBlock(t *testing.T) {
|
||||
type testCase struct {
|
||||
description string
|
||||
initial ethereum.FilterQuery
|
||||
result ethereum.FilterQuery
|
||||
}
|
||||
|
||||
for _, tc := range []testCase{
|
||||
{
|
||||
"ToBlockHigherThenLatest",
|
||||
ethereum.FilterQuery{ToBlock: big.NewInt(10)},
|
||||
ethereum.FilterQuery{ToBlock: big.NewInt(10)},
|
||||
},
|
||||
{
|
||||
"FromBlockIsPending",
|
||||
ethereum.FilterQuery{FromBlock: big.NewInt(-2)},
|
||||
ethereum.FilterQuery{FromBlock: big.NewInt(-2)},
|
||||
},
|
||||
{
|
||||
"FromBlockIsOlderThenLatest",
|
||||
ethereum.FilterQuery{FromBlock: big.NewInt(10)},
|
||||
ethereum.FilterQuery{FromBlock: big.NewInt(-1)},
|
||||
},
|
||||
{
|
||||
"NotInterestedInLatestBlocks",
|
||||
ethereum.FilterQuery{FromBlock: big.NewInt(10), ToBlock: big.NewInt(15)},
|
||||
ethereum.FilterQuery{FromBlock: big.NewInt(10), ToBlock: big.NewInt(15)},
|
||||
},
|
||||
} {
|
||||
tc := tc
|
||||
t.Run(tc.description, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
adjustFromBlock(&tc.initial)
|
||||
require.Equal(t, tc.result, tc.initial)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -27,7 +27,8 @@ func New(rpc rpcProvider) *Service {
|
||||
return &Service{
|
||||
latestBlockChangedEvent: latestBlockChangedEvent,
|
||||
transactionSentToUpstreamEvent: transactionSentToUpstreamEvent,
|
||||
rpc: rpc,
|
||||
|
||||
rpc: rpc,
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user