eth: split and handle explicitly vs. download requested blocks

This commit is contained in:
Péter Szilágyi 2015-06-08 20:38:39 +03:00
parent fdccce781e
commit 9ed166c196
2 changed files with 81 additions and 23 deletions

View File

@ -18,15 +18,6 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
) )
const (
forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process
notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
blockProcAmount = 256
)
func errResp(code errCode, format string, v ...interface{}) error { func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
} }
@ -57,9 +48,11 @@ type ProtocolManager struct {
txSub event.Subscription txSub event.Subscription
minedBlockSub event.Subscription minedBlockSub event.Subscription
newPeerCh chan *peer newPeerCh chan *peer
newHashCh chan []*blockAnnounce newHashCh chan []*blockAnnounce
quitSync chan struct{} newBlockCh chan chan []*types.Block
quitSync chan struct{}
// wait group is used for graceful shutdowns during downloading // wait group is used for graceful shutdowns during downloading
// and processing // and processing
wg sync.WaitGroup wg sync.WaitGroup
@ -77,6 +70,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
peers: newPeerSet(), peers: newPeerSet(),
newPeerCh: make(chan *peer, 1), newPeerCh: make(chan *peer, 1),
newHashCh: make(chan []*blockAnnounce, 1), newHashCh: make(chan []*blockAnnounce, 1),
newBlockCh: make(chan chan []*types.Block),
quitSync: make(chan struct{}), quitSync: make(chan struct{}),
} }
@ -274,21 +268,26 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
return p.sendBlocks(blocks) return p.sendBlocks(blocks)
case BlocksMsg: case BlocksMsg:
var blocks []*types.Block // Decode the arrived block message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
var blocks []*types.Block
if err := msgStream.Decode(&blocks); err != nil { if err := msgStream.Decode(&blocks); err != nil {
glog.V(logger.Detail).Infoln("Decode error", err) glog.V(logger.Detail).Infoln("Decode error", err)
blocks = nil blocks = nil
} }
// Filter out any explicitly requested blocks (cascading select to get blocking back to peer)
// Either deliver to the downloader or the importer filter := make(chan []*types.Block)
if self.downloader.Synchronising() { select {
self.downloader.DeliverBlocks(p.id, blocks) case <-self.quitSync:
} else { case self.newBlockCh <- filter:
for _, block := range blocks { select {
if err := self.importBlock(p, block, nil); err != nil { case <-self.quitSync:
return err case filter <- blocks:
select {
case <-self.quitSync:
case blocks := <-filter:
self.downloader.DeliverBlocks(p.id, blocks)
} }
} }
} }
@ -322,7 +321,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
} }
} }
if len(announces) > 0 { if len(announces) > 0 {
self.newHashCh <- announces select {
case self.newHashCh <- announces:
case <-self.quitSync:
}
} }
case NewBlockMsg: case NewBlockMsg:

View File

@ -12,6 +12,16 @@ import (
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
) )
const (
forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process
notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
blockProcAmount = 256
)
// blockAnnounce is the hash notification of the availability of a new block in // blockAnnounce is the hash notification of the availability of a new block in
// the network. // the network.
type blockAnnounce struct { type blockAnnounce struct {
@ -25,6 +35,7 @@ type blockAnnounce struct {
func (pm *ProtocolManager) fetcher() { func (pm *ProtocolManager) fetcher() {
announces := make(map[common.Hash]*blockAnnounce) announces := make(map[common.Hash]*blockAnnounce)
request := make(map[*peer][]common.Hash) request := make(map[*peer][]common.Hash)
pending := make(map[common.Hash]*blockAnnounce)
cycle := time.Tick(notifyCheckCycle) cycle := time.Tick(notifyCheckCycle)
// Iterate the block fetching until a quit is requested // Iterate the block fetching until a quit is requested
@ -38,11 +49,18 @@ func (pm *ProtocolManager) fetcher() {
} }
case <-cycle: case <-cycle:
// Clean up any expired block fetches
for hash, announce := range pending {
if time.Since(announce.time) > notifyFetchTimeout {
delete(pending, hash)
}
}
// Check if any notified blocks failed to arrive // Check if any notified blocks failed to arrive
for hash, announce := range announces { for hash, announce := range announces {
if time.Since(announce.time) > notifyArriveTimeout { if time.Since(announce.time) > notifyArriveTimeout {
if !pm.chainman.HasBlock(hash) { if !pm.chainman.HasBlock(hash) {
request[announce.peer] = append(request[announce.peer], hash) request[announce.peer] = append(request[announce.peer], hash)
pending[hash] = announce
} }
delete(announces, hash) delete(announces, hash)
} }
@ -57,6 +75,44 @@ func (pm *ProtocolManager) fetcher() {
} }
request = make(map[*peer][]common.Hash) request = make(map[*peer][]common.Hash)
case filter := <-pm.newBlockCh:
// Blocks arrived, extract any explicit requests, return all else
var blocks types.Blocks
select {
case blocks = <-filter:
case <-pm.quitSync:
return
}
fetch, sync := []*types.Block{}, []*types.Block{}
for _, block := range blocks {
hash := block.Hash()
if _, ok := pending[hash]; ok {
fetch = append(fetch, block)
} else {
sync = append(sync, block)
}
}
select {
case filter <- sync:
case <-pm.quitSync:
return
}
// If any explicit fetches were replied to, import them
if len(fetch) > 0 {
go func() {
for _, block := range fetch {
if announce := pending[block.Hash()]; announce != nil {
if err := pm.importBlock(announce.peer, block, nil); err != nil {
glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err)
return
}
}
}
}()
}
case <-pm.quitSync: case <-pm.quitSync:
return return
} }