2017-08-04 23:14:17 +07:00
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"fmt"
"hash"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/crypto/sha3"
2017-10-10 11:38:49 +02:00
"github.com/ethereum/go-ethereum/ethdb"
2017-08-04 23:14:17 +07:00
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie"
)
// stateReq represents a batch of state fetch requests groupped together into
// a single data retrieval network packet.
type stateReq struct {
items [ ] common . Hash // Hashes of the state items to download
tasks map [ common . Hash ] * stateTask // Download tasks to track previous attempts
timeout time . Duration // Maximum round trip time for this to complete
timer * time . Timer // Timer to fire when the RTT timeout expires
peer * peerConnection // Peer that we're requesting from
response [ ] [ ] byte // Response data of the peer (nil for timeouts)
2017-10-10 11:38:49 +02:00
dropped bool // Flag whether the peer dropped off early
2017-08-04 23:14:17 +07:00
}
// timedOut returns if this request timed out.
func ( req * stateReq ) timedOut ( ) bool {
return req . response == nil
}
// stateSyncStats is a collection of progress stats to report during a state trie
// sync to RPC requests as well as to display in user logs.
type stateSyncStats struct {
processed uint64 // Number of state entries processed
duplicate uint64 // Number of state entries downloaded twice
unexpected uint64 // Number of non-requested state entries received
pending uint64 // Number of still pending state entries
}
// syncState starts downloading state with the given root hash.
func ( d * Downloader ) syncState ( root common . Hash ) * stateSync {
s := newStateSync ( d , root )
select {
case d . stateSyncStart <- s :
case <- d . quitCh :
s . err = errCancelStateFetch
close ( s . done )
}
return s
}
// stateFetcher manages the active state sync and accepts requests
// on its behalf.
func ( d * Downloader ) stateFetcher ( ) {
for {
select {
case s := <- d . stateSyncStart :
for next := s ; next != nil ; {
next = d . runStateSync ( next )
}
case <- d . stateCh :
// Ignore state responses while no sync is running.
case <- d . quitCh :
return
}
}
}
// runStateSync runs a state synchronisation until it completes or another root
// hash is requested to be switched over to.
func ( d * Downloader ) runStateSync ( s * stateSync ) * stateSync {
var (
active = make ( map [ string ] * stateReq ) // Currently in-flight requests
finished [ ] * stateReq // Completed or failed requests
timeout = make ( chan * stateReq ) // Timed out active requests
)
defer func ( ) {
// Cancel active request timers on exit. Also set peers to idle so they're
// available for the next sync.
for _ , req := range active {
req . timer . Stop ( )
req . peer . SetNodeDataIdle ( len ( req . items ) )
}
} ( )
// Run the state sync.
go s . run ( )
defer s . Cancel ( )
2017-10-10 11:38:49 +02:00
// Listen for peer departure events to cancel assigned tasks
peerDrop := make ( chan * peerConnection , 1024 )
peerSub := s . d . peers . SubscribePeerDrops ( peerDrop )
defer peerSub . Unsubscribe ( )
2017-08-04 23:14:17 +07:00
for {
// Enable sending of the first buffered element if there is one.
var (
deliverReq * stateReq
deliverReqCh chan * stateReq
)
if len ( finished ) > 0 {
deliverReq = finished [ 0 ]
deliverReqCh = s . deliver
}
select {
// The stateSync lifecycle:
case next := <- d . stateSyncStart :
return next
case <- s . done :
return nil
// Send the next finished request to the current sync:
case deliverReqCh <- deliverReq :
2018-02-27 11:39:30 +01:00
// Shift out the first request, but also set the emptied slot to nil for GC
copy ( finished , finished [ 1 : ] )
finished [ len ( finished ) - 1 ] = nil
finished = finished [ : len ( finished ) - 1 ]
2017-08-04 23:14:17 +07:00
// Handle incoming state packs:
case pack := <- d . stateCh :
// Discard any data not requested (or previsouly timed out)
req := active [ pack . PeerId ( ) ]
if req == nil {
log . Debug ( "Unrequested node data" , "peer" , pack . PeerId ( ) , "len" , pack . Items ( ) )
continue
}
// Finalize the request and queue up for processing
req . timer . Stop ( )
req . response = pack . ( * statePack ) . states
finished = append ( finished , req )
delete ( active , pack . PeerId ( ) )
2017-10-10 11:38:49 +02:00
// Handle dropped peer connections:
case p := <- peerDrop :
// Skip if no request is currently pending
req := active [ p . id ]
if req == nil {
continue
}
// Finalize the request and queue up for processing
req . timer . Stop ( )
req . dropped = true
finished = append ( finished , req )
delete ( active , p . id )
2017-08-04 23:14:17 +07:00
// Handle timed-out requests:
case req := <- timeout :
// If the peer is already requesting something else, ignore the stale timeout.
// This can happen when the timeout and the delivery happens simultaneously,
// causing both pathways to trigger.
if active [ req . peer . id ] != req {
continue
}
// Move the timed out data back into the download queue
finished = append ( finished , req )
delete ( active , req . peer . id )
// Track outgoing state requests:
case req := <- d . trackStateReq :
// If an active request already exists for this peer, we have a problem. In
// theory the trie node schedule must never assign two requests to the same
// peer. In practive however, a peer might receive a request, disconnect and
// immediately reconnect before the previous times out. In this case the first
// request is never honored, alas we must not silently overwrite it, as that
// causes valid requests to go missing and sync to get stuck.
if old := active [ req . peer . id ] ; old != nil {
log . Warn ( "Busy peer assigned new state fetch" , "peer" , old . peer . id )
// Make sure the previous one doesn't get siletly lost
2017-10-10 11:38:49 +02:00
old . timer . Stop ( )
old . dropped = true
2017-08-04 23:14:17 +07:00
finished = append ( finished , old )
}
// Start a timer to notify the sync loop if the peer stalled.
req . timer = time . AfterFunc ( req . timeout , func ( ) {
select {
case timeout <- req :
case <- s . done :
// Prevent leaking of timer goroutines in the unlikely case where a
// timer is fired just before exiting runStateSync.
}
} )
active [ req . peer . id ] = req
}
}
}
// stateSync schedules requests for downloading a particular state trie defined
// by a given state root.
type stateSync struct {
d * Downloader // Downloader instance to access and manage current peerset
2017-10-10 11:38:49 +02:00
sched * trie . TrieSync // State trie sync scheduler defining the tasks
2017-08-04 23:14:17 +07:00
keccak hash . Hash // Keccak256 hasher to verify deliveries with
tasks map [ common . Hash ] * stateTask // Set of tasks currently queued for retrieval
2017-10-10 11:38:49 +02:00
numUncommitted int
bytesUncommitted int
2017-08-04 23:14:17 +07:00
deliver chan * stateReq // Delivery channel multiplexing peer responses
cancel chan struct { } // Channel to signal a termination request
cancelOnce sync . Once // Ensures cancel only ever gets called once
done chan struct { } // Channel to signal termination completion
err error // Any error hit during sync (set before completion)
}
// stateTask represents a single trie node download taks, containing a set of
// peers already attempted retrieval from to detect stalled syncs and abort.
type stateTask struct {
attempts map [ string ] struct { }
}
// newStateSync creates a new state trie download scheduler. This method does not
// yet start the sync. The user needs to call run to initiate.
func newStateSync ( d * Downloader , root common . Hash ) * stateSync {
return & stateSync {
d : d ,
sched : state . NewStateSync ( root , d . stateDB ) ,
keccak : sha3 . NewKeccak256 ( ) ,
tasks : make ( map [ common . Hash ] * stateTask ) ,
deliver : make ( chan * stateReq ) ,
cancel : make ( chan struct { } ) ,
done : make ( chan struct { } ) ,
}
}
// run starts the task assignment and response processing loop, blocking until
// it finishes, and finally notifying any goroutines waiting for the loop to
// finish.
func ( s * stateSync ) run ( ) {
s . err = s . loop ( )
close ( s . done )
}
// Wait blocks until the sync is done or canceled.
func ( s * stateSync ) Wait ( ) error {
<- s . done
return s . err
}
// Cancel cancels the sync and waits until it has shut down.
func ( s * stateSync ) Cancel ( ) error {
s . cancelOnce . Do ( func ( ) { close ( s . cancel ) } )
return s . Wait ( )
}
// loop is the main event loop of a state trie sync. It it responsible for the
// assignment of new tasks to peers (including sending it to them) as well as
// for the processing of inbound data. Note, that the loop does not directly
// receive data from peers, rather those are buffered up in the downloader and
// pushed here async. The reason is to decouple processing from data receipt
// and timeouts.
func ( s * stateSync ) loop ( ) error {
// Listen for new peer events to assign tasks to them
newPeer := make ( chan * peerConnection , 1024 )
peerSub := s . d . peers . SubscribeNewPeers ( newPeer )
defer peerSub . Unsubscribe ( )
// Keep assigning new tasks until the sync completes or aborts
for s . sched . Pending ( ) > 0 {
2017-10-10 11:38:49 +02:00
if err := s . commit ( false ) ; err != nil {
2017-08-04 23:14:17 +07:00
return err
}
2017-10-10 11:38:49 +02:00
s . assignTasks ( )
2017-08-04 23:14:17 +07:00
// Tasks assigned, wait for something to happen
select {
case <- newPeer :
// New peer arrived, try to assign it download tasks
case <- s . cancel :
return errCancelStateFetch
2018-02-27 11:39:30 +01:00
case <- s . d . cancelCh :
return errCancelStateFetch
2017-08-04 23:14:17 +07:00
case req := <- s . deliver :
2017-10-10 11:38:49 +02:00
// Response, disconnect or timeout triggered, drop the peer if stalling
log . Trace ( "Received node data response" , "peer" , req . peer . id , "count" , len ( req . response ) , "dropped" , req . dropped , "timeout" , ! req . dropped && req . timedOut ( ) )
if len ( req . items ) <= 2 && ! req . dropped && req . timedOut ( ) {
2017-08-04 23:14:17 +07:00
// 2 items are the minimum requested, if even that times out, we've no use of
// this peer at the moment.
log . Warn ( "Stalling state sync, dropping peer" , "peer" , req . peer . id )
s . d . dropPeer ( req . peer . id )
}
// Process all the received blobs and check for stale delivery
2018-02-27 11:39:30 +01:00
if err := s . process ( req ) ; err != nil {
2017-08-04 23:14:17 +07:00
log . Warn ( "Node data write error" , "err" , err )
return err
}
2018-02-27 11:39:30 +01:00
req . peer . SetNodeDataIdle ( len ( req . response ) )
2017-08-04 23:14:17 +07:00
}
}
2017-10-10 11:38:49 +02:00
return s . commit ( true )
}
func ( s * stateSync ) commit ( force bool ) error {
if ! force && s . bytesUncommitted < ethdb . IdealBatchSize {
return nil
}
start := time . Now ( )
b := s . d . stateDB . NewBatch ( )
s . sched . Commit ( b )
if err := b . Write ( ) ; err != nil {
return fmt . Errorf ( "DB write error: %v" , err )
}
s . updateStats ( s . numUncommitted , 0 , 0 , time . Since ( start ) )
s . numUncommitted = 0
s . bytesUncommitted = 0
2017-08-04 23:14:17 +07:00
return nil
}
// assignTasks attempts to assing new tasks to all idle peers, either from the
// batch currently being retried, or fetching new data from the trie sync itself.
2017-10-10 11:38:49 +02:00
func ( s * stateSync ) assignTasks ( ) {
2017-08-04 23:14:17 +07:00
// Iterate over all idle peers and try to assign them state fetches
peers , _ := s . d . peers . NodeDataIdlePeers ( )
for _ , p := range peers {
// Assign a batch of fetches proportional to the estimated latency/bandwidth
cap := p . NodeDataCapacity ( s . d . requestRTT ( ) )
req := & stateReq { peer : p , timeout : s . d . requestTTL ( ) }
s . fillTasks ( cap , req )
// If the peer was assigned tasks to fetch, send the network request
if len ( req . items ) > 0 {
req . peer . log . Trace ( "Requesting new batch of data" , "type" , "state" , "count" , len ( req . items ) )
select {
case s . d . trackStateReq <- req :
req . peer . FetchNodeData ( req . items )
case <- s . cancel :
2018-02-27 11:39:30 +01:00
case <- s . d . cancelCh :
2017-08-04 23:14:17 +07:00
}
}
}
}
// fillTasks fills the given request object with a maximum of n state download
// tasks to send to the remote peer.
func ( s * stateSync ) fillTasks ( n int , req * stateReq ) {
// Refill available tasks from the scheduler.
if len ( s . tasks ) < n {
new := s . sched . Missing ( n - len ( s . tasks ) )
for _ , hash := range new {
s . tasks [ hash ] = & stateTask { make ( map [ string ] struct { } ) }
}
}
// Find tasks that haven't been tried with the request's peer.
req . items = make ( [ ] common . Hash , 0 , n )
req . tasks = make ( map [ common . Hash ] * stateTask , n )
for hash , t := range s . tasks {
// Stop when we've gathered enough requests
if len ( req . items ) == n {
break
}
// Skip any requests we've already tried from this peer
if _ , ok := t . attempts [ req . peer . id ] ; ok {
continue
}
// Assign the request to this peer
t . attempts [ req . peer . id ] = struct { } { }
req . items = append ( req . items , hash )
req . tasks [ hash ] = t
delete ( s . tasks , hash )
}
}
// process iterates over a batch of delivered state data, injecting each item
// into a running state sync, re-queuing any items that were requested but not
// delivered.
2018-02-27 11:39:30 +01:00
func ( s * stateSync ) process ( req * stateReq ) error {
2017-08-04 23:14:17 +07:00
// Collect processing stats and update progress if valid data was received
2017-10-10 11:38:49 +02:00
duplicate , unexpected := 0 , 0
2017-08-04 23:14:17 +07:00
defer func ( start time . Time ) {
2017-10-10 11:38:49 +02:00
if duplicate > 0 || unexpected > 0 {
s . updateStats ( 0 , duplicate , unexpected , time . Since ( start ) )
2017-08-04 23:14:17 +07:00
}
} ( time . Now ( ) )
// Iterate over all the delivered data and inject one-by-one into the trie
2018-02-27 11:39:30 +01:00
progress := false
2017-08-04 23:14:17 +07:00
for _ , blob := range req . response {
prog , hash , err := s . processNodeData ( blob )
switch err {
case nil :
2017-10-10 11:38:49 +02:00
s . numUncommitted ++
s . bytesUncommitted += len ( blob )
progress = progress || prog
2017-08-04 23:14:17 +07:00
case trie . ErrNotRequested :
unexpected ++
case trie . ErrAlreadyProcessed :
duplicate ++
default :
2018-02-27 11:39:30 +01:00
return fmt . Errorf ( "invalid state node %s: %v" , hash . TerminalString ( ) , err )
2017-08-04 23:14:17 +07:00
}
if _ , ok := req . tasks [ hash ] ; ok {
delete ( req . tasks , hash )
}
}
// Put unfulfilled tasks back into the retry queue
npeers := s . d . peers . Len ( )
for hash , task := range req . tasks {
// If the node did deliver something, missing items may be due to a protocol
// limit or a previous timeout + delayed delivery. Both cases should permit
// the node to retry the missing items (to avoid single-peer stalls).
if len ( req . response ) > 0 || req . timedOut ( ) {
delete ( task . attempts , req . peer . id )
}
// If we've requested the node too many times already, it may be a malicious
// sync where nobody has the right data. Abort.
if len ( task . attempts ) >= npeers {
2018-02-27 11:39:30 +01:00
return fmt . Errorf ( "state node %s failed with all peers (%d tries, %d peers)" , hash . TerminalString ( ) , len ( task . attempts ) , npeers )
2017-08-04 23:14:17 +07:00
}
// Missing item, place into the retry queue.
s . tasks [ hash ] = task
}
2018-02-27 11:39:30 +01:00
return nil
2017-08-04 23:14:17 +07:00
}
// processNodeData tries to inject a trie node data blob delivered from a remote
// peer into the state trie, returning whether anything useful was written or any
// error occurred.
func ( s * stateSync ) processNodeData ( blob [ ] byte ) ( bool , common . Hash , error ) {
res := trie . SyncResult { Data : blob }
s . keccak . Reset ( )
s . keccak . Write ( blob )
s . keccak . Sum ( res . Hash [ : 0 ] )
committed , _ , err := s . sched . Process ( [ ] trie . SyncResult { res } )
return committed , res . Hash , err
}
// updateStats bumps the various state sync progress counters and displays a log
// message for the user to see.
2017-10-10 11:38:49 +02:00
func ( s * stateSync ) updateStats ( written , duplicate , unexpected int , duration time . Duration ) {
2017-08-04 23:14:17 +07:00
s . d . syncStatsLock . Lock ( )
defer s . d . syncStatsLock . Unlock ( )
s . d . syncStatsState . pending = uint64 ( s . sched . Pending ( ) )
2017-10-10 11:38:49 +02:00
s . d . syncStatsState . processed += uint64 ( written )
2017-08-04 23:14:17 +07:00
s . d . syncStatsState . duplicate += uint64 ( duplicate )
s . d . syncStatsState . unexpected += uint64 ( unexpected )
2017-10-10 11:38:49 +02:00
if written > 0 || duplicate > 0 || unexpected > 0 {
log . Info ( "Imported new state entries" , "count" , written , "elapsed" , common . PrettyDuration ( duration ) , "processed" , s . d . syncStatsState . processed , "pending" , s . d . syncStatsState . pending , "retry" , len ( s . tasks ) , "duplicate" , s . d . syncStatsState . duplicate , "unexpected" , s . d . syncStatsState . unexpected )
}
2017-08-04 23:14:17 +07:00
}