Ensure that wg.Add is called before wg.Wait

Now if Add is to be called it will be called before Wait, this is achieved
by doing following:
- if cancel() gets lock first and closes channelCh before spawnSync is called
we will exit right away
- if not than we will ensure that we hold a lock until syncers are spawned
so that cancel() will be blocked for this time and it will prevent whole Terminate() from
progressing
This commit is contained in:
Dmitry Shulyak 2018-05-04 08:00:55 +03:00 committed by Pedro Pombeiro
parent 707221954f
commit 84cb5ca917
2 changed files with 58 additions and 50 deletions

View File

@ -1,8 +1,8 @@
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go diff --git i/eth/downloader/downloader.go w/eth/downloader/downloader.go
index cbdaa0ca2..60ff8dbc6 100644 index 43f0e3db9..b337f95c9 100644
--- a/eth/downloader/downloader.go --- i/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go +++ w/eth/downloader/downloader.go
@@ -143,6 +143,8 @@ @@ -143,6 +143,8 @@ type Downloader struct {
quitCh chan struct{} // Quit channel to signal termination quitCh chan struct{} // Quit channel to signal termination
quitLock sync.RWMutex // Lock to prevent double closes quitLock sync.RWMutex // Lock to prevent double closes
@ -11,7 +11,7 @@ index cbdaa0ca2..60ff8dbc6 100644
// Testing hooks // Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
@@ -403,7 +405,9 @@ @@ -403,7 +405,9 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// specified peer and head hash. // specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) { func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
d.mux.Post(StartEvent{}) d.mux.Post(StartEvent{})
@ -21,7 +21,7 @@ index cbdaa0ca2..60ff8dbc6 100644
// reset on error // reset on error
if err != nil { if err != nil {
d.mux.Post(FailedEvent{err}) d.mux.Post(FailedEvent{err})
@@ -471,12 +475,17 @@ @@ -471,14 +475,22 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
} else if d.mode == FullSync { } else if d.mode == FullSync {
fetchers = append(fetchers, d.processFullSyncContent) fetchers = append(fetchers, d.processFullSyncContent)
} }
@ -33,15 +33,20 @@ index cbdaa0ca2..60ff8dbc6 100644
// separate goroutines, returning the first error that appears. // separate goroutines, returning the first error that appears.
-func (d *Downloader) spawnSync(fetchers []func() error) error { -func (d *Downloader) spawnSync(fetchers []func() error) error {
+func (d *Downloader) spawnSync(errCancel error, fetchers []func() error) error { +func (d *Downloader) spawnSync(errCancel error, fetchers []func() error) error {
+ d.cancelLock.Lock()
+ select { + select {
+ case <-d.cancelCh: + case <-d.cancelCh:
+ d.cancelLock.Unlock()
+ return errCancel + return errCancel
+ default: + default:
+ } + }
errc := make(chan error, len(fetchers)) errc := make(chan error, len(fetchers))
d.cancelWg.Add(len(fetchers)) d.cancelWg.Add(len(fetchers))
+ d.cancelLock.Unlock()
for _, fn := range fetchers { for _, fn := range fetchers {
@@ -539,6 +548,10 @@ fn := fn
go func() { defer d.cancelWg.Done(); errc <- fn() }()
@@ -539,6 +551,10 @@ func (d *Downloader) Terminate() {
// Cancel any pending download requests // Cancel any pending download requests
d.Cancel() d.Cancel()
@ -52,75 +57,75 @@ index cbdaa0ca2..60ff8dbc6 100644
} }
// fetchHeight retrieves the head header of the remote peer to aid in estimating // fetchHeight retrieves the head header of the remote peer to aid in estimating
diff --git a/eth/handler.go b/eth/handler.go diff --git i/eth/handler.go w/eth/handler.go
index c2426544f..08818a507 100644 index 4069359c9..da9ebb243 100644
--- a/eth/handler.go --- i/eth/handler.go
+++ b/eth/handler.go +++ w/eth/handler.go
@@ -230,6 +230,9 @@ func (pm *ProtocolManager) Stop() { @@ -230,6 +230,9 @@ func (pm *ProtocolManager) Stop() {
// Quit fetcher, txsyncLoop. // Quit fetcher, txsyncLoop.
close(pm.quitSync) close(pm.quitSync)
+ // Stop downloader and make sure that all the running downloads are complete. + // Stop downloader and make sure that all the running downloads are complete.
+ pm.downloader.Terminate() + pm.downloader.Terminate()
+ +
// Disconnect existing sessions. // Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set. // This also closes the gate for any new registrations on the peer set.
// sessions which are already established but not added to pm.peers yet // sessions which are already established but not added to pm.peers yet
diff --git a/eth/sync.go b/eth/sync.go diff --git i/eth/sync.go w/eth/sync.go
index 2da1464bc..f14e84303 100644 index e49e40087..4367434a6 100644
--- a/eth/sync.go --- i/eth/sync.go
+++ b/eth/sync.go +++ w/eth/sync.go
@@ -135,7 +135,6 @@ func (pm *ProtocolManager) syncer() { @@ -135,7 +135,6 @@ func (pm *ProtocolManager) syncer() {
// Start and ensure cleanup of sync mechanisms // Start and ensure cleanup of sync mechanisms
pm.fetcher.Start() pm.fetcher.Start()
defer pm.fetcher.Stop() defer pm.fetcher.Stop()
- defer pm.downloader.Terminate() - defer pm.downloader.Terminate()
// Wait for different events to fire synchronisation operations // Wait for different events to fire synchronisation operations
forceSync := time.NewTicker(forceSyncCycle) forceSync := time.NewTicker(forceSyncCycle)
diff --git a/les/handler.go b/les/handler.go diff --git i/les/backend.go w/les/backend.go
index 864abe605..67e459594 100644 index 6a324cb04..e3844bf84 100644
--- a/les/handler.go --- i/les/backend.go
+++ b/les/handler.go +++ w/les/backend.go
@@ -20,7 +20,6 @@ package les
import (
"fmt"
"sync"
- "time"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
@@ -250,7 +249,6 @@ func (s *LightEthereum) Stop() error {
s.eventMux.Stop()
- time.Sleep(time.Millisecond * 200)
s.chainDb.Close()
close(s.shutdownChan)
diff --git i/les/handler.go w/les/handler.go
index 9627f392b..f2bbe899f 100644
--- i/les/handler.go
+++ w/les/handler.go
@@ -241,6 +241,9 @@ func (pm *ProtocolManager) Stop() { @@ -241,6 +241,9 @@ func (pm *ProtocolManager) Stop() {
close(pm.quitSync) // quits syncer, fetcher close(pm.quitSync) // quits syncer, fetcher
+ // Stop downloader and make sure that all the running downloads are complete. + // Stop downloader and make sure that all the running downloads are complete.
+ pm.downloader.Terminate() + pm.downloader.Terminate()
+ +
// Disconnect existing sessions. // Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set. // This also closes the gate for any new registrations on the peer set.
// sessions which are already established but not added to pm.peers yet // sessions which are already established but not added to pm.peers yet
diff --git a/les/sync.go b/les/sync.go diff --git i/les/sync.go w/les/sync.go
index c0e17f97d..983dc7f36 100644 index c3d37e2f3..fc1f076c7 100644
--- a/les/sync.go --- i/les/sync.go
+++ b/les/sync.go +++ w/les/sync.go
@@ -36,7 +36,6 @@ func (pm *ProtocolManager) syncer() { @@ -31,7 +31,6 @@ func (pm *ProtocolManager) syncer() {
// Start and ensure cleanup of sync mechanisms // Start and ensure cleanup of sync mechanisms
//pm.fetcher.Start() //pm.fetcher.Start()
//defer pm.fetcher.Stop() //defer pm.fetcher.Stop()
- defer pm.downloader.Terminate() - defer pm.downloader.Terminate()
// Wait for different events to fire synchronisation operations // Wait for different events to fire synchronisation operations
//forceSync := time.Tick(forceSyncCycle) //forceSync := time.Tick(forceSyncCycle)
diff --git a/les/backend.go b/les/backend.go
index 6a324cb0..e3844bf8 100644
--- a/les/backend.go
+++ b/les/backend.go
@@ -20,7 +20,6 @@ package les
import (
"fmt"
"sync"
- "time"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
@@ -250,7 +249,6 @@ func (s *LightEthereum) Stop() error {
s.eventMux.Stop()
- time.Sleep(time.Millisecond * 200)
s.chainDb.Close()
close(s.shutdownChan)

View File

@ -481,13 +481,16 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
// spawnSync runs d.process and all given fetcher functions to completion in // spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears. // separate goroutines, returning the first error that appears.
func (d *Downloader) spawnSync(errCancel error, fetchers []func() error) error { func (d *Downloader) spawnSync(errCancel error, fetchers []func() error) error {
d.cancelLock.Lock()
select { select {
case <-d.cancelCh: case <-d.cancelCh:
d.cancelLock.Unlock()
return errCancel return errCancel
default: default:
} }
errc := make(chan error, len(fetchers)) errc := make(chan error, len(fetchers))
d.cancelWg.Add(len(fetchers)) d.cancelWg.Add(len(fetchers))
d.cancelLock.Unlock()
for _, fn := range fetchers { for _, fn := range fetchers {
fn := fn fn := fn
go func() { defer d.cancelWg.Done(); errc <- fn() }() go func() { defer d.cancelWg.Done(); errc <- fn() }()