diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index d113aa33f..cd2fd81f1 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -341,12 +341,12 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { active.getHashes(head) continue } - // We're done, allocate the download cache and proceed pulling the blocks + // We're done, prepare the download cache and proceed pulling the blocks offset := 0 if block := d.getBlock(head); block != nil { offset = int(block.NumberU64() + 1) } - d.queue.Alloc(offset) + d.queue.Prepare(offset) finished = true case blockPack := <-d.blockCh: @@ -504,7 +504,7 @@ out: } // Get a possible chunk. If nil is returned no chunk // could be returned due to no hashes available. - request := d.queue.Reserve(peer) + request := d.queue.Reserve(peer, peer.Capacity()) if request == nil { continue } @@ -551,7 +551,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { if peer == nil { return nil } - request := d.queue.Reserve(peer) + request := d.queue.Reserve(peer, MaxBlockFetch) if request == nil { return nil } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index cab213499..4e2d527b9 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -186,7 +186,7 @@ func TestSynchronisation(t *testing.T) { if err := tester.sync("peer", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - if queued := len(tester.downloader.queue.blockCache); queued != targetBlocks { + if queued := len(tester.downloader.queue.blockPool); queued != targetBlocks { t.Fatalf("synchronised block mismatch: have %v, want %v", queued, targetBlocks) } } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 671ffe51b..79ddbb857 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -50,10 +50,11 @@ type queue struct { // newQueue creates a new download queue for scheduling block retrieval. func newQueue() *queue { return &queue{ - hashPool: make(map[common.Hash]int), - hashQueue: prque.New(), - pendPool: make(map[string]*fetchRequest), - blockPool: make(map[common.Hash]int), + hashPool: make(map[common.Hash]int), + hashQueue: prque.New(), + pendPool: make(map[string]*fetchRequest), + blockPool: make(map[common.Hash]int), + blockCache: make([]*Block, blockCacheLimit), } } @@ -70,7 +71,7 @@ func (q *queue) Reset() { q.blockPool = make(map[common.Hash]int) q.blockOffset = 0 - q.blockCache = nil + q.blockCache = make([]*Block, blockCacheLimit) } // Size retrieves the number of hashes in the queue, returning separately for @@ -208,7 +209,7 @@ func (q *queue) TakeBlocks() []*Block { // Reserve reserves a set of hashes for the given peer, skipping any previously // failed download. -func (q *queue) Reserve(p *peer) *fetchRequest { +func (q *queue) Reserve(p *peer, count int) *fetchRequest { q.lock.Lock() defer q.lock.Unlock() @@ -345,20 +346,12 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { return nil } -// Alloc ensures that the block cache is the correct size, given a starting -// offset, and a memory cap. -func (q *queue) Alloc(offset int) { +// Prepare configures the block cache offset to allow accepting inbound blocks. +func (q *queue) Prepare(offset int) { q.lock.Lock() defer q.lock.Unlock() if q.blockOffset < offset { q.blockOffset = offset } - size := len(q.hashPool) - if size > blockCacheLimit { - size = blockCacheLimit - } - if len(q.blockCache) < size { - q.blockCache = append(q.blockCache, make([]*Block, size-len(q.blockCache))...) - } }