eth/downloader: also drain stateCh, receiptCh in eth/61 mode

State and receipt deliveries from a previous eth/62+ sync can hang if
the downloader has moved on to syncing with eth/61. Fix this by also
draining the eth/63 channels while waiting for eth/61 data.

A nicer solution would be to take care of the channels in a central
place, but that would involve a major rewrite.
This commit is contained in:
Felix Lange 2015-11-17 22:55:32 +01:00
parent db52a6a0ff
commit b7b62d4b3c
1 changed files with 80 additions and 74 deletions

View File

@ -492,15 +492,6 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
case <-d.cancelCh: case <-d.cancelCh:
return 0, errCancelBlockFetch return 0, errCancelBlockFetch
case <-d.headerCh:
// Out of bounds eth/62 block headers received, ignore them
case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them
case <-d.hashCh:
// Out of bounds hashes received, ignore them
case packet := <-d.blockCh: case packet := <-d.blockCh:
// Discard anything not from the origin peer // Discard anything not from the origin peer
if packet.PeerId() != p.id { if packet.PeerId() != p.id {
@ -518,6 +509,16 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
case <-timeout: case <-timeout:
glog.V(logger.Debug).Infof("%v: head block timeout", p) glog.V(logger.Debug).Infof("%v: head block timeout", p)
return 0, errTimeout return 0, errTimeout
case <-d.hashCh:
// Out of bounds hashes received, ignore them
case <-d.headerCh:
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Ignore eth/{62,63} packets because this is eth/61.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -568,18 +569,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
} }
} }
case <-timeout:
glog.V(logger.Debug).Infof("%v: head hash timeout", p)
return 0, errTimeout
case <-d.blockCh: case <-d.blockCh:
// Out of bounds blocks received, ignore them // Out of bounds blocks received, ignore them
case <-d.headerCh: case <-d.headerCh:
// Out of bounds eth/62 block headers received, ignore them
case <-d.bodyCh: case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them case <-d.stateCh:
case <-d.receiptCh:
case <-timeout: // Ignore eth/{62,63} packets because this is eth/61.
glog.V(logger.Debug).Infof("%v: head hash timeout", p) // These can arrive as a late delivery from a previous sync.
return 0, errTimeout
} }
} }
// If the head fetch already found an ancestor, return // If the head fetch already found an ancestor, return
@ -628,18 +630,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
} }
start = check start = check
case <-timeout:
glog.V(logger.Debug).Infof("%v: search hash timeout", p)
return 0, errTimeout
case <-d.blockCh: case <-d.blockCh:
// Out of bounds blocks received, ignore them // Out of bounds blocks received, ignore them
case <-d.headerCh: case <-d.headerCh:
// Out of bounds eth/62 block headers received, ignore them
case <-d.bodyCh: case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them case <-d.stateCh:
case <-d.receiptCh:
case <-timeout: // Ignore eth/{62,63} packets because this is eth/61.
glog.V(logger.Debug).Infof("%v: search hash timeout", p) // These can arrive as a late delivery from a previous sync.
return 0, errTimeout
} }
} }
} }
@ -673,12 +676,6 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
case <-d.cancelCh: case <-d.cancelCh:
return errCancelHashFetch return errCancelHashFetch
case <-d.headerCh:
// Out of bounds eth/62 block headers received, ignore them
case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them
case packet := <-d.hashCh: case packet := <-d.hashCh:
// Make sure the active peer is giving us the hashes // Make sure the active peer is giving us the hashes
if packet.PeerId() != p.id { if packet.PeerId() != p.id {
@ -747,6 +744,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
glog.V(logger.Debug).Infof("%v: hash request timed out", p) glog.V(logger.Debug).Infof("%v: hash request timed out", p)
hashTimeoutMeter.Mark(1) hashTimeoutMeter.Mark(1)
return errTimeout return errTimeout
case <-d.headerCh:
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Ignore eth/{62,63} packets because this is eth/61.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -771,12 +775,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
case <-d.cancelCh: case <-d.cancelCh:
return errCancelBlockFetch return errCancelBlockFetch
case <-d.headerCh:
// Out of bounds eth/62 block headers received, ignore them
case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them
case packet := <-d.blockCh: case packet := <-d.blockCh:
// If the peer was previously banned and failed to deliver it's pack // If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message. // in a reasonable time frame, ignore it's message.
@ -904,6 +902,13 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
if !throttled && !d.queue.InFlightBlocks() && len(idles) == total { if !throttled && !d.queue.InFlightBlocks() && len(idles) == total {
return errPeersUnavailable return errPeersUnavailable
} }
case <-d.headerCh:
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Ignore eth/{62,63} packets because this is eth/61.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -936,18 +941,19 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
} }
return headers[0].Number.Uint64(), nil return headers[0].Number.Uint64(), nil
case <-d.bodyCh:
// Out of bounds block bodies received, ignore them
case <-d.hashCh:
// Out of bounds eth/61 hashes received, ignore them
case <-d.blockCh:
// Out of bounds eth/61 blocks received, ignore them
case <-timeout: case <-timeout:
glog.V(logger.Debug).Infof("%v: head header timeout", p) glog.V(logger.Debug).Infof("%v: head header timeout", p)
return 0, errTimeout return 0, errTimeout
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
case <-d.hashCh:
case <-d.blockCh:
// Ignore eth/61 packets because this is eth/62+.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -1003,18 +1009,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
} }
} }
case <-d.bodyCh:
// Out of bounds block bodies received, ignore them
case <-d.hashCh:
// Out of bounds eth/61 hashes received, ignore them
case <-d.blockCh:
// Out of bounds eth/61 blocks received, ignore them
case <-timeout: case <-timeout:
glog.V(logger.Debug).Infof("%v: head header timeout", p) glog.V(logger.Debug).Infof("%v: head header timeout", p)
return 0, errTimeout return 0, errTimeout
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
case <-d.hashCh:
case <-d.blockCh:
// Ignore eth/61 packets because this is eth/62+.
// These can arrive as a late delivery from a previous sync.
} }
} }
// If the head fetch already found an ancestor, return // If the head fetch already found an ancestor, return
@ -1063,18 +1070,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
} }
start = check start = check
case <-d.bodyCh:
// Out of bounds block bodies received, ignore them
case <-d.hashCh:
// Out of bounds eth/61 hashes received, ignore them
case <-d.blockCh:
// Out of bounds eth/61 blocks received, ignore them
case <-timeout: case <-timeout:
glog.V(logger.Debug).Infof("%v: search header timeout", p) glog.V(logger.Debug).Infof("%v: search header timeout", p)
return 0, errTimeout return 0, errTimeout
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
case <-d.hashCh:
case <-d.blockCh:
// Ignore eth/61 packets because this is eth/62+.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -1136,12 +1144,6 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
case <-d.cancelCh: case <-d.cancelCh:
return errCancelHeaderFetch return errCancelHeaderFetch
case <-d.hashCh:
// Out of bounds eth/61 hashes received, ignore them
case <-d.blockCh:
// Out of bounds eth/61 blocks received, ignore them
case packet := <-d.headerCh: case packet := <-d.headerCh:
// Make sure the active peer is giving us the headers // Make sure the active peer is giving us the headers
if packet.PeerId() != p.id { if packet.PeerId() != p.id {
@ -1263,6 +1265,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
} }
} }
return nil return nil
case <-d.hashCh:
case <-d.blockCh:
// Ignore eth/61 packets because this is eth/62+.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -1383,12 +1390,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
case <-d.cancelCh: case <-d.cancelCh:
return errCancel return errCancel
case <-d.hashCh:
// Out of bounds eth/61 hashes received, ignore them
case <-d.blockCh:
// Out of bounds eth/61 blocks received, ignore them
case packet := <-deliveryCh: case packet := <-deliveryCh:
// If the peer was previously banned and failed to deliver it's pack // If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message. // in a reasonable time frame, ignore it's message.
@ -1529,6 +1530,11 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
if !progressed && !throttled && !running && len(idles) == total && pending() > 0 { if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
return errPeersUnavailable return errPeersUnavailable
} }
case <-d.hashCh:
case <-d.blockCh:
// Ignore eth/61 packets because this is eth/62+.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }