diff --git a/services/rpcfilters/latest_block_changed_event.go b/services/rpcfilters/latest_block_changed_event.go index 2aff10ca0..b93c67b08 100644 --- a/services/rpcfilters/latest_block_changed_event.go +++ b/services/rpcfilters/latest_block_changed_event.go @@ -10,16 +10,60 @@ import ( ) const ( - defaultTickerPeriod = 3 * time.Second + defaultTickerPeriod = 3 * time.Second + defaultReportHistorySize = 20 ) +// ringArray represents a thread-safe capped collection of hashes. +type ringArray struct { + mu sync.Mutex + maxCount int + currentIndex int + blocks []common.Hash +} + +func newRingArray(maxCount int) *ringArray { + return &ringArray{ + maxCount: maxCount, + blocks: make([]common.Hash, maxCount), + } +} + +// TryAddUnique adds a hash to the array if the array doesn't have it. +// Returns true if the element was added. +func (r *ringArray) TryAddUnique(hash common.Hash) bool { + r.mu.Lock() + defer r.mu.Unlock() + + if r.has(hash) { + return false + } + + r.blocks[r.currentIndex] = hash + r.currentIndex++ + if r.currentIndex >= len(r.blocks) { + r.currentIndex = 0 + } + return true +} + +// has returns `true` if the hash is in the array. +// It has linear complexity but on short arrays it isn't worth optimizing. +func (r *ringArray) has(hash common.Hash) bool { + for _, h := range r.blocks { + if h == hash { + return true + } + } + return false +} + // latestBlockChangedEvent represents an event that one can subscribe to type latestBlockChangedEvent struct { sxMu sync.Mutex sx map[int]chan common.Hash - latestBlockMu sync.Mutex - previousLatestBlock blockInfo + reportedBlocks *ringArray provider latestBlockProvider quit chan struct{} @@ -64,21 +108,16 @@ func (e *latestBlockChangedEvent) numberOfSubscriptions() int { } func (e *latestBlockChangedEvent) processLatestBlock(latestBlock blockInfo) { - e.latestBlockMu.Lock() - defer e.latestBlockMu.Unlock() - - // if we received the same or an older block than we already have, ignore it. - if latestBlock.Number().Cmp(e.previousLatestBlock.Number()) <= 0 { + // if we received the hash we already received before, don't add it + if !e.reportedBlocks.TryAddUnique(latestBlock.Hash) { return } - e.previousLatestBlock = latestBlock - e.sxMu.Lock() defer e.sxMu.Unlock() for _, channel := range e.sx { - channel <- e.previousLatestBlock.Hash + channel <- latestBlock.Hash } } @@ -114,8 +153,9 @@ func (e *latestBlockChangedEvent) Unsubscribe(id int) { func newLatestBlockChangedEvent(provider latestBlockProvider) *latestBlockChangedEvent { return &latestBlockChangedEvent{ - sx: make(map[int]chan common.Hash), - provider: provider, - tickerPeriod: defaultTickerPeriod, + sx: make(map[int]chan common.Hash), + provider: provider, + reportedBlocks: newRingArray(defaultReportHistorySize), + tickerPeriod: defaultTickerPeriod, } } diff --git a/services/rpcfilters/latest_block_changed_event_test.go b/services/rpcfilters/latest_block_changed_event_test.go index f97b78981..c53d4ae6a 100644 --- a/services/rpcfilters/latest_block_changed_event_test.go +++ b/services/rpcfilters/latest_block_changed_event_test.go @@ -145,7 +145,8 @@ func testEvent(t *testing.T, event *latestBlockChangedEvent, expectedHashes []co func TestEventReceivedBlocksOutOfOrders(t *testing.T) { // We are sending blocks out of order (simulating load balancing on RPC - // nodes). We should still receive them in order and not have the event + // nodes). Note that hashes are the same. + // We should still receive them in order and not have the event // fired for out-of-order events. expectedHashes := []common.Hash{common.HexToHash("0xAA"), common.HexToHash("0xBB"), common.HexToHash("0xCC")} sentHashes := []common.Hash{common.HexToHash("0xAA"), common.HexToHash("0xBB"), common.HexToHash("0xAA"), common.HexToHash("0xCC")} @@ -154,12 +155,31 @@ func TestEventReceivedBlocksOutOfOrders(t *testing.T) { counter := 0 f := func() (blockInfo, error) { counter++ - number := big.NewInt(sentBlockNumbers[counter-1]) if counter > len(sentHashes) { counter = len(sentHashes) } + number := big.NewInt(sentBlockNumbers[counter-1]) return blockInfo{sentHashes[counter-1], hexutil.Bytes(number.Bytes())}, nil } testEventSubscribe(t, f, expectedHashes) } + +func TestEventDivergedChain(t *testing.T) { + // We are sending blocks out of order (simulating chain diverges). + // Note that every hash is unique. We should still receive them all. + hashes := []common.Hash{common.HexToHash("0xC11"), common.HexToHash("0xC12"), common.HexToHash("0xC21"), common.HexToHash("0xC22"), common.HexToHash("0xC23")} + blockNumbers := []int64{1, 2, 1, 2, 3} + + counter := 0 + f := func() (blockInfo, error) { + counter++ + if counter > len(hashes) { + counter = len(hashes) + } + number := big.NewInt(blockNumbers[counter-1]) + return blockInfo{hashes[counter-1], hexutil.Bytes(number.Bytes())}, nil + } + + testEventSubscribe(t, f, hashes) +}