Wait for `Rollback` before closing the DB in the light client. (#696)

* Wait for `Rollback` before closing the DB in the light client.

* Unsubscribe from mux events properly, get rid of `time.Sleep`.
This commit is contained in:
Igor Mandrigin 2018-02-23 11:49:57 +01:00 committed by GitHub
parent 741422af73
commit abb5df88d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 115 additions and 2 deletions

View File

@ -0,0 +1,85 @@
diff --git a/les/backend.go b/les/backend.go
index 333df920..450d8351 100644
--- a/les/backend.go
+++ b/les/backend.go
@@ -20,7 +20,6 @@ package les
import (
"fmt"
"sync"
- "time"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
@@ -248,7 +247,6 @@ func (s *LightEthereum) Stop() error {
s.eventMux.Stop()
- time.Sleep(time.Millisecond * 200)
s.chainDb.Close()
close(s.shutdownChan)
diff --git a/les/handler.go b/les/handler.go
index 613fbb79..a6ae09ef 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -124,6 +124,10 @@ type ProtocolManager struct {
// wait group is used for graceful shutdowns during downloading
// and processing
wg *sync.WaitGroup
+
+ // wait group is used for waiting for currend download
+ // to finish (if running)
+ downloads *sync.WaitGroup
}
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -145,6 +149,7 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protoco
quitSync: quitSync,
wg: wg,
noMorePeers: make(chan struct{}),
+ downloads: &sync.WaitGroup{},
}
if odr != nil {
manager.retriever = odr.retriever
@@ -210,9 +215,32 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protoco
manager.fetcher = newLightFetcher(manager)
}
+ go manager.monitorDownloads()
+
return manager, nil
}
+func (pm *ProtocolManager) monitorDownloads() {
+ sub := pm.eventMux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
+ defer sub.Unsubscribe()
+
+ for {
+ select {
+ case event := <-sub.Chan():
+ if event == nil {
+ return
+ }
+
+ switch event.Data.(type) {
+ case downloader.StartEvent:
+ pm.downloads.Add(1)
+ case downloader.DoneEvent, downloader.FailedEvent:
+ pm.downloads.Done()
+ }
+ }
+ }
+}
+
// removePeer initiates disconnection from a peer by removing it from the peer set
func (pm *ProtocolManager) removePeer(id string) {
pm.peers.Unregister(id)
@@ -240,6 +267,8 @@ func (pm *ProtocolManager) Stop() {
close(pm.quitSync) // quits syncer, fetcher
+ pm.downloads.Wait() // Wait until current downloads are finished.
+
// Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set.
// sessions which are already established but not added to pm.peers yet

View File

@ -20,7 +20,6 @@ package les
import (
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
@ -248,7 +247,6 @@ func (s *LightEthereum) Stop() error {
s.eventMux.Stop()
time.Sleep(time.Millisecond * 200)
s.chainDb.Close()
close(s.shutdownChan)

View File

@ -124,6 +124,10 @@ type ProtocolManager struct {
// wait group is used for graceful shutdowns during downloading
// and processing
wg *sync.WaitGroup
// wait group is used for waiting for currend download
// to finish (if running)
downloads *sync.WaitGroup
}
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@ -145,6 +149,7 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protoco
quitSync: quitSync,
wg: wg,
noMorePeers: make(chan struct{}),
downloads: &sync.WaitGroup{},
}
if odr != nil {
manager.retriever = odr.retriever
@ -210,9 +215,32 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protoco
manager.fetcher = newLightFetcher(manager)
}
go manager.monitorDownloads()
return manager, nil
}
func (pm *ProtocolManager) monitorDownloads() {
sub := pm.eventMux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
defer sub.Unsubscribe()
for {
select {
case event := <-sub.Chan():
if event == nil {
return
}
switch event.Data.(type) {
case downloader.StartEvent:
pm.downloads.Add(1)
case downloader.DoneEvent, downloader.FailedEvent:
pm.downloads.Done()
}
}
}
}
// removePeer initiates disconnection from a peer by removing it from the peer set
func (pm *ProtocolManager) removePeer(id string) {
pm.peers.Unregister(id)
@ -240,6 +268,8 @@ func (pm *ProtocolManager) Stop() {
close(pm.quitSync) // quits syncer, fetcher
pm.downloads.Wait() // Wait until current downloads are finished.
// Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set.
// sessions which are already established but not added to pm.peers yet