mirror of
https://github.com/status-im/op-geth.git
synced 2025-01-27 15:05:18 +00:00
eth, eth/downloader: handle sync errors a bit more gracefully
This commit is contained in:
parent
9d188f73b5
commit
bd5720f480
@ -24,12 +24,12 @@ var (
|
|||||||
blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out
|
blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out
|
||||||
|
|
||||||
errLowTd = errors.New("peer's TD is too low")
|
errLowTd = errors.New("peer's TD is too low")
|
||||||
errBusy = errors.New("busy")
|
ErrBusy = errors.New("busy")
|
||||||
errUnknownPeer = errors.New("peer's unknown or unhealthy")
|
errUnknownPeer = errors.New("peer's unknown or unhealthy")
|
||||||
ErrBadPeer = errors.New("action from bad peer ignored")
|
errBadPeer = errors.New("action from bad peer ignored")
|
||||||
errNoPeers = errors.New("no peers to keep download active")
|
errNoPeers = errors.New("no peers to keep download active")
|
||||||
errPendingQueue = errors.New("pending items in queue")
|
errPendingQueue = errors.New("pending items in queue")
|
||||||
errTimeout = errors.New("timeout")
|
ErrTimeout = errors.New("timeout")
|
||||||
errEmptyHashSet = errors.New("empty hash set by peer")
|
errEmptyHashSet = errors.New("empty hash set by peer")
|
||||||
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
|
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
|
||||||
errAlreadyInPool = errors.New("hash already in pool")
|
errAlreadyInPool = errors.New("hash already in pool")
|
||||||
@ -68,7 +68,7 @@ type Downloader struct {
|
|||||||
getBlock getBlockFn
|
getBlock getBlockFn
|
||||||
|
|
||||||
// Status
|
// Status
|
||||||
synchronizing int32
|
synchronising int32
|
||||||
|
|
||||||
// Channels
|
// Channels
|
||||||
newPeerCh chan *peer
|
newPeerCh chan *peer
|
||||||
@ -119,15 +119,15 @@ func (d *Downloader) UnregisterPeer(id string) {
|
|||||||
delete(d.peers, id)
|
delete(d.peers, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Synchronize will select the peer and use it for synchronizing. If an empty string is given
|
// Synchronise will select the peer and use it for synchronising. If an empty string is given
|
||||||
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
|
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
|
||||||
// checks fail an error will be returned. This method is synchronous
|
// checks fail an error will be returned. This method is synchronous
|
||||||
func (d *Downloader) Synchronize(id string, hash common.Hash) error {
|
func (d *Downloader) Synchronise(id string, hash common.Hash) error {
|
||||||
// Make sure only one goroutine is ever allowed past this point at once
|
// Make sure only one goroutine is ever allowed past this point at once
|
||||||
if !atomic.CompareAndSwapInt32(&d.synchronizing, 0, 1) {
|
if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
|
||||||
return nil
|
return ErrBusy
|
||||||
}
|
}
|
||||||
defer atomic.StoreInt32(&d.synchronizing, 0)
|
defer atomic.StoreInt32(&d.synchronising, 0)
|
||||||
|
|
||||||
// Abort if the queue still contains some leftover data
|
// Abort if the queue still contains some leftover data
|
||||||
if _, cached := d.queue.Size(); cached > 0 {
|
if _, cached := d.queue.Size(); cached > 0 {
|
||||||
@ -272,7 +272,7 @@ out:
|
|||||||
// the zero hash.
|
// the zero hash.
|
||||||
if p == nil || (hash == common.Hash{}) {
|
if p == nil || (hash == common.Hash{}) {
|
||||||
d.queue.Reset()
|
d.queue.Reset()
|
||||||
return errTimeout
|
return ErrTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
// set p to the active peer. this will invalidate any hashes that may be returned
|
// set p to the active peer. this will invalidate any hashes that may be returned
|
||||||
@ -282,7 +282,7 @@ out:
|
|||||||
glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id)
|
glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
glog.V(logger.Detail).Infof("Downloaded hashes (%d) in %v\n", d.queue.Pending(), time.Since(start))
|
glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v\n", d.queue.Pending(), time.Since(start))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -384,7 +384,6 @@ out:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start))
|
glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -404,11 +403,10 @@ func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
|
|||||||
return fmt.Errorf("received hashes from %s while active peer is %s", id, d.activePeer)
|
return fmt.Errorf("received hashes from %s while active peer is %s", id, d.activePeer)
|
||||||
}
|
}
|
||||||
|
|
||||||
if glog.V(logger.Detail) && len(hashes) != 0 {
|
if glog.V(logger.Debug) && len(hashes) != 0 {
|
||||||
from, to := hashes[0], hashes[len(hashes)-1]
|
from, to := hashes[0], hashes[len(hashes)-1]
|
||||||
glog.Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id)
|
glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id)
|
||||||
}
|
}
|
||||||
|
|
||||||
d.hashCh <- hashPack{id, hashes}
|
d.hashCh <- hashPack{id, hashes}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -61,7 +61,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types
|
|||||||
|
|
||||||
func (dl *downloadTester) sync(peerId string, hash common.Hash) error {
|
func (dl *downloadTester) sync(peerId string, hash common.Hash) error {
|
||||||
dl.activePeerId = peerId
|
dl.activePeerId = peerId
|
||||||
return dl.downloader.Synchronize(peerId, hash)
|
return dl.downloader.Synchronise(peerId, hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dl *downloadTester) hasBlock(hash common.Hash) bool {
|
func (dl *downloadTester) hasBlock(hash common.Hash) bool {
|
||||||
@ -217,13 +217,13 @@ func TestThrottling(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Synchronize the two threads and verify
|
// Synchronise the two threads and verify
|
||||||
err := <-errc
|
err := <-errc
|
||||||
done <- struct{}{}
|
done <- struct{}{}
|
||||||
<-done
|
<-done
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to synchronize blocks: %v", err)
|
t.Fatalf("failed to synchronise blocks: %v", err)
|
||||||
}
|
}
|
||||||
if len(took) != targetBlocks {
|
if len(took) != targetBlocks {
|
||||||
t.Fatalf("downloaded block mismatch: have %v, want %v", len(took), targetBlocks)
|
t.Fatalf("downloaded block mismatch: have %v, want %v", len(took), targetBlocks)
|
||||||
|
@ -19,9 +19,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
|
forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
|
||||||
blockProcTimer = 500 * time.Millisecond
|
blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process
|
||||||
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
|
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
|
||||||
blockProcAmount = 256
|
blockProcAmount = 256
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -324,7 +324,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
|
|||||||
}
|
}
|
||||||
self.BroadcastBlock(hash, request.Block)
|
self.BroadcastBlock(hash, request.Block)
|
||||||
} else {
|
} else {
|
||||||
go self.synchronize(p)
|
go self.synchronise(p)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
|
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
|
||||||
|
54
eth/sync.go
54
eth/sync.go
@ -12,10 +12,8 @@ import (
|
|||||||
// Sync contains all synchronisation code for the eth protocol
|
// Sync contains all synchronisation code for the eth protocol
|
||||||
|
|
||||||
func (pm *ProtocolManager) update() {
|
func (pm *ProtocolManager) update() {
|
||||||
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
|
forceSync := time.Tick(forceSyncCycle)
|
||||||
itimer := time.NewTimer(peerCountTimeout)
|
blockProc := time.Tick(blockProcCycle)
|
||||||
// btimer is used for picking of blocks from the downloader
|
|
||||||
btimer := time.Tick(blockProcTimer)
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -24,27 +22,22 @@ func (pm *ProtocolManager) update() {
|
|||||||
if len(pm.peers) < minDesiredPeerCount {
|
if len(pm.peers) < minDesiredPeerCount {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
// Find the best peer and synchronise with it
|
||||||
// Find the best peer
|
|
||||||
peer := getBestPeer(pm.peers)
|
peer := getBestPeer(pm.peers)
|
||||||
if peer == nil {
|
if peer == nil {
|
||||||
glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available")
|
glog.V(logger.Debug).Infoln("Sync attempt canceled. No peers available")
|
||||||
}
|
}
|
||||||
|
go pm.synchronise(peer)
|
||||||
|
|
||||||
itimer.Stop()
|
case <-forceSync:
|
||||||
go pm.synchronize(peer)
|
// Force a sync even if not enough peers are present
|
||||||
case <-itimer.C:
|
|
||||||
// The timer will make sure that the downloader keeps an active state
|
|
||||||
// in which it attempts to always check the network for highest td peers
|
|
||||||
// Either select the peer or restart the timer if no peers could
|
|
||||||
// be selected.
|
|
||||||
if peer := getBestPeer(pm.peers); peer != nil {
|
if peer := getBestPeer(pm.peers); peer != nil {
|
||||||
go pm.synchronize(peer)
|
go pm.synchronise(peer)
|
||||||
} else {
|
|
||||||
itimer.Reset(5 * time.Second)
|
|
||||||
}
|
}
|
||||||
case <-btimer:
|
case <-blockProc:
|
||||||
|
// Try to pull some blocks from the downloaded
|
||||||
go pm.processBlocks()
|
go pm.processBlocks()
|
||||||
|
|
||||||
case <-pm.quitSync:
|
case <-pm.quitSync:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -59,11 +52,11 @@ func (pm *ProtocolManager) processBlocks() error {
|
|||||||
pm.wg.Add(1)
|
pm.wg.Add(1)
|
||||||
defer pm.wg.Done()
|
defer pm.wg.Done()
|
||||||
|
|
||||||
|
// Take a batch of blocks (will return nil if a previous batch has not reached the chain yet)
|
||||||
blocks := pm.downloader.TakeBlocks()
|
blocks := pm.downloader.TakeBlocks()
|
||||||
if len(blocks) == 0 {
|
if len(blocks) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
|
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
|
||||||
|
|
||||||
for len(blocks) != 0 && !pm.quit {
|
for len(blocks) != 0 && !pm.quit {
|
||||||
@ -77,7 +70,7 @@ func (pm *ProtocolManager) processBlocks() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *ProtocolManager) synchronize(peer *peer) {
|
func (pm *ProtocolManager) synchronise(peer *peer) {
|
||||||
// Make sure the peer's TD is higher than our own. If not drop.
|
// Make sure the peer's TD is higher than our own. If not drop.
|
||||||
if peer.td.Cmp(pm.chainman.Td()) <= 0 {
|
if peer.td.Cmp(pm.chainman.Td()) <= 0 {
|
||||||
return
|
return
|
||||||
@ -89,12 +82,21 @@ func (pm *ProtocolManager) synchronize(peer *peer) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Get the hashes from the peer (synchronously)
|
// Get the hashes from the peer (synchronously)
|
||||||
err := pm.downloader.Synchronize(peer.id, peer.recentHash)
|
glog.V(logger.Debug).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash)
|
||||||
if err != nil && err == downloader.ErrBadPeer {
|
|
||||||
glog.V(logger.Debug).Infoln("removed peer from peer set due to bad action")
|
err := pm.downloader.Synchronise(peer.id, peer.recentHash)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
glog.V(logger.Debug).Infof("Synchronisation completed")
|
||||||
|
|
||||||
|
case downloader.ErrBusy:
|
||||||
|
glog.V(logger.Debug).Infof("Synchronisation already in progress")
|
||||||
|
|
||||||
|
case downloader.ErrTimeout:
|
||||||
|
glog.V(logger.Debug).Infof("Removing peer %v due to sync timeout", peer.id)
|
||||||
pm.removePeer(peer)
|
pm.removePeer(peer)
|
||||||
} else if err != nil {
|
|
||||||
// handle error
|
default:
|
||||||
glog.V(logger.Detail).Infoln("error downloading:", err)
|
glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user