2016-06-20 14:47:10 +00:00
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package downloader contains the manual full chain synchronisation.
package downloader
import (
"crypto/rand"
"errors"
"fmt"
"math"
"math/big"
"sync"
"sync/atomic"
"time"
2016-09-29 19:51:33 +00:00
ethereum "github.com/ethereum/go-ethereum"
2016-06-20 14:47:10 +00:00
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
2017-05-01 11:09:48 +00:00
"github.com/ethereum/go-ethereum/log"
2016-07-03 19:44:31 +00:00
"github.com/ethereum/go-ethereum/params"
2016-06-20 14:47:10 +00:00
"github.com/rcrowley/go-metrics"
)
var (
MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request
MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
2016-07-03 19:44:31 +00:00
MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
2016-06-20 14:47:10 +00:00
MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
MaxStateFetch = 384 // Amount of node state values to allow fetching per request
2017-05-01 11:09:48 +00:00
MaxForkAncestry = 3 * params . EpochDuration // Maximum chain reorganisation
rttMinEstimate = 2 * time . Second // Minimum round-trip time to target for download requests
rttMaxEstimate = 20 * time . Second // Maximum rount-trip time to target for download requests
rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value
ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
ttlLimit = time . Minute // Maximum TTL allowance to prevent reaching crazy timeouts
2016-07-03 19:44:31 +00:00
qosTuningPeers = 5 // Number of peers to tune based on (best peers)
qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
2016-06-20 14:47:10 +00:00
2016-07-03 19:44:31 +00:00
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
maxResultsProcess = 2048 // Number of content download results to import at once into the chain
2016-06-20 14:47:10 +00:00
2016-11-25 05:50:30 +00:00
fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync
fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
fsPivotInterval = 256 // Number of headers out of which to randomize the pivot point
fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync
fsCriticalTrials = uint32 ( 32 ) // Number of times to retry in the cricical section before bailing
2016-06-20 14:47:10 +00:00
)
// Sentinel errors returned by the downloader. The cancellation variants carry
// the reason a specific sub-fetcher was aborted; the remainder classify peer
// misbehaviour so Synchronise can decide whether to drop the peer.
var (
	errBusy                    = errors.New("busy")
	errUnknownPeer             = errors.New("peer is unknown or unhealthy")
	errBadPeer                 = errors.New("action from bad peer ignored")
	errStallingPeer            = errors.New("peer is stalling")
	errNoPeers                 = errors.New("no peers to keep download active")
	errTimeout                 = errors.New("timeout")
	errEmptyHeaderSet          = errors.New("empty header set by peer")
	errPeersUnavailable        = errors.New("no peers available or all tried for download")
	errInvalidAncestor         = errors.New("retrieved ancestor is invalid")
	errInvalidChain            = errors.New("retrieved hash chain is invalid")
	errInvalidBlock            = errors.New("retrieved block is invalid")
	errInvalidBody             = errors.New("retrieved block body is invalid")
	errInvalidReceipt          = errors.New("retrieved receipt is invalid")
	errCancelBlockFetch        = errors.New("block download canceled (requested)")
	errCancelHeaderFetch       = errors.New("block header download canceled (requested)")
	errCancelBodyFetch         = errors.New("block body download canceled (requested)")
	errCancelReceiptFetch      = errors.New("receipt download canceled (requested)")
	errCancelStateFetch        = errors.New("state data download canceled (requested)")
	errCancelHeaderProcessing  = errors.New("header processing canceled (requested)")
	errCancelContentProcessing = errors.New("content processing canceled (requested)")
	errNoSyncActive            = errors.New("no sync active")
	errTooOld                  = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
)
type Downloader struct {
2016-07-03 19:44:31 +00:00
mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
mux * event . TypeMux // Event multiplexer to announce sync operation events
2016-06-20 14:47:10 +00:00
2017-08-04 16:14:17 +00:00
queue * queue // Scheduler for selecting the hashes to download
peers * peerSet // Set of active peers from which download can proceed
stateDB ethdb . Database
2016-06-20 14:47:10 +00:00
2016-07-03 19:44:31 +00:00
fsPivotLock * types . Header // Pivot header on critical section entry (cannot change between retries)
2016-11-25 05:50:30 +00:00
fsPivotFails uint32 // Number of subsequent fast sync failures in the critical section
2016-07-03 19:44:31 +00:00
rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
2016-06-20 14:47:10 +00:00
// Statistics
2017-08-04 16:14:17 +00:00
syncStatsChainOrigin uint64 // Origin block number where syncing started at
syncStatsChainHeight uint64 // Highest block number known when syncing started
syncStatsState stateSyncStats
2016-06-20 14:47:10 +00:00
syncStatsLock sync . RWMutex // Lock protecting the sync stats fields
2017-08-04 16:14:17 +00:00
lightchain LightChain
blockchain BlockChain
2016-06-20 14:47:10 +00:00
// Callbacks
2017-08-04 16:14:17 +00:00
dropPeer peerDropFn // Drops a peer for misbehaving
2016-06-20 14:47:10 +00:00
// Status
synchroniseMock func ( id string , hash common . Hash ) error // Replacement for synchronise during testing
synchronising int32
notified int32
// Channels
2016-07-03 19:44:31 +00:00
headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
headerProcCh chan [ ] * types . Header // [eth/62] Channel to feed the header processor new tasks
2016-06-20 14:47:10 +00:00
2017-08-04 16:14:17 +00:00
// for stateFetcher
stateSyncStart chan * stateSync
trackStateReq chan * stateReq
stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
2016-09-08 09:45:12 +00:00
// Cancellation and termination
cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
2016-06-20 14:47:10 +00:00
cancelCh chan struct { } // Channel to cancel mid-flight syncs
2016-09-08 09:45:12 +00:00
cancelLock sync . RWMutex // Lock to protect the cancel channel and peer in delivers
2016-06-20 14:47:10 +00:00
2016-07-03 19:44:31 +00:00
quitCh chan struct { } // Quit channel to signal termination
quitLock sync . RWMutex // Lock to prevent double closes
2016-06-20 14:47:10 +00:00
// Testing hooks
syncInitHook func ( uint64 , uint64 ) // Method to call upon initiating a new sync run
bodyFetchHook func ( [ ] * types . Header ) // Method to call upon starting a block body fetch
receiptFetchHook func ( [ ] * types . Header ) // Method to call upon starting a receipt fetch
chainInsertHook func ( [ ] * fetchResult ) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}
2017-08-04 16:14:17 +00:00
// LightChain encapsulates functions required to synchronise a light chain.
type LightChain interface {
// HasHeader verifies a header's presence in the local chain.
2017-10-10 09:38:49 +00:00
HasHeader ( h common . Hash , number uint64 ) bool
2017-08-04 16:14:17 +00:00
// GetHeaderByHash retrieves a header from the local chain.
GetHeaderByHash ( common . Hash ) * types . Header
// CurrentHeader retrieves the head header from the local chain.
CurrentHeader ( ) * types . Header
// GetTdByHash returns the total difficulty of a local block.
GetTdByHash ( common . Hash ) * big . Int
// InsertHeaderChain inserts a batch of headers into the local chain.
InsertHeaderChain ( [ ] * types . Header , int ) ( int , error )
// Rollback removes a few recently added elements from the local chain.
Rollback ( [ ] common . Hash )
}
// BlockChain encapsulates functions required to sync a (full or fast) blockchain.
type BlockChain interface {
LightChain
// HasBlockAndState verifies block and associated states' presence in the local chain.
HasBlockAndState ( common . Hash ) bool
// GetBlockByHash retrieves a block from the local chain.
GetBlockByHash ( common . Hash ) * types . Block
// CurrentBlock retrieves the head block from the local chain.
CurrentBlock ( ) * types . Block
// CurrentFastBlock retrieves the head fast block from the local chain.
CurrentFastBlock ( ) * types . Block
// FastSyncCommitHead directly commits the head block to a certain entity.
FastSyncCommitHead ( common . Hash ) error
// InsertChain inserts a batch of blocks into the local chain.
InsertChain ( types . Blocks ) ( int , error )
// InsertReceiptChain inserts a batch of receipts into the local chain.
InsertReceiptChain ( types . Blocks , [ ] types . Receipts ) ( int , error )
}
2016-06-20 14:47:10 +00:00
// New creates a new downloader to fetch hashes and blocks from remote peers.
2017-08-04 16:14:17 +00:00
func New ( mode SyncMode , stateDb ethdb . Database , mux * event . TypeMux , chain BlockChain , lightchain LightChain , dropPeer peerDropFn ) * Downloader {
if lightchain == nil {
lightchain = chain
}
2016-06-20 14:47:10 +00:00
2016-07-03 19:44:31 +00:00
dl := & Downloader {
2017-08-04 16:14:17 +00:00
mode : mode ,
stateDB : stateDb ,
mux : mux ,
queue : newQueue ( ) ,
peers : newPeerSet ( ) ,
rttEstimate : uint64 ( rttMaxEstimate ) ,
rttConfidence : uint64 ( 1000000 ) ,
blockchain : chain ,
lightchain : lightchain ,
dropPeer : dropPeer ,
headerCh : make ( chan dataPack , 1 ) ,
bodyCh : make ( chan dataPack , 1 ) ,
receiptCh : make ( chan dataPack , 1 ) ,
bodyWakeCh : make ( chan bool , 1 ) ,
receiptWakeCh : make ( chan bool , 1 ) ,
headerProcCh : make ( chan [ ] * types . Header , 1 ) ,
quitCh : make ( chan struct { } ) ,
stateCh : make ( chan dataPack ) ,
stateSyncStart : make ( chan * stateSync ) ,
trackStateReq : make ( chan * stateReq ) ,
2016-06-20 14:47:10 +00:00
}
2016-07-03 19:44:31 +00:00
go dl . qosTuner ( )
2017-08-04 16:14:17 +00:00
go dl . stateFetcher ( )
2016-07-03 19:44:31 +00:00
return dl
2016-06-20 14:47:10 +00:00
}
// Progress retrieves the synchronisation boundaries, specifically the origin
// block where synchronisation started at (may have failed/suspended); the block
// or header sync is currently at; and the latest known block which the sync targets.
//
// In addition, during the state download phase of fast synchronisation the number
// of processed and the total number of known states are also returned. Otherwise
// these are zero.
2016-09-29 19:51:33 +00:00
func ( d * Downloader ) Progress ( ) ethereum . SyncProgress {
2016-06-20 14:47:10 +00:00
// Lock the current stats and return the progress
d . syncStatsLock . RLock ( )
defer d . syncStatsLock . RUnlock ( )
current := uint64 ( 0 )
switch d . mode {
case FullSync :
2017-08-04 16:14:17 +00:00
current = d . blockchain . CurrentBlock ( ) . NumberU64 ( )
2016-06-20 14:47:10 +00:00
case FastSync :
2017-08-04 16:14:17 +00:00
current = d . blockchain . CurrentFastBlock ( ) . NumberU64 ( )
2016-06-20 14:47:10 +00:00
case LightSync :
2017-08-04 16:14:17 +00:00
current = d . lightchain . CurrentHeader ( ) . Number . Uint64 ( )
2016-06-20 14:47:10 +00:00
}
2016-09-29 19:51:33 +00:00
return ethereum . SyncProgress {
StartingBlock : d . syncStatsChainOrigin ,
CurrentBlock : current ,
HighestBlock : d . syncStatsChainHeight ,
2017-08-04 16:14:17 +00:00
PulledStates : d . syncStatsState . processed ,
KnownStates : d . syncStatsState . processed + d . syncStatsState . pending ,
2016-09-29 19:51:33 +00:00
}
2016-06-20 14:47:10 +00:00
}
// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
	return atomic.LoadInt32(&d.synchronising) > 0
}
// RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from.
2017-08-04 16:14:17 +00:00
func ( d * Downloader ) RegisterPeer ( id string , version int , peer Peer ) error {
2016-06-20 14:47:10 +00:00
2017-05-01 11:09:48 +00:00
logger := log . New ( "peer" , id )
logger . Trace ( "Registering sync peer" )
2017-08-04 16:14:17 +00:00
if err := d . peers . Register ( newPeerConnection ( id , version , peer , logger ) ) ; err != nil {
2017-05-01 11:09:48 +00:00
logger . Error ( "Failed to register sync peer" , "err" , err )
2016-06-20 14:47:10 +00:00
return err
}
2016-07-03 19:44:31 +00:00
d . qosReduceConfidence ( )
2016-06-20 14:47:10 +00:00
return nil
}
2017-08-04 16:14:17 +00:00
// RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer.
func ( d * Downloader ) RegisterLightPeer ( id string , version int , peer LightPeer ) error {
return d . RegisterPeer ( id , version , & lightPeerWrapper { peer } )
}
2016-06-20 14:47:10 +00:00
// UnregisterPeer remove a peer from the known list, preventing any action from
// the specified peer. An effort is also made to return any pending fetches into
// the queue.
func ( d * Downloader ) UnregisterPeer ( id string ) error {
2016-09-08 09:45:12 +00:00
// Unregister the peer from the active peer set and revoke any fetch tasks
2017-05-01 11:09:48 +00:00
logger := log . New ( "peer" , id )
logger . Trace ( "Unregistering sync peer" )
2016-06-20 14:47:10 +00:00
if err := d . peers . Unregister ( id ) ; err != nil {
2017-05-01 11:09:48 +00:00
logger . Error ( "Failed to unregister sync peer" , "err" , err )
2016-06-20 14:47:10 +00:00
return err
}
d . queue . Revoke ( id )
2016-09-08 09:45:12 +00:00
// If this peer was the master peer, abort sync immediately
d . cancelLock . RLock ( )
master := id == d . cancelPeer
d . cancelLock . RUnlock ( )
if master {
2017-05-01 11:09:48 +00:00
d . Cancel ( )
2016-09-08 09:45:12 +00:00
}
2016-06-20 14:47:10 +00:00
return nil
}
// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
func ( d * Downloader ) Synchronise ( id string , head common . Hash , td * big . Int , mode SyncMode ) error {
err := d . synchronise ( id , head , td , mode )
switch err {
case nil :
case errBusy :
2016-09-08 09:45:12 +00:00
case errTimeout , errBadPeer , errStallingPeer ,
errEmptyHeaderSet , errPeersUnavailable , errTooOld ,
errInvalidAncestor , errInvalidChain :
2017-05-01 11:09:48 +00:00
log . Warn ( "Synchronisation failed, dropping peer" , "peer" , id , "err" , err )
2016-06-20 14:47:10 +00:00
d . dropPeer ( id )
default :
2017-05-01 11:09:48 +00:00
log . Warn ( "Synchronisation failed, retrying" , "err" , err )
2016-06-20 14:47:10 +00:00
}
return err
}
// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous
func ( d * Downloader ) synchronise ( id string , hash common . Hash , td * big . Int , mode SyncMode ) error {
// Mock out the synchronisation if testing
if d . synchroniseMock != nil {
return d . synchroniseMock ( id , hash )
}
// Make sure only one goroutine is ever allowed past this point at once
if ! atomic . CompareAndSwapInt32 ( & d . synchronising , 0 , 1 ) {
return errBusy
}
defer atomic . StoreInt32 ( & d . synchronising , 0 )
// Post a user notification of the sync (only once per session)
if atomic . CompareAndSwapInt32 ( & d . notified , 0 , 1 ) {
2017-05-01 11:09:48 +00:00
log . Info ( "Block synchronisation started" )
2016-06-20 14:47:10 +00:00
}
// Reset the queue, peer set and wake channels to clean any internal leftover state
d . queue . Reset ( )
d . peers . Reset ( )
2017-08-04 16:14:17 +00:00
for _ , ch := range [ ] chan bool { d . bodyWakeCh , d . receiptWakeCh } {
2016-06-20 14:47:10 +00:00
select {
case <- ch :
default :
}
}
2017-08-04 16:14:17 +00:00
for _ , ch := range [ ] chan dataPack { d . headerCh , d . bodyCh , d . receiptCh } {
2016-07-03 19:44:31 +00:00
for empty := false ; ! empty ; {
select {
case <- ch :
default :
empty = true
}
}
}
for empty := false ; ! empty ; {
select {
case <- d . headerProcCh :
default :
empty = true
}
}
2016-09-08 09:45:12 +00:00
// Create cancel channel for aborting mid-flight and mark the master peer
2016-06-20 14:47:10 +00:00
d . cancelLock . Lock ( )
d . cancelCh = make ( chan struct { } )
2016-09-08 09:45:12 +00:00
d . cancelPeer = id
2016-06-20 14:47:10 +00:00
d . cancelLock . Unlock ( )
2017-05-01 11:09:48 +00:00
defer d . Cancel ( ) // No matter what, we can't leave the cancel channel open
2016-07-03 19:44:31 +00:00
2016-06-20 14:47:10 +00:00
// Set the requested sync mode, unless it's forbidden
d . mode = mode
2016-11-25 05:50:30 +00:00
if d . mode == FastSync && atomic . LoadUint32 ( & d . fsPivotFails ) >= fsCriticalTrials {
2016-06-20 14:47:10 +00:00
d . mode = FullSync
}
// Retrieve the origin peer and initiate the downloading process
p := d . peers . Peer ( id )
if p == nil {
return errUnknownPeer
}
return d . syncWithPeer ( p , hash , td )
}
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
2017-08-04 16:14:17 +00:00
func ( d * Downloader ) syncWithPeer ( p * peerConnection , hash common . Hash , td * big . Int ) ( err error ) {
2016-06-20 14:47:10 +00:00
d . mux . Post ( StartEvent { } )
defer func ( ) {
// reset on error
if err != nil {
d . mux . Post ( FailedEvent { err } )
} else {
d . mux . Post ( DoneEvent { } )
}
} ( )
2016-09-08 09:45:12 +00:00
if p . version < 62 {
return errTooOld
}
2016-06-20 14:47:10 +00:00
2017-05-01 11:09:48 +00:00
log . Debug ( "Synchronising with the network" , "peer" , p . id , "eth" , p . version , "head" , hash , "td" , td , "mode" , d . mode )
2016-06-20 14:47:10 +00:00
defer func ( start time . Time ) {
2017-05-01 11:09:48 +00:00
log . Debug ( "Synchronisation terminated" , "elapsed" , time . Since ( start ) )
2016-06-20 14:47:10 +00:00
} ( time . Now ( ) )
2016-09-08 09:45:12 +00:00
// Look up the sync boundaries: the common ancestor and the target block
latest , err := d . fetchHeight ( p )
if err != nil {
return err
}
height := latest . Number . Uint64 ( )
2016-06-20 14:47:10 +00:00
2016-09-08 09:45:12 +00:00
origin , err := d . findAncestor ( p , height )
if err != nil {
return err
}
d . syncStatsLock . Lock ( )
if d . syncStatsChainHeight <= origin || d . syncStatsChainOrigin > origin {
d . syncStatsChainOrigin = origin
}
d . syncStatsChainHeight = height
d . syncStatsLock . Unlock ( )
2016-07-03 19:44:31 +00:00
2016-09-08 09:45:12 +00:00
// Initiate the sync using a concurrent header and content retrieval algorithm
pivot := uint64 ( 0 )
switch d . mode {
case LightSync :
pivot = height
case FastSync :
// Calculate the new fast/slow sync pivot point
if d . fsPivotLock == nil {
pivotOffset , err := rand . Int ( rand . Reader , big . NewInt ( int64 ( fsPivotInterval ) ) )
if err != nil {
panic ( fmt . Sprintf ( "Failed to access crypto random source: %v" , err ) )
2016-06-20 14:47:10 +00:00
}
2016-09-08 09:45:12 +00:00
if height > uint64 ( fsMinFullBlocks ) + pivotOffset . Uint64 ( ) {
pivot = height - uint64 ( fsMinFullBlocks ) - pivotOffset . Uint64 ( )
2016-06-20 14:47:10 +00:00
}
2016-09-08 09:45:12 +00:00
} else {
// Pivot point locked in, use this and do not pick a new one!
pivot = d . fsPivotLock . Number . Uint64 ( )
2016-06-20 14:47:10 +00:00
}
2016-09-08 09:45:12 +00:00
// If the point is below the origin, move origin back to ensure state download
if pivot < origin {
if pivot > 0 {
origin = pivot - 1
} else {
origin = 0
}
2016-06-20 14:47:10 +00:00
}
2017-05-01 11:09:48 +00:00
log . Debug ( "Fast syncing until pivot block" , "pivot" , pivot )
2016-06-20 14:47:10 +00:00
}
2016-09-08 09:45:12 +00:00
d . queue . Prepare ( origin + 1 , d . mode , pivot , latest )
if d . syncInitHook != nil {
d . syncInitHook ( origin , height )
}
2017-08-04 16:14:17 +00:00
fetchers := [ ] func ( ) error {
func ( ) error { return d . fetchHeaders ( p , origin + 1 ) } , // Headers are always retrieved
func ( ) error { return d . fetchBodies ( origin + 1 ) } , // Bodies are retrieved during normal and fast sync
func ( ) error { return d . fetchReceipts ( origin + 1 ) } , // Receipts are retrieved during fast sync
func ( ) error { return d . processHeaders ( origin + 1 , td ) } ,
}
if d . mode == FastSync {
fetchers = append ( fetchers , func ( ) error { return d . processFastSyncContent ( latest ) } )
} else if d . mode == FullSync {
fetchers = append ( fetchers , d . processFullSyncContent )
}
err = d . spawnSync ( fetchers )
if err != nil && d . mode == FastSync && d . fsPivotLock != nil {
// If sync failed in the critical section, bump the fail counter.
atomic . AddUint32 ( & d . fsPivotFails , 1 )
}
return err
2016-06-20 14:47:10 +00:00
}
// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
2017-08-04 16:14:17 +00:00
func ( d * Downloader ) spawnSync ( fetchers [ ] func ( ) error ) error {
2016-06-20 14:47:10 +00:00
var wg sync . WaitGroup
2017-08-04 16:14:17 +00:00
errc := make ( chan error , len ( fetchers ) )
wg . Add ( len ( fetchers ) )
2016-06-20 14:47:10 +00:00
for _ , fn := range fetchers {
fn := fn
go func ( ) { defer wg . Done ( ) ; errc <- fn ( ) } ( )
}
// Wait for the first error, then terminate the others.
var err error
2017-08-04 16:14:17 +00:00
for i := 0 ; i < len ( fetchers ) ; i ++ {
if i == len ( fetchers ) - 1 {
2016-06-20 14:47:10 +00:00
// Close the queue when all fetchers have exited.
// This will cause the block processor to end when
// it has processed the queue.
d . queue . Close ( )
}
if err = <- errc ; err != nil {
break
}
}
d . queue . Close ( )
2017-05-01 11:09:48 +00:00
d . Cancel ( )
2016-06-20 14:47:10 +00:00
wg . Wait ( )
return err
}
2017-05-01 11:09:48 +00:00
// Cancel cancels all of the operations and resets the queue. It returns true
2016-06-20 14:47:10 +00:00
// if the cancel operation was completed.
2017-05-01 11:09:48 +00:00
func ( d * Downloader ) Cancel ( ) {
2016-06-20 14:47:10 +00:00
// Close the current cancel channel
d . cancelLock . Lock ( )
if d . cancelCh != nil {
select {
case <- d . cancelCh :
// Channel was already closed
default :
close ( d . cancelCh )
}
}
d . cancelLock . Unlock ( )
}
// Terminate interrupts the downloader, canceling all pending operations.
// The downloader cannot be reused after calling Terminate.
func ( d * Downloader ) Terminate ( ) {
2016-07-03 19:44:31 +00:00
// Close the termination channel (make sure double close is allowed)
d . quitLock . Lock ( )
select {
case <- d . quitCh :
default :
close ( d . quitCh )
}
d . quitLock . Unlock ( )
// Cancel any pending download requests
2017-05-01 11:09:48 +00:00
d . Cancel ( )
2016-06-20 14:47:10 +00:00
}
// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
2017-08-04 16:14:17 +00:00
func ( d * Downloader ) fetchHeight ( p * peerConnection ) ( * types . Header , error ) {
2017-05-01 11:09:48 +00:00
p . log . Debug ( "Retrieving remote chain height" )
2016-06-20 14:47:10 +00:00
// Request the advertised remote head block and wait for the response
2017-08-04 16:14:17 +00:00
head , _ := p . peer . Head ( )
go p . peer . RequestHeadersByHash ( head , 1 , 0 , false )
2016-06-20 14:47:10 +00:00
2017-05-01 11:09:48 +00:00
ttl := d . requestTTL ( )
timeout := time . After ( ttl )
2016-06-20 14:47:10 +00:00
for {
select {
case <- d . cancelCh :
2016-07-03 19:44:31 +00:00
return nil , errCancelBlockFetch
2016-06-20 14:47:10 +00:00
case packet := <- d . headerCh :
// Discard anything not from the origin peer
if packet . PeerId ( ) != p . id {
2017-05-01 11:09:48 +00:00
log . Debug ( "Received headers from incorrect peer" , "peer" , packet . PeerId ( ) )
2016-06-20 14:47:10 +00:00
break
}
// Make sure the peer actually gave something valid
headers := packet . ( * headerPack ) . headers
if len ( headers ) != 1 {
2017-05-01 11:09:48 +00:00
p . log . Debug ( "Multiple headers for single request" , "headers" , len ( headers ) )
2016-07-03 19:44:31 +00:00
return nil , errBadPeer
2016-06-20 14:47:10 +00:00
}
2017-05-01 11:09:48 +00:00
head := headers [ 0 ]
p . log . Debug ( "Remote head header identified" , "number" , head . Number , "hash" , head . Hash ( ) )
return head , nil
2016-06-20 14:47:10 +00:00
case <- timeout :
2017-05-01 11:09:48 +00:00
p . log . Debug ( "Waiting for head header timed out" , "elapsed" , ttl )
2016-07-03 19:44:31 +00:00
return nil , errTimeout
2016-06-20 14:47:10 +00:00
case <- d . bodyCh :
case <- d . receiptCh :
// Out of bounds delivery, ignore
}
}
}
// findAncestor tries to locate the common ancestor link of the local chain and
// a remote peers blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N links should already get us a match.
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
// the head links match), we do a binary search to find the common ancestor.
2017-08-04 16:14:17 +00:00
func ( d * Downloader ) findAncestor ( p * peerConnection , height uint64 ) ( uint64 , error ) {
2016-07-03 19:44:31 +00:00
// Figure out the valid ancestor range to prevent rewrite attacks
2017-08-04 16:14:17 +00:00
floor , ceil := int64 ( - 1 ) , d . lightchain . CurrentHeader ( ) . Number . Uint64 ( )
2017-05-01 11:09:48 +00:00
p . log . Debug ( "Looking for common ancestor" , "local" , ceil , "remote" , height )
2016-06-20 14:47:10 +00:00
if d . mode == FullSync {
2017-08-04 16:14:17 +00:00
ceil = d . blockchain . CurrentBlock ( ) . NumberU64 ( )
2016-06-20 14:47:10 +00:00
} else if d . mode == FastSync {
2017-08-04 16:14:17 +00:00
ceil = d . blockchain . CurrentFastBlock ( ) . NumberU64 ( )
2016-07-03 19:44:31 +00:00
}
if ceil >= MaxForkAncestry {
floor = int64 ( ceil - MaxForkAncestry )
}
// Request the topmost blocks to short circuit binary ancestor lookup
head := ceil
if head > height {
head = height
2016-06-20 14:47:10 +00:00
}
2016-09-08 09:45:12 +00:00
from := int64 ( head ) - int64 ( MaxHeaderFetch )
2016-06-20 14:47:10 +00:00
if from < 0 {
from = 0
}
2016-09-08 09:45:12 +00:00
// Span out with 15 block gaps into the future to catch bad head reports
limit := 2 * MaxHeaderFetch / 16
count := 1 + int ( ( int64 ( ceil ) - from ) / 16 )
if count > limit {
count = limit
}
2017-08-04 16:14:17 +00:00
go p . peer . RequestHeadersByNumber ( uint64 ( from ) , count , 15 , false )
2016-06-20 14:47:10 +00:00
// Wait for the remote response to the head fetch
number , hash := uint64 ( 0 ) , common . Hash { }
2017-05-01 11:09:48 +00:00
ttl := d . requestTTL ( )
timeout := time . After ( ttl )
2016-06-20 14:47:10 +00:00
for finished := false ; ! finished ; {
select {
case <- d . cancelCh :
2016-09-08 09:45:12 +00:00
return 0 , errCancelHeaderFetch
2016-06-20 14:47:10 +00:00
case packet := <- d . headerCh :
// Discard anything not from the origin peer
if packet . PeerId ( ) != p . id {
2017-05-01 11:09:48 +00:00
log . Debug ( "Received headers from incorrect peer" , "peer" , packet . PeerId ( ) )
2016-06-20 14:47:10 +00:00
break
}
// Make sure the peer actually gave something valid
headers := packet . ( * headerPack ) . headers
if len ( headers ) == 0 {
2017-05-01 11:09:48 +00:00
p . log . Warn ( "Empty head header set" )
2016-06-20 14:47:10 +00:00
return 0 , errEmptyHeaderSet
}
// Make sure the peer's reply conforms to the request
for i := 0 ; i < len ( headers ) ; i ++ {
2016-09-08 09:45:12 +00:00
if number := headers [ i ] . Number . Int64 ( ) ; number != from + int64 ( i ) * 16 {
2017-05-01 11:09:48 +00:00
p . log . Warn ( "Head headers broke chain ordering" , "index" , i , "requested" , from + int64 ( i ) * 16 , "received" , number )
2016-06-20 14:47:10 +00:00
return 0 , errInvalidChain
}
}
// Check if a common ancestor was found
finished = true
for i := len ( headers ) - 1 ; i >= 0 ; i -- {
// Skip any headers that underflow/overflow our requested set
2016-09-08 09:45:12 +00:00
if headers [ i ] . Number . Int64 ( ) < from || headers [ i ] . Number . Uint64 ( ) > ceil {
2016-06-20 14:47:10 +00:00
continue
}
// Otherwise check if we already know the header or not
2017-10-10 09:38:49 +00:00
if ( d . mode == FullSync && d . blockchain . HasBlockAndState ( headers [ i ] . Hash ( ) ) ) || ( d . mode != FullSync && d . lightchain . HasHeader ( headers [ i ] . Hash ( ) , headers [ i ] . Number . Uint64 ( ) ) ) {
2016-06-20 14:47:10 +00:00
number , hash = headers [ i ] . Number . Uint64 ( ) , headers [ i ] . Hash ( )
2016-09-08 09:45:12 +00:00
// If every header is known, even future ones, the peer straight out lied about its head
if number > height && i == limit - 1 {
2017-05-01 11:09:48 +00:00
p . log . Warn ( "Lied about chain head" , "reported" , height , "found" , number )
2016-09-08 09:45:12 +00:00
return 0 , errStallingPeer
}
2016-06-20 14:47:10 +00:00
break
}
}
case <- timeout :
2017-05-01 11:09:48 +00:00
p . log . Debug ( "Waiting for head header timed out" , "elapsed" , ttl )
2016-06-20 14:47:10 +00:00
return 0 , errTimeout
case <- d . bodyCh :
case <- d . receiptCh :
// Out of bounds delivery, ignore
}
}
// If the head fetch already found an ancestor, return
if ! common . EmptyHash ( hash ) {
2016-07-03 19:44:31 +00:00
if int64 ( number ) <= floor {
2017-05-01 11:09:48 +00:00
p . log . Warn ( "Ancestor below allowance" , "number" , number , "hash" , hash , "allowance" , floor )
2016-07-03 19:44:31 +00:00
return 0 , errInvalidAncestor
}
2017-05-01 11:09:48 +00:00
p . log . Debug ( "Found common ancestor" , "number" , number , "hash" , hash )
2016-06-20 14:47:10 +00:00
return number , nil
}
// Ancestor not found, we need to binary search over our chain
start , end := uint64 ( 0 ) , head
2016-07-03 19:44:31 +00:00
if floor > 0 {
start = uint64 ( floor )
}
2016-06-20 14:47:10 +00:00
for start + 1 < end {
// Split our chain interval in two, and request the hash to cross check
check := ( start + end ) / 2
2017-05-01 11:09:48 +00:00
ttl := d . requestTTL ( )
timeout := time . After ( ttl )
2017-08-04 16:14:17 +00:00
go p . peer . RequestHeadersByNumber ( uint64 ( check ) , 1 , 0 , false )
2016-06-20 14:47:10 +00:00
// Wait until a reply arrives to this request
for arrived := false ; ! arrived ; {
select {
case <- d . cancelCh :
2016-09-08 09:45:12 +00:00
return 0 , errCancelHeaderFetch
2016-06-20 14:47:10 +00:00
case packer := <- d . headerCh :
// Discard anything not from the origin peer
if packer . PeerId ( ) != p . id {
2017-05-01 11:09:48 +00:00
log . Debug ( "Received headers from incorrect peer" , "peer" , packer . PeerId ( ) )
2016-06-20 14:47:10 +00:00
break
}
// Make sure the peer actually gave something valid
headers := packer . ( * headerPack ) . headers
if len ( headers ) != 1 {
2017-05-01 11:09:48 +00:00
p . log . Debug ( "Multiple headers for single request" , "headers" , len ( headers ) )
2016-06-20 14:47:10 +00:00
return 0 , errBadPeer
}
arrived = true
// Modify the search interval based on the response
2017-10-10 09:38:49 +00:00
if ( d . mode == FullSync && ! d . blockchain . HasBlockAndState ( headers [ 0 ] . Hash ( ) ) ) || ( d . mode != FullSync && ! d . lightchain . HasHeader ( headers [ 0 ] . Hash ( ) , headers [ 0 ] . Number . Uint64 ( ) ) ) {
2016-06-20 14:47:10 +00:00
end = check
break
}
2017-08-04 16:14:17 +00:00
header := d . lightchain . GetHeaderByHash ( headers [ 0 ] . Hash ( ) ) // Independent of sync mode, header surely exists
2016-06-20 14:47:10 +00:00
if header . Number . Uint64 ( ) != check {
2017-05-01 11:09:48 +00:00
p . log . Debug ( "Received non requested header" , "number" , header . Number , "hash" , header . Hash ( ) , "request" , check )
2016-06-20 14:47:10 +00:00
return 0 , errBadPeer
}
start = check
case <- timeout :
2017-05-01 11:09:48 +00:00
p . log . Debug ( "Waiting for search header timed out" , "elapsed" , ttl )
2016-06-20 14:47:10 +00:00
return 0 , errTimeout
case <- d . bodyCh :
case <- d . receiptCh :
// Out of bounds delivery, ignore
}
}
}
2016-07-03 19:44:31 +00:00
// Ensure valid ancestry and return
if int64 ( start ) <= floor {
2017-05-01 11:09:48 +00:00
p . log . Warn ( "Ancestor below allowance" , "number" , start , "hash" , hash , "allowance" , floor )
2016-07-03 19:44:31 +00:00
return 0 , errInvalidAncestor
}
2017-05-01 11:09:48 +00:00
p . log . Debug ( "Found common ancestor" , "number" , start , "hash" , hash )
2016-06-20 14:47:10 +00:00
return start , nil
}
2016-07-03 19:44:31 +00:00
// fetchHeaders keeps retrieving headers concurrently from the number
// requested, until no more are returned, potentially throttling on the way. To
// facilitate concurrency but still protect against malicious nodes sending bad
// headers, we construct a header chain skeleton using the "origin" peer we are
// syncing with, and fill in the missing headers using anyone else. Headers from
// other peers are only accepted if they map cleanly to the skeleton. If no one
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
	p.log.Debug("Directing header downloads", "origin", from)
	defer p.log.Debug("Header download terminated")

	// Create a timeout timer, and the associated header fetcher
	skeleton := true            // Skeleton assembly phase or finishing up
	request := time.Now()       // time of the last skeleton fetch request
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

	var ttl time.Duration
	getHeaders := func(from uint64) {
		request = time.Now()

		ttl = d.requestTTL()
		timeout.Reset(ttl)

		if skeleton {
			// Request every MaxHeaderFetch-th header so peers can fill the gaps in between
			p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
			go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
		} else {
			p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
			go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
		}
	}
	// Start pulling the header chain skeleton until all is done
	getHeaders(from)

	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderFetch

		case packet := <-d.headerCh:
			// Make sure the active peer is giving us the skeleton headers
			if packet.PeerId() != p.id {
				log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
				break
			}
			headerReqTimer.UpdateSince(request)
			timeout.Stop()

			// If the skeleton's finished, pull any remaining head headers directly from the origin
			if packet.Items() == 0 && skeleton {
				skeleton = false
				getHeaders(from)
				continue
			}
			// If no more headers are inbound, notify the content fetchers and return
			if packet.Items() == 0 {
				p.log.Debug("No more headers available")
				select {
				case d.headerProcCh <- nil:
					return nil
				case <-d.cancelCh:
					return errCancelHeaderFetch
				}
			}
			headers := packet.(*headerPack).headers

			// If we received a skeleton batch, resolve internals concurrently
			if skeleton {
				filled, proced, err := d.fillHeaderSkeleton(from, headers)
				if err != nil {
					p.log.Debug("Skeleton chain invalid", "err", err)
					return errInvalidChain
				}
				// Only schedule the part of the skeleton not yet forwarded for processing
				headers = filled[proced:]
				from += uint64(proced)
			}
			// Insert all the new headers and fetch the next batch
			if len(headers) > 0 {
				p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
				select {
				case d.headerProcCh <- headers:
				case <-d.cancelCh:
					return errCancelHeaderFetch
				}
				from += uint64(len(headers))
			}
			getHeaders(from)

		case <-timeout.C:
			// Header retrieval timed out, consider the peer bad and drop
			p.log.Debug("Header request timed out", "elapsed", ttl)
			headerTimeoutMeter.Mark(1)
			d.dropPeer(p.id)

			// Finish the sync gracefully instead of dumping the gathered data though
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
				select {
				case ch <- false:
				case <-d.cancelCh:
				}
			}
			select {
			case d.headerProcCh <- nil:
			case <-d.cancelCh:
			}
			return errBadPeer
		}
	}
}
2016-07-03 19:44:31 +00:00
// fillHeaderSkeleton concurrently retrieves headers from all our available peers
// and maps them to the provided skeleton header chain.
//
// Any partial results from the beginning of the skeleton is (if possible) forwarded
// immediately to the header processor to keep the rest of the pipeline full even
// in the case of header stalls.
//
// The method returns the entire filled skeleton and also the number of headers
// already forwarded for processing.
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
	log.Debug("Filling up skeleton", "from", from)
	d.queue.ScheduleSkeleton(from, skeleton)

	// Type-specific callbacks instrumenting the generic fetchParts scheduler
	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*headerPack)
			return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
		}
		expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
		// Header fills are never throttled: memory use is bounded by the skeleton size
		throttle = func() bool { return false }
		reserve  = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
			return d.queue.ReserveHeaders(p, count), false, nil
		}
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
		capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
	)
	err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
		d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
		nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")

	log.Debug("Skeleton fill terminated", "err", err)

	filled, proced := d.queue.RetrieveHeaders()
	return filled, proced, err
}
2016-06-20 14:47:10 +00:00
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func ( d * Downloader ) fetchBodies ( from uint64 ) error {
2017-05-01 11:09:48 +00:00
log . Debug ( "Downloading block bodies" , "origin" , from )
2016-06-20 14:47:10 +00:00
var (
deliver = func ( packet dataPack ) ( int , error ) {
pack := packet . ( * bodyPack )
return d . queue . DeliverBodies ( pack . peerId , pack . transactions , pack . uncles )
}
2016-07-03 19:44:31 +00:00
expire = func ( ) map [ string ] int { return d . queue . ExpireBodies ( d . requestTTL ( ) ) }
2017-08-04 16:14:17 +00:00
fetch = func ( p * peerConnection , req * fetchRequest ) error { return p . FetchBodies ( req ) }
capacity = func ( p * peerConnection ) int { return p . BlockCapacity ( d . requestRTT ( ) ) }
setIdle = func ( p * peerConnection , accepted int ) { p . SetBodiesIdle ( accepted ) }
2016-06-20 14:47:10 +00:00
)
err := d . fetchParts ( errCancelBodyFetch , d . bodyCh , deliver , d . bodyWakeCh , expire ,
d . queue . PendingBlocks , d . queue . InFlightBlocks , d . queue . ShouldThrottleBlocks , d . queue . ReserveBodies ,
2017-05-01 11:09:48 +00:00
d . bodyFetchHook , fetch , d . queue . CancelBodies , capacity , d . peers . BodyIdlePeers , setIdle , "bodies" )
2016-06-20 14:47:10 +00:00
2017-05-01 11:09:48 +00:00
log . Debug ( "Block body download terminated" , "err" , err )
2016-06-20 14:47:10 +00:00
return err
}
// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func ( d * Downloader ) fetchReceipts ( from uint64 ) error {
2017-05-01 11:09:48 +00:00
log . Debug ( "Downloading transaction receipts" , "origin" , from )
2016-06-20 14:47:10 +00:00
var (
deliver = func ( packet dataPack ) ( int , error ) {
pack := packet . ( * receiptPack )
return d . queue . DeliverReceipts ( pack . peerId , pack . receipts )
}
2016-07-03 19:44:31 +00:00
expire = func ( ) map [ string ] int { return d . queue . ExpireReceipts ( d . requestTTL ( ) ) }
2017-08-04 16:14:17 +00:00
fetch = func ( p * peerConnection , req * fetchRequest ) error { return p . FetchReceipts ( req ) }
capacity = func ( p * peerConnection ) int { return p . ReceiptCapacity ( d . requestRTT ( ) ) }
setIdle = func ( p * peerConnection , accepted int ) { p . SetReceiptsIdle ( accepted ) }
2016-06-20 14:47:10 +00:00
)
err := d . fetchParts ( errCancelReceiptFetch , d . receiptCh , deliver , d . receiptWakeCh , expire ,
d . queue . PendingReceipts , d . queue . InFlightReceipts , d . queue . ShouldThrottleReceipts , d . queue . ReserveReceipts ,
2017-05-01 11:09:48 +00:00
d . receiptFetchHook , fetch , d . queue . CancelReceipts , capacity , d . peers . ReceiptIdlePeers , setIdle , "receipts" )
2016-06-20 14:47:10 +00:00
2017-05-01 11:09:48 +00:00
log . Debug ( "Transaction receipt download terminated" , "err" , err )
2016-06-20 14:47:10 +00:00
return err
}
// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
//
// As the scheduling/timeout logic mostly is the same for all downloaded data
// types, this method is used by each for data gathering and is instrumented with
// various callbacks to handle the slight differences between processing them.
//
// The instrumentation parameters:
//  - errCancel:   error type to return if the fetch operation is cancelled (mostly makes logging nicer)
//  - deliveryCh:  channel from which to retrieve downloaded data packets (merged from all concurrent peers)
//  - deliver:     processing callback to deliver data packets into type specific download queues (usually within `queue`)
//  - wakeCh:      notification channel for waking the fetcher when new tasks are available (or sync completed)
//  - expire:      task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
//  - pending:     task callback for the number of requests still needing download (detect completion/non-completability)
//  - inFlight:    task callback for the number of in-progress requests (wait for all active downloads to finish)
//  - throttle:    task callback to check if the processing queue is full and activate throttling (bound memory use)
//  - reserve:     task callback to reserve new download tasks to a particular peer (also signals partial completions)
//  - fetchHook:   tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
//  - fetch:       network callback to actually send a particular download request to a physical remote peer
//  - cancel:      task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
//  - capacity:    network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping)
//  - idle:        network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
//  - setIdle:     network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
//  - kind:        textual label of the type being downloaded to display in log messages
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
	expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
	fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
	idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {

	// Create a ticker to detect expired retrieval tasks
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

	// Prepare the queue and fetch block parts until the block header fetcher's done
	finished := false
	for {
		select {
		case <-d.cancelCh:
			return errCancel

		case packet := <-deliveryCh:
			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
				// Deliver the received chunk of data and check chain validity
				accepted, err := deliver(packet)
				if err == errInvalidChain {
					return err
				}
				// Unless a peer delivered something completely else than requested (usually
				// caused by a timed out request which came through in the end), set it to
				// idle. If the delivery's stale, the peer should have already been idled.
				if err != errStaleDelivery {
					setIdle(peer, accepted)
				}
				// Issue a log to the user to see what's going on
				switch {
				case err == nil && packet.Items() == 0:
					peer.log.Trace("Requested data not delivered", "type", kind)
				case err == nil:
					peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
				default:
					peer.log.Trace("Failed to deliver retrieved data", "type", kind, "err", err)
				}
			}
			// Blocks assembled, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case cont := <-wakeCh:
			// The header fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Headers arrive, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
			// Check for fetch request timeouts and demote the responsible peers
			for pid, fails := range expire() {
				if peer := d.peers.Peer(pid); peer != nil {
					// If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
					// ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal request
					// times out, sync-wise we need to get rid of the peer.
					//
					// The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth
					// and latency of a peer separately, which requires pushing the measured capacity a bit and seeing
					// how response times react, so it always requests one more than the minimum (i.e. min 2).
					if fails > 2 {
						peer.log.Trace("Data delivery timed out", "type", kind)
						setIdle(peer, 0)
					} else {
						peer.log.Debug("Stalling delivery, dropping", "type", kind)
						d.dropPeer(pid)
					}
				}
			}
			// If there's nothing more to fetch, wait or terminate
			if pending() == 0 {
				if !inFlight() && finished {
					log.Debug("Data fetching completed", "type", kind)
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
			progressed, throttled, running := false, false, inFlight()
			idles, total := idle()

			for _, peer := range idles {
				// Short circuit if throttling activated
				if throttle() {
					throttled = true
					break
				}
				// Short circuit if there is no more available task.
				if pending() == 0 {
					break
				}
				// Reserve a chunk of fetches for a peer. A nil can mean either that
				// no more headers are available, or that the peer is known not to
				// have them.
				request, progress, err := reserve(peer, capacity(peer))
				if err != nil {
					return err
				}
				if progress {
					progressed = true
				}
				if request == nil {
					continue
				}
				if request.From > 0 {
					peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
				} else if len(request.Headers) > 0 {
					peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
				} else {
					peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Hashes))
				}
				// Fetch the chunk and make sure any errors return the hashes to the queue
				if fetchHook != nil {
					fetchHook(request.Headers)
				}
				if err := fetch(peer, request); err != nil {
					// Although we could try and make an attempt to fix this, this error really
					// means that we've double allocated a fetch task to a peer. If that is the
					// case, the internal state of the downloader and the queue is very wrong so
					// better hard crash and note the error instead of silently accumulating into
					// a much bigger issue.
					panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
				}
				running = true
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed throw an error
			if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
				return errPeersUnavailable
			}
		}
	}
}
2016-07-03 19:44:31 +00:00
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
	// Calculate the pivoting point for switching from fast to slow sync
	pivot := d.queue.FastSyncPivot()

	// Keep a count of uncertain headers to roll back
	rollback := []*types.Header{}
	defer func() {
		if len(rollback) > 0 {
			// Flatten the headers and roll them back
			hashes := make([]common.Hash, len(rollback))
			for i, header := range rollback {
				hashes[i] = header.Hash()
			}
			lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
			if d.mode != LightSync {
				lastFastBlock = d.blockchain.CurrentFastBlock().Number()
				lastBlock = d.blockchain.CurrentBlock().Number()
			}
			d.lightchain.Rollback(hashes)
			curFastBlock, curBlock := common.Big0, common.Big0
			if d.mode != LightSync {
				curFastBlock = d.blockchain.CurrentFastBlock().Number()
				curBlock = d.blockchain.CurrentBlock().Number()
			}
			log.Warn("Rolled back headers", "count", len(hashes),
				"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
				"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
				"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))

			// If we're already past the pivot point, this could be an attack, tread carefully
			if rollback[len(rollback)-1].Number.Uint64() > pivot {
				// If we didn't ever fail, lock in the pivot header (must! not! change!)
				if atomic.LoadUint32(&d.fsPivotFails) == 0 {
					for _, header := range rollback {
						if header.Number.Uint64() == pivot {
							log.Warn("Fast-sync pivot locked in", "number", pivot, "hash", header.Hash())
							d.fsPivotLock = header
						}
					}
				}
			}
		}
	}()

	// Wait for batches of headers to process
	gotHeaders := false

	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderProcessing

		case headers := <-d.headerProcCh:
			// Terminate header processing if we synced up
			if len(headers) == 0 {
				// Notify everyone that headers are fully processed
				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
					select {
					case ch <- false:
					case <-d.cancelCh:
					}
				}
				// If no headers were retrieved at all, the peer violated its TD promise that it had a
				// better chain compared to ours. The only exception is if its promised blocks were
				// already imported by other means (e.g. fetcher):
				//
				// R <remote peer>, L <local node>: Both at block 10
				// R: Mine block 11, and propagate it to L
				// L: Queue block 11 for import
				// L: Notice that R's head and TD increased compared to ours, start sync
				// L: Import of block 11 finishes
				// L: Sync begins, and finds common ancestor at 11
				// L: Request new headers up from 11 (R's TD was higher, it must have something)
				// R: Nothing to give
				if d.mode != LightSync {
					if !gotHeaders && td.Cmp(d.blockchain.GetTdByHash(d.blockchain.CurrentBlock().Hash())) > 0 {
						return errStallingPeer
					}
				}
				// If fast or light syncing, ensure promised headers are indeed delivered. This is
				// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
				// of delivering the post-pivot blocks that would flag the invalid content.
				//
				// This check cannot be executed "as is" for full imports, since blocks may still be
				// queued for processing when the header download completes. However, as long as the
				// peer gave us something useful, we're already happy/progressed (above check).
				if d.mode == FastSync || d.mode == LightSync {
					if td.Cmp(d.lightchain.GetTdByHash(d.lightchain.CurrentHeader().Hash())) > 0 {
						return errStallingPeer
					}
				}
				// Disable any rollback and return
				rollback = nil
				return nil
			}
			// Otherwise split the chunk of headers into batches and process them
			gotHeaders = true

			for len(headers) > 0 {
				// Terminate if something failed in between processing chunks
				select {
				case <-d.cancelCh:
					return errCancelHeaderProcessing
				default:
				}
				// Select the next chunk of headers to import
				limit := maxHeadersProcess
				if limit > len(headers) {
					limit = len(headers)
				}
				chunk := headers[:limit]

				// In case of header only syncing, validate the chunk immediately
				if d.mode == FastSync || d.mode == LightSync {
					// Collect the yet unknown headers to mark them as uncertain
					unknown := make([]*types.Header, 0, len(headers))
					for _, header := range chunk {
						if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
							unknown = append(unknown, header)
						}
					}
					// If we're importing pure headers, verify based on their recentness
					frequency := fsHeaderCheckFrequency
					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
						frequency = 1
					}
					if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
						// If some headers were inserted, add them too to the rollback list
						if n > 0 {
							rollback = append(rollback, chunk[:n]...)
						}
						log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
						return errInvalidChain
					}
					// All verifications passed, store newly found uncertain headers
					rollback = append(rollback, unknown...)
					if len(rollback) > fsHeaderSafetyNet {
						rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
					}
				}
				// If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
				if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
					if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
						log.Warn("Pivot doesn't match locked in one", "remoteNumber", pivot.Number, "remoteHash", pivot.Hash(), "localNumber", d.fsPivotLock.Number, "localHash", d.fsPivotLock.Hash())
						return errInvalidChain
					}
				}
				// Unless we're doing light chains, schedule the headers for associated content retrieval
				if d.mode == FullSync || d.mode == FastSync {
					// If we've reached the allowed number of pending headers, stall a bit
					for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
						select {
						case <-d.cancelCh:
							return errCancelHeaderProcessing
						case <-time.After(time.Second):
						}
					}
					// Otherwise insert the headers for content retrieval
					inserts := d.queue.Schedule(chunk, origin)
					if len(inserts) != len(chunk) {
						log.Debug("Stale headers")
						return errBadPeer
					}
				}
				headers = headers[limit:]
				origin += uint64(limit)
			}
			// Signal the content downloaders of the availability of new tasks
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
				select {
				case ch <- true:
				default:
				}
			}
		}
	}
}
2017-08-04 16:14:17 +00:00
// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func ( d * Downloader ) processFullSyncContent ( ) error {
2016-06-20 14:47:10 +00:00
for {
results := d . queue . WaitResults ( )
if len ( results ) == 0 {
2017-08-04 16:14:17 +00:00
return nil
2016-06-20 14:47:10 +00:00
}
if d . chainInsertHook != nil {
d . chainInsertHook ( results )
}
2017-08-04 16:14:17 +00:00
if err := d . importBlockResults ( results ) ; err != nil {
return err
}
}
}
func ( d * Downloader ) importBlockResults ( results [ ] * fetchResult ) error {
for len ( results ) != 0 {
// Check for any termination requests. This makes clean shutdown faster.
select {
case <- d . quitCh :
return errCancelContentProcessing
default :
}
// Retrieve the a batch of results to import
items := int ( math . Min ( float64 ( len ( results ) ) , float64 ( maxResultsProcess ) ) )
first , last := results [ 0 ] . Header , results [ items - 1 ] . Header
2017-05-01 11:09:48 +00:00
log . Debug ( "Inserting downloaded chain" , "items" , len ( results ) ,
"firstnum" , first . Number , "firsthash" , first . Hash ( ) ,
"lastnum" , last . Number , "lasthash" , last . Hash ( ) ,
)
2017-08-04 16:14:17 +00:00
blocks := make ( [ ] * types . Block , items )
for i , result := range results [ : items ] {
blocks [ i ] = types . NewBlockWithHeader ( result . Header ) . WithBody ( result . Transactions , result . Uncles )
}
if index , err := d . blockchain . InsertChain ( blocks ) ; err != nil {
log . Debug ( "Downloaded item processing failed" , "number" , results [ index ] . Header . Number , "hash" , results [ index ] . Header . Hash ( ) , "err" , err )
return errInvalidChain
}
// Shift the results to the next batch
results = results [ items : ]
}
return nil
}
// processFastSyncContent takes fetch results from the queue and writes them to the
// database. It also controls the synchronisation of state nodes of the pivot block.
func (d *Downloader) processFastSyncContent(latest *types.Header) error {
	// Start syncing state of the reported head block.
	// This should get us most of the state of the pivot block.
	stateSync := d.syncState(latest.Root)
	defer stateSync.Cancel()
	go func() {
		// On state sync failure, close the queue so the WaitResults call
		// below unblocks and the error can surface.
		if err := stateSync.Wait(); err != nil {
			d.queue.Close() // wake up WaitResults
		}
	}()

	pivot := d.queue.FastSyncPivot()
	for {
		results := d.queue.WaitResults()
		if len(results) == 0 {
			// Queue drained (or closed): finish up, surfacing any state sync error
			return stateSync.Cancel()
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
		// Partition the batch around the pivot: pre-pivot blocks are committed
		// with receipts only, the pivot switches over the state sync, and
		// post-pivot blocks are fully imported.
		P, beforeP, afterP := splitAroundPivot(pivot, results)
		if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
			return err
		}
		if P != nil {
			stateSync.Cancel()
			if err := d.commitPivotBlock(P); err != nil {
				return err
			}
		}
		if err := d.importBlockResults(afterP); err != nil {
			return err
		}
	}
}
// splitAroundPivot partitions a batch of fetch results into the blocks below the
// pivot number, the pivot block itself (nil if absent), and the blocks above it.
func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
	for _, result := range results {
		num := result.Header.Number.Uint64()
		if num == pivot {
			p = result
		} else if num < pivot {
			before = append(before, result)
		} else {
			after = append(after, result)
		}
	}
	return p, before, after
}
func ( d * Downloader ) commitFastSyncData ( results [ ] * fetchResult , stateSync * stateSync ) error {
for len ( results ) != 0 {
// Check for any termination requests.
select {
case <- d . quitCh :
return errCancelContentProcessing
case <- stateSync . done :
if err := stateSync . Wait ( ) ; err != nil {
return err
2016-06-20 14:47:10 +00:00
}
2017-08-04 16:14:17 +00:00
default :
}
// Retrieve the a batch of results to import
items := int ( math . Min ( float64 ( len ( results ) ) , float64 ( maxResultsProcess ) ) )
first , last := results [ 0 ] . Header , results [ items - 1 ] . Header
log . Debug ( "Inserting fast-sync blocks" , "items" , len ( results ) ,
"firstnum" , first . Number , "firsthash" , first . Hash ( ) ,
"lastnumn" , last . Number , "lasthash" , last . Hash ( ) ,
)
blocks := make ( [ ] * types . Block , items )
receipts := make ( [ ] types . Receipts , items )
for i , result := range results [ : items ] {
blocks [ i ] = types . NewBlockWithHeader ( result . Header ) . WithBody ( result . Transactions , result . Uncles )
receipts [ i ] = result . Receipts
}
if index , err := d . blockchain . InsertReceiptChain ( blocks , receipts ) ; err != nil {
log . Debug ( "Downloaded item processing failed" , "number" , results [ index ] . Header . Number , "hash" , results [ index ] . Header . Hash ( ) , "err" , err )
return errInvalidChain
2016-06-20 14:47:10 +00:00
}
2017-08-04 16:14:17 +00:00
// Shift the results to the next batch
results = results [ items : ]
2016-06-20 14:47:10 +00:00
}
2017-08-04 16:14:17 +00:00
return nil
}
func ( d * Downloader ) commitPivotBlock ( result * fetchResult ) error {
b := types . NewBlockWithHeader ( result . Header ) . WithBody ( result . Transactions , result . Uncles )
// Sync the pivot block state. This should complete reasonably quickly because
// we've already synced up to the reported head block state earlier.
if err := d . syncState ( b . Root ( ) ) . Wait ( ) ; err != nil {
return err
}
log . Debug ( "Committing fast sync pivot as new head" , "number" , b . Number ( ) , "hash" , b . Hash ( ) )
if _ , err := d . blockchain . InsertReceiptChain ( [ ] * types . Block { b } , [ ] types . Receipts { result . Receipts } ) ; err != nil {
return err
}
return d . blockchain . FastSyncCommitHead ( b . Hash ( ) )
2016-06-20 14:47:10 +00:00
}
// DeliverHeaders injects a new batch of block headers received from a remote
// node into the download schedule.
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
	pack := &headerPack{id, headers}
	return d.deliver(id, d.headerCh, pack, headerInMeter, headerDropMeter)
}
// DeliverBodies injects a new batch of block bodies received from a remote node.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
	pack := &bodyPack{id, transactions, uncles}
	return d.deliver(id, d.bodyCh, pack, bodyInMeter, bodyDropMeter)
}
// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
	pack := &receiptPack{id, receipts}
	return d.deliver(id, d.receiptCh, pack, receiptInMeter, receiptDropMeter)
}
// DeliverNodeData injects a new batch of node state data received from a remote node.
func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
	pack := &statePack{id, data}
	return d.deliver(id, d.stateCh, pack, stateInMeter, stateDropMeter)
}
// deliver injects a new batch of data received from a remote node.
func ( d * Downloader ) deliver ( id string , destCh chan dataPack , packet dataPack , inMeter , dropMeter metrics . Meter ) ( err error ) {
// Update the delivery metrics for both good and failed deliveries
inMeter . Mark ( int64 ( packet . Items ( ) ) )
defer func ( ) {
if err != nil {
dropMeter . Mark ( int64 ( packet . Items ( ) ) )
}
} ( )
// Deliver or abort if the sync is canceled while queuing
d . cancelLock . RLock ( )
cancel := d . cancelCh
d . cancelLock . RUnlock ( )
if cancel == nil {
return errNoSyncActive
}
select {
case destCh <- packet :
return nil
case <- cancel :
return errNoSyncActive
}
}
2016-07-03 19:44:31 +00:00
// qosTuner is the quality of service tuning loop that occasionally gathers the
// peer latency statistics and updates the estimated request round trip time.
func ( d * Downloader ) qosTuner ( ) {
for {
// Retrieve the current median RTT and integrate into the previoust target RTT
rtt := time . Duration ( float64 ( 1 - qosTuningImpact ) * float64 ( atomic . LoadUint64 ( & d . rttEstimate ) ) + qosTuningImpact * float64 ( d . peers . medianRTT ( ) ) )
atomic . StoreUint64 ( & d . rttEstimate , uint64 ( rtt ) )
// A new RTT cycle passed, increase our confidence in the estimated RTT
conf := atomic . LoadUint64 ( & d . rttConfidence )
conf = conf + ( 1000000 - conf ) / 2
atomic . StoreUint64 ( & d . rttConfidence , conf )
// Log the new QoS values and sleep until the next RTT
2017-05-01 11:09:48 +00:00
log . Debug ( "Recalculated downloader QoS values" , "rtt" , rtt , "confidence" , float64 ( conf ) / 1000000.0 , "ttl" , d . requestTTL ( ) )
2016-07-03 19:44:31 +00:00
select {
case <- d . quitCh :
return
case <- time . After ( rtt ) :
}
}
}
// qosReduceConfidence is meant to be called when a new peer joins the downloader's
// peer set, needing to reduce the confidence we have in out QoS estimates.
func ( d * Downloader ) qosReduceConfidence ( ) {
// If we have a single peer, confidence is always 1
peers := uint64 ( d . peers . Len ( ) )
2017-08-04 16:14:17 +00:00
if peers == 0 {
// Ensure peer connectivity races don't catch us off guard
return
}
2016-07-03 19:44:31 +00:00
if peers == 1 {
atomic . StoreUint64 ( & d . rttConfidence , 1000000 )
return
}
// If we have a ton of peers, don't drop confidence)
if peers >= uint64 ( qosConfidenceCap ) {
return
}
// Otherwise drop the confidence factor
conf := atomic . LoadUint64 ( & d . rttConfidence ) * ( peers - 1 ) / peers
if float64 ( conf ) / 1000000 < rttMinConfidence {
conf = uint64 ( rttMinConfidence * 1000000 )
}
atomic . StoreUint64 ( & d . rttConfidence , conf )
rtt := time . Duration ( atomic . LoadUint64 ( & d . rttEstimate ) )
2017-05-01 11:09:48 +00:00
log . Debug ( "Relaxed downloader QoS values" , "rtt" , rtt , "confidence" , float64 ( conf ) / 1000000.0 , "ttl" , d . requestTTL ( ) )
2016-07-03 19:44:31 +00:00
}
// requestRTT returns the current target round trip time for a download request
// to complete in.
//
// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
// the downloader tries to adapt queries to the RTT, so multiple RTT values can
// be adapted to, but smaller ones are preferred (stabler download stream).
func (d *Downloader) requestRTT() time.Duration {
	estimate := time.Duration(atomic.LoadUint64(&d.rttEstimate))
	return estimate * 9 / 10
}
// requestTTL returns the current timeout allowance for a single download request
// to finish under.
func (d *Downloader) requestTTL() time.Duration {
	rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
	conf := float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0

	// Scale the timeout inversely with confidence, but never beyond the hard cap
	ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
	if ttl > ttlLimit {
		ttl = ttlLimit
	}
	return ttl
}