diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 153427ee4..ac324176d 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -74,7 +74,6 @@ var ( errBadPeer = errors.New("action from bad peer ignored") errStallingPeer = errors.New("peer is stalling") errNoPeers = errors.New("no peers to keep download active") - errPendingQueue = errors.New("pending items in queue") errTimeout = errors.New("timeout") errEmptyHashSet = errors.New("empty hash set by peer") errEmptyHeaderSet = errors.New("empty header set by peer") @@ -90,6 +89,7 @@ var ( errCancelBodyFetch = errors.New("block body download canceled (requested)") errCancelReceiptFetch = errors.New("receipt download canceled (requested)") errCancelStateFetch = errors.New("state data download canceled (requested)") + errCancelProcessing = errors.New("processing canceled (requested)") errNoSyncActive = errors.New("no sync active") ) @@ -129,7 +129,6 @@ type Downloader struct { // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing synchronising int32 - processing int32 notified int32 // Channels @@ -215,7 +214,7 @@ func (d *Downloader) Progress() (uint64, uint64, uint64) { // Synchronising returns whether the downloader is currently retrieving blocks. func (d *Downloader) Synchronising() bool { - return atomic.LoadInt32(&d.synchronising) > 0 || atomic.LoadInt32(&d.processing) > 0 + return atomic.LoadInt32(&d.synchronising) > 0 } // RegisterPeer injects a new download peer into the set of block source to be @@ -263,9 +262,6 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err) d.dropPeer(id) - case errPendingQueue: - glog.V(logger.Debug).Infoln("Synchronisation aborted:", err) - default: glog.V(logger.Warn).Infof("Synchronisation failed: %v", err) } @@ -290,10 +286,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { glog.V(logger.Info).Infoln("Block synchronisation started") } - // Abort if the queue still contains some leftover data - if d.queue.GetHeadResult() != nil { - return errPendingQueue - } // Reset the queue, peer set and wake channels to clean any internal leftover state d.queue.Reset() d.peers.Reset() @@ -335,7 +327,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e defer func() { // reset on error if err != nil { - d.cancel() d.mux.Post(FailedEvent{err}) } else { d.mux.Post(DoneEvent{}) @@ -365,23 +356,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e d.syncStatsChainHeight = latest d.syncStatsLock.Unlock() - // Initiate the sync using a concurrent hash and block retrieval algorithm + // Initiate the sync using a concurrent hash and block retrieval algorithm + d.queue.Prepare(origin+1, d.mode, 0) if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - d.queue.Prepare(origin+1, d.mode, 0) - - errc := make(chan error, 2) - go func() { errc <- d.fetchHashes61(p, td, origin+1) }() - go func() { errc <- d.fetchBlocks61(origin + 1) }() - - // If any fetcher fails, cancel the other - if err := <-errc; err != nil { - d.cancel() - <-errc - return err - } - return <-errc + return d.spawnSync( + func() error { return d.fetchHashes61(p, td, origin+1) }, + func() error { return d.fetchBlocks61(origin + 1) }, + ) case p.version >= 62: // Look up the sync boundaries: the common ancestor and the target block @@ -405,7 +388,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e switch d.mode { case LightSync: pivot = latest - case FastSync: // Calculate the new fast/slow sync pivot point pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) @@ -426,34 +408,51 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot) } d.queue.Prepare(origin+1, d.mode, pivot) - if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - errc := make(chan error, 4) - go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved - go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync - go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync - go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync - - // If any fetcher fails, cancel the others - var fail error - for i := 0; i < cap(errc); i++ { - if err := <-errc; err != nil { - if fail == nil { - fail = err - d.cancel() - } - } - } - return fail + return d.spawnSync( + func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync + func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync + ) default: // Something very wrong, stop right here glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version) return errBadPeer } - return nil +} + +// spawnSync runs d.process and all given fetcher functions to completion in +// separate goroutines, returning the first error that appears. +func (d *Downloader) spawnSync(fetchers ...func() error) error { + var wg sync.WaitGroup + errc := make(chan error, len(fetchers)+1) + wg.Add(len(fetchers) + 1) + go func() { defer wg.Done(); errc <- d.process() }() + for _, fn := range fetchers { + fn := fn + go func() { defer wg.Done(); errc <- fn() }() + } + // Wait for the first error, then terminate the others. + var err error + for i := 0; i < len(fetchers)+1; i++ { + if i == len(fetchers) { + // Close the queue when all fetchers have exited. + // This will cause the block processor to end when + // it has processed the queue. + d.queue.Close() + } + if err = <-errc; err != nil { + break + } + } + d.queue.Close() + d.cancel() + wg.Wait() + return err } // cancel cancels all of the operations and resets the queue. It returns true @@ -470,12 +469,10 @@ func (d *Downloader) cancel() { } } d.cancelLock.Unlock() - - // Reset the queue - d.queue.Reset() } // Terminate interrupts the downloader, canceling all pending operations. +// The downloader cannot be reused after calling Terminate. func (d *Downloader) Terminate() { atomic.StoreInt32(&d.interrupt, 1) d.cancel() @@ -800,7 +797,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error { peer.Promote() peer.SetBlocksIdle() glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks)) - go d.process() case errInvalidChain: // The hash chain is invalid (blocks are not ordered properly), abort @@ -826,7 +822,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error { peer.Demote() peer.SetBlocksIdle() glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) - go d.process() } } // Blocks arrived, try to update the progress @@ -1336,10 +1331,8 @@ func (d *Downloader) fetchNodeData() error { d.cancel() return } - // Processing succeeded, notify state fetcher and processor of continuation - if d.queue.PendingNodeData() == 0 { - go d.process() - } else { + // Processing succeeded, notify state fetcher of continuation + if d.queue.PendingNodeData() > 0 { select { case d.stateWakeCh <- true: default: @@ -1348,7 +1341,6 @@ func (d *Downloader) fetchNodeData() error { // Log a message to the user and return d.syncStatsLock.Lock() defer d.syncStatsLock.Unlock() - d.syncStatsStateDone += uint64(delivered) glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone) }) @@ -1415,7 +1407,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv peer.Promote() setIdle(peer) glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind)) - go d.process() case errInvalidChain: // The hash chain is invalid (blocks are not ordered properly), abort @@ -1441,7 +1432,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv peer.Demote() setIdle(peer) glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err) - go d.process() } } // Blocks assembled, try to update the progress @@ -1508,7 +1498,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv } if progress { progressed = true - go d.process() } if request == nil { continue @@ -1545,46 +1534,13 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv } // process takes fetch results from the queue and tries to import them into the -// chain. The type of import operation will depend on the result contents: -// - -// -// The algorithmic flow is as follows: -// - The `processing` flag is swapped to 1 to ensure singleton access -// - The current `cancel` channel is retrieved to detect sync abortions -// - Blocks are iteratively taken from the cache and inserted into the chain -// - When the cache becomes empty, insertion stops -// - The `processing` flag is swapped back to 0 -// - A post-exit check is made whether new blocks became available -// - This step is important: it handles a potential race condition between -// checking for no more work, and releasing the processing "mutex". In -// between these state changes, a block may have arrived, but a processing -// attempt denied, so we need to re-enter to ensure the block isn't left -// to idle in the cache. -func (d *Downloader) process() { - // Make sure only one goroutine is ever allowed to process blocks at once - if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) { - return - } - // If the processor just exited, but there are freshly pending items, try to - // reenter. This is needed because the goroutine spinned up for processing - // the fresh results might have been rejected entry to to this present thread - // not yet releasing the `processing` state. - defer func() { - if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil { - d.process() - } - }() - // Release the lock upon exit (note, before checking for reentry!) - // the import statistics to zero. - defer atomic.StoreInt32(&d.processing, 0) - - // Repeat the processing as long as there are results to process +// chain. The type of import operation will depend on the result contents. +func (d *Downloader) process() error { + pivot := d.queue.FastSyncPivot() for { - // Fetch the next batch of results - pivot := d.queue.FastSyncPivot() // Fetch pivot before results to prevent reset race - results := d.queue.TakeResults() + results := d.queue.WaitResults() if len(results) == 0 { - return + return nil // queue empty } if d.chainInsertHook != nil { d.chainInsertHook(results) @@ -1597,7 +1553,7 @@ func (d *Downloader) process() { for len(results) != 0 { // Check for any termination requests if atomic.LoadInt32(&d.interrupt) == 1 { - return + return errCancelProcessing } // Retrieve the a batch of results to import var ( @@ -1633,8 +1589,7 @@ func (d *Downloader) process() { } if err != nil { glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err) - d.cancel() - return + return err } // Shift the results to the next batch results = results[items:] @@ -1685,19 +1640,16 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i dropMeter.Mark(int64(packet.Items())) } }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } // Deliver or abort if the sync is canceled while queuing d.cancelLock.RLock() cancel := d.cancelCh d.cancelLock.RUnlock() - + if cancel == nil { + return errNoSyncActive + } select { case destCh <- packet: return nil - case <-cancel: return errNoSyncActive } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index ef6f74a6b..872a4d4f5 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -169,17 +169,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { } } dl.lock.RUnlock() - - err := dl.downloader.synchronise(id, hash, td, mode) - for { - // If the queue is empty and processing stopped, break - if dl.downloader.queue.Idle() && atomic.LoadInt32(&dl.downloader.processing) == 0 { - break - } - // Otherwise sleep a bit and retry - time.Sleep(time.Millisecond) - } - return err + return dl.downloader.synchronise(id, hash, td, mode) } // hasHeader checks if a header is present in the testers canonical chain. @@ -757,8 +747,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { for start := time.Now(); time.Since(start) < time.Second; { time.Sleep(25 * time.Millisecond) - tester.lock.RLock() - tester.downloader.queue.lock.RLock() + tester.lock.Lock() + tester.downloader.queue.lock.Lock() cached = len(tester.downloader.queue.blockDonePool) if mode == FastSync { if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached { @@ -769,8 +759,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { } frozen = int(atomic.LoadUint32(&blocked)) retrieved = len(tester.ownBlocks) - tester.downloader.queue.lock.RUnlock() - tester.lock.RUnlock() + tester.downloader.queue.lock.Unlock() + tester.lock.Unlock() if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 { break @@ -1209,25 +1199,26 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { result error drop bool }{ - {nil, false}, // Sync succeeded, all is well - {errBusy, false}, // Sync is already in progress, no problem - {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop - {errBadPeer, true}, // Peer was deemed bad for some reason, drop it - {errStallingPeer, true}, // Peer was detected to be stalling, drop it - {errNoPeers, false}, // No peers to download from, soft race, no issue - {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue - {errTimeout, true}, // No hashes received in due time, drop the peer - {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end - {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end - {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser - {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop - {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin - {errInvalidBody, false}, // A bad peer was detected, but not the sync origin - {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin - {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {nil, false}, // Sync succeeded, all is well + {errBusy, false}, // Sync is already in progress, no problem + {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop + {errBadPeer, true}, // Peer was deemed bad for some reason, drop it + {errStallingPeer, true}, // Peer was detected to be stalling, drop it + {errNoPeers, false}, // No peers to download from, soft race, no issue + {errTimeout, true}, // No hashes received in due time, drop the peer + {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end + {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end + {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser + {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop + {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin + {errInvalidBody, false}, // A bad peer was detected, but not the sync origin + {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin + {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop } // Run the tests and check disconnection status tester := newTester() @@ -1541,3 +1532,50 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { t.Fatalf("Final progress mismatch: have %v/%v/%v, want 0-%v/%v/%v", origin, current, latest, targetBlocks, targetBlocks, targetBlocks) } } + +// This test reproduces an issue where unexpected deliveries would +// block indefinitely if they arrived at the right time. +func TestDeliverHeadersHang62(t *testing.T) { testDeliverHeadersHang(t, 62, FullSync) } +func TestDeliverHeadersHang63Full(t *testing.T) { testDeliverHeadersHang(t, 63, FullSync) } +func TestDeliverHeadersHang63Fast(t *testing.T) { testDeliverHeadersHang(t, 63, FastSync) } +func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64, FullSync) } +func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) } +func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) } + +func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + hashes, headers, blocks, receipts := makeChain(5, 0, genesis, nil) + fakeHeads := []*types.Header{{}, {}, {}, {}} + for i := 0; i < 200; i++ { + tester := newTester() + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) + // Whenever the downloader requests headers, flood it with + // a lot of unrequested header deliveries. + tester.downloader.peers.peers["peer"].getAbsHeaders = func(from uint64, count, skip int, reverse bool) error { + deliveriesDone := make(chan struct{}, 500) + for i := 0; i < cap(deliveriesDone); i++ { + peer := fmt.Sprintf("fake-peer%d", i) + go func() { + tester.downloader.DeliverHeaders(peer, fakeHeads) + deliveriesDone <- struct{}{} + }() + } + // Deliver the actual requested headers. + impl := tester.peerGetAbsHeadersFn("peer", 0) + go impl(from, count, skip, reverse) + // None of the extra deliveries should block. + timeout := time.After(5 * time.Second) + for i := 0; i < cap(deliveriesDone); i++ { + select { + case <-deliveriesDone: + case <-timeout: + panic("blocked") + } + } + return nil + } + if err := tester.sync("peer", nil, mode); err != nil { + t.Errorf("sync failed: %v", err) + } + } +} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 1fb5b6e12..584797d7b 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -101,11 +101,14 @@ type queue struct { resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultOffset uint64 // Offset of the first cached fetch result in the block chain - lock sync.RWMutex + lock *sync.Mutex + active *sync.Cond + closed bool } // newQueue creates a new download queue for scheduling block retrieval. func newQueue(stateDb ethdb.Database) *queue { + lock := new(sync.Mutex) return &queue{ hashPool: make(map[common.Hash]int), hashQueue: prque.New(), @@ -122,6 +125,8 @@ func newQueue(stateDb ethdb.Database) *queue { statePendPool: make(map[string]*fetchRequest), stateDatabase: stateDb, resultCache: make([]*fetchResult, blockCacheLimit), + active: sync.NewCond(lock), + lock: lock, } } @@ -133,6 +138,7 @@ func (q *queue) Reset() { q.stateSchedLock.Lock() defer q.stateSchedLock.Unlock() + q.closed = false q.mode = FullSync q.fastSyncPivot = 0 @@ -162,18 +168,27 @@ func (q *queue) Reset() { q.resultOffset = 0 } +// Close marks the end of the sync, unblocking WaitResults. +// It may be called even if the queue is already closed. +func (q *queue) Close() { + q.lock.Lock() + q.closed = true + q.lock.Unlock() + q.active.Broadcast() +} + // PendingBlocks retrieves the number of block (body) requests pending for retrieval. func (q *queue) PendingBlocks() int { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return q.hashQueue.Size() + q.blockTaskQueue.Size() } // PendingReceipts retrieves the number of block receipts pending for retrieval. func (q *queue) PendingReceipts() int { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return q.receiptTaskQueue.Size() } @@ -192,8 +207,8 @@ func (q *queue) PendingNodeData() int { // InFlightBlocks retrieves whether there are block fetch requests currently in // flight. func (q *queue) InFlightBlocks() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return len(q.blockPendPool) > 0 } @@ -201,8 +216,8 @@ func (q *queue) InFlightBlocks() bool { // InFlightReceipts retrieves whether there are receipt fetch requests currently // in flight. func (q *queue) InFlightReceipts() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return len(q.receiptPendPool) > 0 } @@ -210,8 +225,8 @@ func (q *queue) InFlightReceipts() bool { // InFlightNodeData retrieves whether there are node data entry fetch requests // currently in flight. func (q *queue) InFlightNodeData() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0 } @@ -219,8 +234,8 @@ func (q *queue) InFlightNodeData() bool { // Idle returns if the queue is fully idle or has some data still inside. This // method is used by the tester to detect termination events. func (q *queue) Idle() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size() pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) @@ -237,8 +252,8 @@ func (q *queue) Idle() bool { // FastSyncPivot retrieves the currently used fast sync pivot point. func (q *queue) FastSyncPivot() uint64 { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return q.fastSyncPivot } @@ -246,8 +261,8 @@ func (q *queue) FastSyncPivot() uint64 { // ShouldThrottleBlocks checks if the download should be throttled (active block (body) // fetches exceed block cache). func (q *queue) ShouldThrottleBlocks() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() // Calculate the currently in-flight block (body) requests pending := 0 @@ -261,8 +276,8 @@ func (q *queue) ShouldThrottleBlocks() bool { // ShouldThrottleReceipts checks if the download should be throttled (active receipt // fetches exceed block cache). func (q *queue) ShouldThrottleReceipts() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() // Calculate the currently in-flight receipt requests pending := 0 @@ -351,91 +366,74 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { return inserts } -// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't -// been downloaded yet (or simply non existent). -func (q *queue) GetHeadResult() *fetchResult { - q.lock.RLock() - defer q.lock.RUnlock() - - // If there are no results pending, return nil - if len(q.resultCache) == 0 || q.resultCache[0] == nil { - return nil - } - // If the next result is still incomplete, return nil - if q.resultCache[0].Pending > 0 { - return nil - } - // If the next result is the fast sync pivot... - if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot { - // If the pivot state trie is still being pulled, return nil - if len(q.stateTaskPool) > 0 { - return nil - } - if q.PendingNodeData() > 0 { - return nil - } - // If the state is done, but not enough post-pivot headers were verified, stall... - for i := 0; i < fsHeaderForceVerify; i++ { - if i+1 >= len(q.resultCache) || q.resultCache[i+1] == nil { - return nil - } - } - } - return q.resultCache[0] -} - -// TakeResults retrieves and permanently removes a batch of fetch results from -// the cache. -func (q *queue) TakeResults() []*fetchResult { +// WaitResults retrieves and permanently removes a batch of fetch +// results from the cache. the result slice will be empty if the queue +// has been closed. +func (q *queue) WaitResults() []*fetchResult { q.lock.Lock() defer q.lock.Unlock() - // Accumulate all available results - results := []*fetchResult{} - for i, result := range q.resultCache { - // Stop if no more results are ready - if result == nil || result.Pending > 0 { - break + nproc := q.countProcessableItems() + for nproc == 0 && !q.closed { + q.active.Wait() + nproc = q.countProcessableItems() + } + results := make([]*fetchResult, nproc) + copy(results, q.resultCache[:nproc]) + if len(results) > 0 { + // Mark results as done before dropping them from the cache. + for _, result := range results { + hash := result.Header.Hash() + delete(q.blockDonePool, hash) + delete(q.receiptDonePool, hash) } - // The fast sync pivot block may only be processed after state fetch completes - if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot { - if len(q.stateTaskPool) > 0 { - break - } - if q.PendingNodeData() > 0 { - break - } - // Even is state fetch is done, ensure post-pivot headers passed verifications - safe := true - for j := 0; j < fsHeaderForceVerify; j++ { - if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { - safe = false + // Delete the results from the cache and clear the tail. + copy(q.resultCache, q.resultCache[nproc:]) + for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ { + q.resultCache[i] = nil + } + // Advance the expected block number of the first cache entry. + q.resultOffset += uint64(nproc) + } + return results +} + +// countProcessableItems counts the processable items. +func (q *queue) countProcessableItems() int { + for i, result := range q.resultCache { + // Don't process incomplete or unavailable items. + if result == nil || result.Pending > 0 { + return i + } + // Special handling for the fast-sync pivot block: + if q.mode == FastSync { + bnum := result.Header.Number.Uint64() + if bnum == q.fastSyncPivot { + // If the state of the pivot block is not + // available yet, we cannot proceed and return 0. + // + // Stop before processing the pivot block to ensure that + // resultCache has space for fsHeaderForceVerify items. Not + // doing this could leave us unable to download the required + // amount of headers. + if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 { + return i + } + for j := 0; j < fsHeaderForceVerify; j++ { + if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { + return i + } } } - if !safe { - break + // If we're just the fast sync pivot, stop as well + // because the following batch needs different insertion. + // This simplifies handling the switchover in d.process. + if bnum == q.fastSyncPivot+1 && i > 0 { + return i } } - // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion - if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 { - break - } - results = append(results, result) - - hash := result.Header.Hash() - delete(q.blockDonePool, hash) - delete(q.receiptDonePool, hash) } - // Delete the results from the slice and let them be garbage collected - // without this slice trick the results would stay in memory until nil - // would be assigned to them. - copy(q.resultCache, q.resultCache[len(results):]) - for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ { - q.resultCache[k] = nil - } - q.resultOffset += uint64(len(results)) - - return results + return len(q.resultCache) } // ReserveBlocks reserves a set of block hashes for the given peer, skipping any @@ -584,6 +582,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ // If we're the first to request this task, initialise the result container index := int(header.Number.Int64() - int64(q.resultOffset)) if index >= len(q.resultCache) || index < 0 { + common.Report("index allocation went beyond available resultCache space") return nil, false, errInvalidChain } if q.resultCache[index] == nil { @@ -617,6 +616,10 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ for _, header := range skip { taskQueue.Push(header, -float32(header.Number.Uint64())) } + if progress { + // Wake WaitResults, resultCache was modified + q.active.Signal() + } // Assemble and return the block download request if len(send) == 0 { return nil, progress, nil @@ -737,7 +740,7 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string { // expire is the generic check that move expired tasks from a pending pool back // into a task pool, returning all entities caught with expired tasks. // -// Note, this method expects the queue lock to be already held for writing. The +// Note, this method expects the queue lock to be already held. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string { @@ -813,17 +816,16 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { for hash, index := range request.Hashes { q.hashQueue.Push(hash, float32(index)) } + // Wake up WaitResults + q.active.Signal() // If none of the blocks were good, it's a stale delivery switch { case len(errs) == 0: return nil - case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock): return errs[0] - case len(errs) == len(blocks): return errStaleDelivery - default: return fmt.Errorf("multiple failures: %v", errs) } @@ -915,14 +917,14 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ taskQueue.Push(header, -float32(header.Number.Uint64())) } } + // Wake up WaitResults + q.active.Signal() // If none of the data was good, it's a stale delivery switch { case failure == nil || failure == errInvalidChain: return failure - case useful: return fmt.Errorf("partial failure: %v", failure) - default: return errStaleDelivery } @@ -977,10 +979,8 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i switch { case len(errs) == 0: return nil - case len(errs) == len(request.Hashes): return errStaleDelivery - default: return fmt.Errorf("multiple failures: %v", errs) } @@ -989,6 +989,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i // deliverNodeData is the asynchronous node data processor that injects a batch // of sync results into the state scheduler. func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) { + // Wake up WaitResults after the state has been written because it + // might be waiting for the pivot block state to get completed. + defer q.active.Signal() + // Process results one by one to permit task fetches in between for i, result := range results { q.stateSchedLock.Lock()