mirror of https://github.com/status-im/op-geth.git

Merge pull request #950 from karalabe/downloader-cancel-sync

eth/downloader: fix deliveries to check for sync cancels

commit 2b06fe3eff
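What the fix is about: before this change, DeliverHashes and DeliverBlocks pushed into d.hashCh / d.blockCh with a plain blocking send, so a peer delivering data after the sync had been torn down could park forever with no reader on the other side. The commit snapshots the session's cancel channel and races the send against it. A minimal standalone sketch of the failure and the fix (names here are illustrative, not from the commit):

package main

import (
	"fmt"
	"time"
)

// deliver mimics the old code path: a bare channel send. If the sync was
// torn down and nobody reads the channel, this goroutine blocks forever.
func deliver(ch chan<- string, item string) {
	ch <- item
}

// deliverChecked mimics the fixed code path: the send races a cancel
// channel that gets closed when the sync is aborted.
func deliverChecked(ch chan<- string, cancel <-chan struct{}, item string) error {
	select {
	case ch <- item:
		return nil
	case <-cancel:
		return fmt.Errorf("no sync active")
	}
}

func main() {
	ch := make(chan string) // the sync loop reading this is already gone
	cancel := make(chan struct{})
	close(cancel) // the session was cancelled

	go deliver(ch, "hashes") // leaks: parked on the send for good

	// The checked variant returns immediately instead of wedging.
	fmt.Println(deliverChecked(ch, cancel, "hashes"))

	time.Sleep(50 * time.Millisecond)
}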
@@ -65,12 +65,15 @@ type Downloader struct {
     // Status
     synchronising int32
     notified      int32
 
     // Channels
     newPeerCh chan *peer
     hashCh    chan hashPack
     blockCh   chan blockPack
-    cancelCh  chan struct{}
+
+    cancelCh   chan struct{} // Channel to cancel mid-flight syncs
+    cancelLock sync.RWMutex  // Lock to protect the cancel channel in delivers
 }
 
 func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
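The two new fields carry the whole mechanism: cancelCh is replaced with a fresh channel on every sync session, and because the deliver methods read it concurrently from peer goroutines, cancelLock guards the swap. An RWMutex fits since many delivers only read the field while a single Synchronise writes it. A sketch of the access pattern, under hypothetical names:

package main

import "sync"

type canceller struct {
	mu sync.RWMutex
	ch chan struct{}
}

// reset installs a fresh cancel channel; the write lock keeps readers
// from observing the swap halfway through.
func (c *canceller) reset() {
	c.mu.Lock()
	c.ch = make(chan struct{})
	c.mu.Unlock()
}

// snapshot returns the current channel; callers select on the copy, so a
// later reset cannot pull the channel out from under them.
func (c *canceller) snapshot() <-chan struct{} {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.ch
}

func main() {
	c := &canceller{}
	c.reset()
	ch := c.snapshot()
	close(c.ch) // the commit closes under the lock; bare here for brevity
	<-ch        // a snapshot holder sees the close immediately
}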
@@ -83,7 +86,6 @@ func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
         hashCh:    make(chan hashPack, 1),
         blockCh:   make(chan blockPack, 1),
-        cancelCh:  make(chan struct{}),
     }
 
     return downloader
 }
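The constructor hunk drops one line, consistent with the channel now being created per sync session in Synchronise rather than once up front. A closed channel cannot be reused: receives on it succeed forever and a second close panics, so a single long-lived cancel channel could not survive two sessions. A short demonstration:

package main

import "fmt"

func main() {
	cancel := make(chan struct{})
	close(cancel)

	<-cancel // a closed channel yields immediately...
	<-cancel // ...and keeps doing so forever

	defer func() {
		// close of an already-closed channel panics
		fmt.Println("recovered:", recover())
	}()
	close(cancel)
}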
@@ -123,8 +125,14 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
     }
     defer atomic.StoreInt32(&d.synchronising, 0)
 
-    // Create cancel channel for aborting midflight
-    d.cancelCh = make(chan struct{})
+    // Post a user notification of the sync (only once per session)
+    if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
+        glog.V(logger.Info).Infoln("Block synchronisation started")
+    }
+    // Create cancel channel for aborting mid-flight
+    d.cancelLock.Lock()
+    d.cancelCh = make(chan struct{})
+    d.cancelLock.Unlock()
 
     // Abort if the queue still contains some leftover data
     if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
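Synchronise now installs the fresh cancel channel under the write lock right after claiming the synchronising flag, so deliveries for a new session never observe the closed channel left over from the previous one. A condensed lifecycle sketch (hypothetical type, same ordering as the commit):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type syncer struct {
	synchronising int32
	cancelLock    sync.RWMutex
	cancelCh      chan struct{}
}

// synchronise mirrors the commit's ordering: claim the session flag,
// then install a fresh cancel channel before any fetching starts.
func (s *syncer) synchronise() error {
	if !atomic.CompareAndSwapInt32(&s.synchronising, 0, 1) {
		return fmt.Errorf("busy")
	}
	defer atomic.StoreInt32(&s.synchronising, 0)

	s.cancelLock.Lock()
	s.cancelCh = make(chan struct{})
	s.cancelLock.Unlock()

	// ... fetch hashes and blocks here ...
	return nil
}

func main() {
	s := &syncer{}
	fmt.Println(s.synchronise()) // <nil>
	fmt.Println(s.synchronise()) // <nil> again: the flag was released
}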
@@ -183,32 +191,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
 // Cancel cancels all of the operations and resets the queue. It returns true
 // if the cancel operation was completed.
 func (d *Downloader) Cancel() bool {
-    hs, bs := d.queue.Size()
     // If we're not syncing just return.
+    hs, bs := d.queue.Size()
     if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 {
         return false
     }
-
+    // Close the current cancel channel
+    d.cancelLock.RLock()
     close(d.cancelCh)
-
-    // clean up
-hashDone:
-    for {
-        select {
-        case <-d.hashCh:
-        default:
-            break hashDone
-        }
-    }
-
-blockDone:
-    for {
-        select {
-        case <-d.blockCh:
-        default:
-            break blockDone
-        }
-    }
+    d.cancelLock.RUnlock()
 
     // reset the queue
     d.queue.Reset()
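Closing the channel is what turns Cancel into a broadcast. The deleted hashDone/blockDone loops existed to unclog peers stuck sending on hashCh and blockCh; now those senders watch the cancel channel themselves, and a single close releases every one of them at once. A small demonstration of the property (illustrative only):

package main

import (
	"fmt"
	"sync"
)

func main() {
	cancel := make(chan struct{})
	var wg sync.WaitGroup

	// Park several "deliverers" on the cancel channel, the way blocked
	// DeliverHashes/DeliverBlocks callers would be.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-cancel // all of these unblock on a single close
			fmt.Printf("deliverer %d aborted\n", id)
		}(i)
	}

	close(cancel) // one close releases every waiter: a broadcast
	wg.Wait()
}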
@@ -421,9 +412,18 @@ func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {
     if atomic.LoadInt32(&d.synchronising) == 0 {
         return errNoSyncActive
     }
-    d.blockCh <- blockPack{id, blocks}
+    // Deliver or abort if the sync is canceled while queuing
+    d.cancelLock.RLock()
+    cancel := d.cancelCh
+    d.cancelLock.RUnlock()
 
-    return nil
+    select {
+    case d.blockCh <- blockPack{id, blocks}:
+        return nil
+
+    case <-cancel:
+        return errNoSyncActive
+    }
 }
 
 // DeliverHashes injects a new batch of hashes received from a remote node into
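With the select in place, a delivery has exactly two outcomes: the pack reaches the sync loop, or the cancel fires and the peer gets errNoSyncActive instead of wedging a goroutine. A standalone sketch of both paths (pack is a stand-in for blockPack, not the real type):

package main

import (
	"errors"
	"fmt"
)

var errNoSyncActive = errors.New("no sync is currently active")

type pack struct {
	peerID string
	items  []string
}

// deliver follows the commit's shape: the send races the cancel channel
// (the lock-guarded snapshot is omitted here for brevity).
func deliver(ch chan<- pack, cancel <-chan struct{}, p pack) error {
	select {
	case ch <- p:
		return nil
	case <-cancel:
		return errNoSyncActive
	}
}

func main() {
	ch := make(chan pack, 1)
	cancel := make(chan struct{})

	// Path 1: sync is live, the buffered send succeeds.
	fmt.Println(deliver(ch, cancel, pack{"peer1", []string{"b1"}})) // <nil>

	// Path 2: buffer full and sync cancelled, the deliverer bails out.
	close(cancel)
	fmt.Println(deliver(ch, cancel, pack{"peer2", []string{"b2"}})) // error
}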
@@ -434,11 +434,16 @@ func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error {
     if atomic.LoadInt32(&d.synchronising) == 0 {
         return errNoSyncActive
     }
     if glog.V(logger.Debug) && len(hashes) != 0 {
         from, to := hashes[0], hashes[len(hashes)-1]
         glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id)
     }
-    d.hashCh <- hashPack{id, hashes}
+    // Deliver or abort if the sync is canceled while queuing
+    d.cancelLock.RLock()
+    cancel := d.cancelCh
+    d.cancelLock.RUnlock()
 
-    return nil
+    select {
+    case d.hashCh <- hashPack{id, hashes}:
+        return nil
+
+    case <-cancel:
+        return errNoSyncActive
+    }
 }
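The hash path mirrors the block path exactly, so one test-shaped check covers both: park a delivery on a channel nobody reads, cancel, and watch it return instead of leaking. A sketch under the same illustrative assumptions as the earlier ones:

package main

import (
	"errors"
	"fmt"
	"time"
)

var errNoSyncActive = errors.New("no sync is currently active")

func deliver(ch chan<- []string, cancel <-chan struct{}, hashes []string) error {
	select {
	case ch <- hashes:
		return nil
	case <-cancel:
		return errNoSyncActive
	}
}

func main() {
	ch := make(chan []string) // unbuffered: the send below must block
	cancel := make(chan struct{})
	done := make(chan error, 1)

	go func() { done <- deliver(ch, cancel, []string{"h1", "h2"}) }()

	select {
	case err := <-done:
		fmt.Println("unexpected early return:", err)
	case <-time.After(50 * time.Millisecond):
		// the delivery is parked, exactly the state Cancel must unwedge
	}

	close(cancel)       // the fix: cancellation reaches the parked deliverer
	fmt.Println(<-done) // no sync is currently active
}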