Fix datarace when stopping a node. (#723)

This commit is contained in:
Igor Mandrigin 2018-03-05 21:36:32 +01:00 committed by GitHub
parent a5cec358a9
commit cfb3e6a080
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 104 additions and 101 deletions

View File

@ -1,5 +1,91 @@
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 7ede530a9..60ff8dbc6 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -141,6 +141,8 @@ type Downloader struct {
quitCh chan struct{} // Quit channel to signal termination
quitLock sync.RWMutex // Lock to prevent double closes
+ downloads sync.WaitGroup // Keeps track of the currently active downloads
+
// Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
@@ -398,7 +400,9 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
d.mux.Post(StartEvent{})
+ d.downloads.Add(1)
defer func() {
+ d.downloads.Done()
// reset on error
if err != nil {
d.mux.Post(FailedEvent{err})
@@ -528,6 +532,10 @@ func (d *Downloader) Terminate() {
// Cancel any pending download requests
d.Cancel()
+
+ // Wait, so external dependencies aren't destroyed
+ // until the download processing is done.
+ d.downloads.Wait()
}
// fetchHeight retrieves the head header of the remote peer to aid in estimating
diff --git a/eth/handler.go b/eth/handler.go
index c2426544f..08818a507 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -230,6 +230,9 @@ func (pm *ProtocolManager) Stop() {
// Quit fetcher, txsyncLoop.
close(pm.quitSync)
+ // Stop downloader and make sure that all the running downloads are complete.
+ pm.downloader.Terminate()
+
// Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set.
// sessions which are already established but not added to pm.peers yet
diff --git a/eth/sync.go b/eth/sync.go
index 2da1464bc..f14e84303 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -135,7 +135,6 @@ func (pm *ProtocolManager) syncer() {
// Start and ensure cleanup of sync mechanisms
pm.fetcher.Start()
defer pm.fetcher.Stop()
- defer pm.downloader.Terminate()
// Wait for different events to fire synchronisation operations
forceSync := time.NewTicker(forceSyncCycle)
diff --git a/les/handler.go b/les/handler.go
index 864abe605..67e459594 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -241,6 +241,9 @@ func (pm *ProtocolManager) Stop() {
close(pm.quitSync) // quits syncer, fetcher
+ // Stop downloader and make sure that all the running downloads are complete.
+ pm.downloader.Terminate()
+
// Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set.
// sessions which are already established but not added to pm.peers yet
diff --git a/les/sync.go b/les/sync.go
index c0e17f97d..983dc7f36 100644
--- a/les/sync.go
+++ b/les/sync.go
@@ -36,7 +36,6 @@ func (pm *ProtocolManager) syncer() {
// Start and ensure cleanup of sync mechanisms
//pm.fetcher.Start()
//defer pm.fetcher.Stop()
- defer pm.downloader.Terminate()
// Wait for different events to fire synchronisation operations
//forceSync := time.Tick(forceSyncCycle)
diff --git a/les/backend.go b/les/backend.go diff --git a/les/backend.go b/les/backend.go
index 333df920..450d8351 100644 index 6a324cb0..e3844bf8 100644
--- a/les/backend.go --- a/les/backend.go
+++ b/les/backend.go +++ b/les/backend.go
@@ -20,7 +20,6 @@ package les @@ -20,7 +20,6 @@ package les
@ -7,79 +93,14 @@ index 333df920..450d8351 100644
"fmt" "fmt"
"sync" "sync"
- "time" - "time"
"github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@@ -248,7 +247,6 @@ func (s *LightEthereum) Stop() error { @@ -250,7 +249,6 @@ func (s *LightEthereum) Stop() error {
s.eventMux.Stop() s.eventMux.Stop()
- time.Sleep(time.Millisecond * 200) - time.Sleep(time.Millisecond * 200)
s.chainDb.Close() s.chainDb.Close()
close(s.shutdownChan) close(s.shutdownChan)
diff --git a/les/handler.go b/les/handler.go
index 613fbb79..a6ae09ef 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -124,6 +124,10 @@ type ProtocolManager struct {
// wait group is used for graceful shutdowns during downloading
// and processing
wg *sync.WaitGroup
+
+ // wait group is used for waiting for currend download
+ // to finish (if running)
+ downloads *sync.WaitGroup
}
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -145,6 +149,7 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protoco
quitSync: quitSync,
wg: wg,
noMorePeers: make(chan struct{}),
+ downloads: &sync.WaitGroup{},
}
if odr != nil {
manager.retriever = odr.retriever
@@ -210,9 +215,32 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protoco
manager.fetcher = newLightFetcher(manager)
}
+ go manager.monitorDownloads()
+
return manager, nil
}
+func (pm *ProtocolManager) monitorDownloads() {
+ sub := pm.eventMux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
+ defer sub.Unsubscribe()
+
+ for {
+ select {
+ case event := <-sub.Chan():
+ if event == nil {
+ return
+ }
+
+ switch event.Data.(type) {
+ case downloader.StartEvent:
+ pm.downloads.Add(1)
+ case downloader.DoneEvent, downloader.FailedEvent:
+ pm.downloads.Done()
+ }
+ }
+ }
+}
+
// removePeer initiates disconnection from a peer by removing it from the peer set
func (pm *ProtocolManager) removePeer(id string) {
pm.peers.Unregister(id)
@@ -240,6 +267,8 @@ func (pm *ProtocolManager) Stop() {
close(pm.quitSync) // quits syncer, fetcher
+ pm.downloads.Wait() // Wait until current downloads are finished.
+
// Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set.
// sessions which are already established but not added to pm.peers yet

View File

@ -141,6 +141,8 @@ type Downloader struct {
quitCh chan struct{} // Quit channel to signal termination quitCh chan struct{} // Quit channel to signal termination
quitLock sync.RWMutex // Lock to prevent double closes quitLock sync.RWMutex // Lock to prevent double closes
downloads sync.WaitGroup // Keeps track of the currently active downloads
// Testing hooks // Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
@ -398,7 +400,9 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// specified peer and head hash. // specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) { func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
d.mux.Post(StartEvent{}) d.mux.Post(StartEvent{})
d.downloads.Add(1)
defer func() { defer func() {
d.downloads.Done()
// reset on error // reset on error
if err != nil { if err != nil {
d.mux.Post(FailedEvent{err}) d.mux.Post(FailedEvent{err})
@ -528,6 +532,10 @@ func (d *Downloader) Terminate() {
// Cancel any pending download requests // Cancel any pending download requests
d.Cancel() d.Cancel()
// Wait, so external dependencies aren't destroyed
// until the download processing is done.
d.downloads.Wait()
} }
// fetchHeight retrieves the head header of the remote peer to aid in estimating // fetchHeight retrieves the head header of the remote peer to aid in estimating

View File

@ -230,6 +230,9 @@ func (pm *ProtocolManager) Stop() {
// Quit fetcher, txsyncLoop. // Quit fetcher, txsyncLoop.
close(pm.quitSync) close(pm.quitSync)
// Stop downloader and make sure that all the running downloads are complete.
pm.downloader.Terminate()
// Disconnect existing sessions. // Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set. // This also closes the gate for any new registrations on the peer set.
// sessions which are already established but not added to pm.peers yet // sessions which are already established but not added to pm.peers yet

View File

@ -135,7 +135,6 @@ func (pm *ProtocolManager) syncer() {
// Start and ensure cleanup of sync mechanisms // Start and ensure cleanup of sync mechanisms
pm.fetcher.Start() pm.fetcher.Start()
defer pm.fetcher.Stop() defer pm.fetcher.Stop()
defer pm.downloader.Terminate()
// Wait for different events to fire synchronisation operations // Wait for different events to fire synchronisation operations
forceSync := time.NewTicker(forceSyncCycle) forceSync := time.NewTicker(forceSyncCycle)

View File

@ -123,10 +123,6 @@ type ProtocolManager struct {
// wait group is used for graceful shutdowns during downloading // wait group is used for graceful shutdowns during downloading
// and processing // and processing
wg *sync.WaitGroup wg *sync.WaitGroup
// wait group is used for waiting for currend download
// to finish (if running)
downloads *sync.WaitGroup
} }
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@ -148,7 +144,6 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protoco
quitSync: quitSync, quitSync: quitSync,
wg: wg, wg: wg,
noMorePeers: make(chan struct{}), noMorePeers: make(chan struct{}),
downloads: &sync.WaitGroup{},
} }
if odr != nil { if odr != nil {
manager.retriever = odr.retriever manager.retriever = odr.retriever
@ -214,32 +209,9 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protoco
manager.fetcher = newLightFetcher(manager) manager.fetcher = newLightFetcher(manager)
} }
go manager.monitorDownloads()
return manager, nil return manager, nil
} }
func (pm *ProtocolManager) monitorDownloads() {
sub := pm.eventMux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
defer sub.Unsubscribe()
for {
select {
case event := <-sub.Chan():
if event == nil {
return
}
switch event.Data.(type) {
case downloader.StartEvent:
pm.downloads.Add(1)
case downloader.DoneEvent, downloader.FailedEvent:
pm.downloads.Done()
}
}
}
}
// removePeer initiates disconnection from a peer by removing it from the peer set // removePeer initiates disconnection from a peer by removing it from the peer set
func (pm *ProtocolManager) removePeer(id string) { func (pm *ProtocolManager) removePeer(id string) {
pm.peers.Unregister(id) pm.peers.Unregister(id)
@ -269,7 +241,8 @@ func (pm *ProtocolManager) Stop() {
close(pm.quitSync) // quits syncer, fetcher close(pm.quitSync) // quits syncer, fetcher
pm.downloads.Wait() // Wait until current downloads are finished. // Stop downloader and make sure that all the running downloads are complete.
pm.downloader.Terminate()
// Disconnect existing sessions. // Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set. // This also closes the gate for any new registrations on the peer set.

View File

@ -36,7 +36,6 @@ func (pm *ProtocolManager) syncer() {
// Start and ensure cleanup of sync mechanisms // Start and ensure cleanup of sync mechanisms
//pm.fetcher.Start() //pm.fetcher.Start()
//defer pm.fetcher.Stop() //defer pm.fetcher.Stop()
defer pm.downloader.Terminate()
// Wait for different events to fire synchronisation operations // Wait for different events to fire synchronisation operations
//forceSync := time.Tick(forceSyncCycle) //forceSync := time.Tick(forceSyncCycle)