From 11c8f83a583745b01a4c06cdd29529af1df364f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 16 Jun 2015 18:14:52 +0300 Subject: [PATCH] eth, eth/fetcher: cache future propagated blocks too --- eth/fetcher/fetcher.go | 21 +++++++++++++++++++++ eth/fetcher/fetcher_test.go | 28 ++++++++++++++++++++++++++++ eth/handler.go | 2 ++ 3 files changed, 51 insertions(+) diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index e8a9cc093..34d368780 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -56,6 +56,7 @@ type inject struct { type Fetcher struct { // Various event channels notify chan *announce + insert chan *inject filter chan chan []*types.Block quit chan struct{} @@ -69,6 +70,7 @@ type Fetcher struct { func New(hasBlock hashCheckFn, importBlock blockImporterFn, chainHeight chainHeightFn) *Fetcher { return &Fetcher{ notify: make(chan *announce), + insert: make(chan *inject), filter: make(chan chan []*types.Block), quit: make(chan struct{}), hasBlock: hasBlock, @@ -106,6 +108,20 @@ func (f *Fetcher) Notify(peer string, hash common.Hash, time time.Time, fetcher } } +// Enqueue tries to fill gaps the the fetcher's future import queue. +func (f *Fetcher) Enqueue(peer string, block *types.Block) error { + op := &inject{ + origin: peer, + block: block, + } + select { + case f.insert <- op: + return nil + case <-f.quit: + return errTerminated + } +} + // Filter extracts all the blocks that were explicitly requested by the fetcher, // returning those that should be handled differently. func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks { @@ -192,6 +208,11 @@ func (f *Fetcher) loop() { } announced[notification.hash] = append(announced[notification.hash], notification) + case op := <-f.insert: + // A direct block insertion was requested, try and fill any pending gaps + queued.Push(op, -float32(op.block.NumberU64())) + glog.V(logger.Detail).Infof("Peer %s: filled block %x, total %v", op.origin, op.block.Hash().Bytes()[:4], queued.Size()) + case hash := <-done: // A pending import finished, remove all traces of the notification delete(announced, hash) diff --git a/eth/fetcher/fetcher_test.go b/eth/fetcher/fetcher_test.go index cde4bb70a..7c975841c 100644 --- a/eth/fetcher/fetcher_test.go +++ b/eth/fetcher/fetcher_test.go @@ -271,3 +271,31 @@ func TestRandomArrivalImport(t *testing.T) { t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) } } + +// Tests that direct block enqueues (due to block propagation vs. hash announce) +// are correctly schedule, filling and import queue gaps. +func TestQueueGapFill(t *testing.T) { + // Create a chain of blocks to import, and choose one to not announce at all + targetBlocks := 24 + hashes := createHashes(targetBlocks, knownHash) + blocks := createBlocksFromHashes(hashes) + skip := targetBlocks / 2 + + tester := newTester() + fetcher := tester.makeFetcher(blocks) + + // Iteratively announce blocks, skipping one entry + for i := len(hashes) - 1; i >= 0; i-- { + if i != skip { + tester.fetcher.Notify("valid", hashes[i], time.Now().Add(-arriveTimeout), fetcher) + time.Sleep(50 * time.Millisecond) + } + } + // Fill the missing block directly as if propagated + tester.fetcher.Enqueue("valid", blocks[hashes[skip]]) + time.Sleep(50 * time.Millisecond) + + if imported := len(tester.ownBlocks); imported != targetBlocks+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) + } +} diff --git a/eth/handler.go b/eth/handler.go index e5908dc88..604983a7b 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -340,6 +340,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error { } request.Block.ReceivedAt = msg.ReceivedAt + // Try to import the propagated block, also making it fill any fetcher gaps + self.fetcher.Enqueue(p.id, request.Block) if err := self.importBlock(p, request.Block, request.TD); err != nil { return err }