Support chain diverges in block filters.

This commit is contained in:
Igor Mandrigin 2018-05-09 13:46:12 +02:00 committed by Igor Mandrigin
parent 6fec9fa653
commit 0241f95d6a
2 changed files with 76 additions and 16 deletions

View File

@ -11,15 +11,59 @@ import (
const (
defaultTickerPeriod = 3 * time.Second
defaultReportHistorySize = 20
)
// ringArray represents a thread-safe capped collection of hashes.
type ringArray struct {
mu sync.Mutex
maxCount int
currentIndex int
blocks []common.Hash
}
func newRingArray(maxCount int) *ringArray {
return &ringArray{
maxCount: maxCount,
blocks: make([]common.Hash, maxCount),
}
}
// TryAddUnique adds a hash to the array if the array doesn't have it.
// Returns true if the element was added.
func (r *ringArray) TryAddUnique(hash common.Hash) bool {
r.mu.Lock()
defer r.mu.Unlock()
if r.has(hash) {
return false
}
r.blocks[r.currentIndex] = hash
r.currentIndex++
if r.currentIndex >= len(r.blocks) {
r.currentIndex = 0
}
return true
}
// has returns `true` if the hash is in the array.
// It has linear complexity but on short arrays it isn't worth optimizing.
func (r *ringArray) has(hash common.Hash) bool {
for _, h := range r.blocks {
if h == hash {
return true
}
}
return false
}
// latestBlockChangedEvent represents an event that one can subscribe to
type latestBlockChangedEvent struct {
sxMu sync.Mutex
sx map[int]chan common.Hash
latestBlockMu sync.Mutex
previousLatestBlock blockInfo
reportedBlocks *ringArray
provider latestBlockProvider
quit chan struct{}
@ -64,21 +108,16 @@ func (e *latestBlockChangedEvent) numberOfSubscriptions() int {
}
func (e *latestBlockChangedEvent) processLatestBlock(latestBlock blockInfo) {
e.latestBlockMu.Lock()
defer e.latestBlockMu.Unlock()
// if we received the same or an older block than we already have, ignore it.
if latestBlock.Number().Cmp(e.previousLatestBlock.Number()) <= 0 {
// if we received the hash we already received before, don't add it
if !e.reportedBlocks.TryAddUnique(latestBlock.Hash) {
return
}
e.previousLatestBlock = latestBlock
e.sxMu.Lock()
defer e.sxMu.Unlock()
for _, channel := range e.sx {
channel <- e.previousLatestBlock.Hash
channel <- latestBlock.Hash
}
}
@ -116,6 +155,7 @@ func newLatestBlockChangedEvent(provider latestBlockProvider) *latestBlockChange
return &latestBlockChangedEvent{
sx: make(map[int]chan common.Hash),
provider: provider,
reportedBlocks: newRingArray(defaultReportHistorySize),
tickerPeriod: defaultTickerPeriod,
}
}

View File

@ -145,7 +145,8 @@ func testEvent(t *testing.T, event *latestBlockChangedEvent, expectedHashes []co
func TestEventReceivedBlocksOutOfOrders(t *testing.T) {
// We are sending blocks out of order (simulating load balancing on RPC
// nodes). We should still receive them in order and not have the event
// nodes). Note that hashes are the same.
// We should still receive them in order and not have the event
// fired for out-of-order events.
expectedHashes := []common.Hash{common.HexToHash("0xAA"), common.HexToHash("0xBB"), common.HexToHash("0xCC")}
sentHashes := []common.Hash{common.HexToHash("0xAA"), common.HexToHash("0xBB"), common.HexToHash("0xAA"), common.HexToHash("0xCC")}
@ -154,12 +155,31 @@ func TestEventReceivedBlocksOutOfOrders(t *testing.T) {
counter := 0
f := func() (blockInfo, error) {
counter++
number := big.NewInt(sentBlockNumbers[counter-1])
if counter > len(sentHashes) {
counter = len(sentHashes)
}
number := big.NewInt(sentBlockNumbers[counter-1])
return blockInfo{sentHashes[counter-1], hexutil.Bytes(number.Bytes())}, nil
}
testEventSubscribe(t, f, expectedHashes)
}
func TestEventDivergedChain(t *testing.T) {
// We are sending blocks out of order (simulating chain diverges).
// Note that every hash is unique. We should still receive them all.
hashes := []common.Hash{common.HexToHash("0xC11"), common.HexToHash("0xC12"), common.HexToHash("0xC21"), common.HexToHash("0xC22"), common.HexToHash("0xC23")}
blockNumbers := []int64{1, 2, 1, 2, 3}
counter := 0
f := func() (blockInfo, error) {
counter++
if counter > len(hashes) {
counter = len(hashes)
}
number := big.NewInt(blockNumbers[counter-1])
return blockInfo{hashes[counter-1], hexutil.Bytes(number.Bytes())}, nil
}
testEventSubscribe(t, f, hashes)
}