diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index cbdaa0ca2..60ff8dbc6 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -143,6 +143,8 @@ quitCh chan struct{} // Quit channel to signal termination quitLock sync.RWMutex // Lock to prevent double closes + downloads sync.WaitGroup // Keeps track of the currently active downloads + // Testing hooks syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch @@ -403,7 +405,9 @@ // specified peer and head hash. func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) { d.mux.Post(StartEvent{}) + d.downloads.Add(1) defer func() { + d.downloads.Done() // reset on error if err != nil { d.mux.Post(FailedEvent{err}) @@ -471,12 +475,17 @@ } else if d.mode == FullSync { fetchers = append(fetchers, d.processFullSyncContent) } - return d.spawnSync(fetchers) + return d.spawnSync(errCancelHeaderFetch, fetchers) } // spawnSync runs d.process and all given fetcher functions to completion in // separate goroutines, returning the first error that appears. -func (d *Downloader) spawnSync(fetchers []func() error) error { +func (d *Downloader) spawnSync(errCancel error, fetchers []func() error) error { + select { + case <-d.cancelCh: + return errCancel + default: + } errc := make(chan error, len(fetchers)) d.cancelWg.Add(len(fetchers)) for _, fn := range fetchers { @@ -539,6 +548,10 @@ // Cancel any pending download requests d.Cancel() + + // Wait, so external dependencies aren't destroyed + // until the download processing is done. + d.downloads.Wait() } // fetchHeight retrieves the head header of the remote peer to aid in estimating diff --git a/eth/handler.go b/eth/handler.go index c2426544f..08818a507 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -230,6 +230,9 @@ func (pm *ProtocolManager) Stop() { // Quit fetcher, txsyncLoop. close(pm.quitSync) + // Stop downloader and make sure that all the running downloads are complete. + pm.downloader.Terminate() + // Disconnect existing sessions. // This also closes the gate for any new registrations on the peer set. // sessions which are already established but not added to pm.peers yet diff --git a/eth/sync.go b/eth/sync.go index 2da1464bc..f14e84303 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -135,7 +135,6 @@ func (pm *ProtocolManager) syncer() { // Start and ensure cleanup of sync mechanisms pm.fetcher.Start() defer pm.fetcher.Stop() - defer pm.downloader.Terminate() // Wait for different events to fire synchronisation operations forceSync := time.NewTicker(forceSyncCycle) diff --git a/les/handler.go b/les/handler.go index 864abe605..67e459594 100644 --- a/les/handler.go +++ b/les/handler.go @@ -241,6 +241,9 @@ func (pm *ProtocolManager) Stop() { close(pm.quitSync) // quits syncer, fetcher + // Stop downloader and make sure that all the running downloads are complete. + pm.downloader.Terminate() + // Disconnect existing sessions. // This also closes the gate for any new registrations on the peer set. // sessions which are already established but not added to pm.peers yet diff --git a/les/sync.go b/les/sync.go index c0e17f97d..983dc7f36 100644 --- a/les/sync.go +++ b/les/sync.go @@ -36,7 +36,6 @@ func (pm *ProtocolManager) syncer() { // Start and ensure cleanup of sync mechanisms //pm.fetcher.Start() //defer pm.fetcher.Stop() - defer pm.downloader.Terminate() // Wait for different events to fire synchronisation operations //forceSync := time.Tick(forceSyncCycle) diff --git a/les/backend.go b/les/backend.go index 6a324cb0..e3844bf8 100644 --- a/les/backend.go +++ b/les/backend.go @@ -20,7 +20,6 @@ package les import ( "fmt" "sync" - "time" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" @@ -250,7 +249,6 @@ func (s *LightEthereum) Stop() error { s.eventMux.Stop() - time.Sleep(time.Millisecond * 200) s.chainDb.Close() close(s.shutdownChan)