mirror of https://github.com/status-im/op-geth.git
eth, eth/downloader: moved peer selection to protocol handler
This commit is contained in:
parent
9caf880ff9
commit
b86e7526e1
|
@ -89,7 +89,7 @@ func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn)
|
|||
blockCh: make(chan blockPack, 1),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
go downloader.peerHandler()
|
||||
//go downloader.peerHandler()
|
||||
go downloader.update()
|
||||
|
||||
return downloader
|
||||
|
@ -110,7 +110,6 @@ func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getH
|
|||
// add peer to our peer set
|
||||
d.peers[id] = peer
|
||||
// broadcast new peer
|
||||
d.newPeerCh <- peer
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -125,55 +124,6 @@ func (d *Downloader) UnregisterPeer(id string) {
|
|||
delete(d.peers, id)
|
||||
}
|
||||
|
||||
func (d *Downloader) peerHandler() {
|
||||
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
|
||||
itimer := time.NewTimer(peerCountTimeout)
|
||||
out:
|
||||
for {
|
||||
select {
|
||||
case <-d.newPeerCh:
|
||||
// Meet the `minDesiredPeerCount` before we select our best peer
|
||||
if len(d.peers) < minDesiredPeerCount {
|
||||
break
|
||||
}
|
||||
itimer.Stop()
|
||||
|
||||
d.selectPeer(d.peers.bestPeer())
|
||||
case <-itimer.C:
|
||||
// The timer will make sure that the downloader keeps an active state
|
||||
// in which it attempts to always check the network for highest td peers
|
||||
// Either select the peer or restart the timer if no peers could
|
||||
// be selected.
|
||||
if peer := d.peers.bestPeer(); peer != nil {
|
||||
d.selectPeer(d.peers.bestPeer())
|
||||
} else {
|
||||
itimer.Reset(5 * time.Second)
|
||||
}
|
||||
case <-d.quit:
|
||||
break out
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Downloader) selectPeer(p *peer) {
|
||||
// Make sure it's doing neither. Once done we can restart the
|
||||
// downloading process if the TD is higher. For now just get on
|
||||
// with whatever is going on. This prevents unecessary switching.
|
||||
if d.isBusy() {
|
||||
return
|
||||
}
|
||||
// selected peer must be better than our own
|
||||
// XXX we also check the peer's recent hash to make sure we
|
||||
// don't have it. Some peers report (i think) incorrect TD.
|
||||
if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) {
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td)
|
||||
d.syncCh <- syncPack{p, p.recentHash, false}
|
||||
|
||||
}
|
||||
|
||||
func (d *Downloader) update() {
|
||||
out:
|
||||
for {
|
||||
|
@ -193,6 +143,61 @@ out:
|
|||
}
|
||||
}
|
||||
|
||||
// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given
|
||||
// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the
|
||||
// checks fail an error will be returned. This method is synchronous
|
||||
func (d *Downloader) Synchronise(id string, hash common.Hash) (types.Blocks, error) {
|
||||
// Make sure it's doing neither. Once done we can restart the
|
||||
// downloading process if the TD is higher. For now just get on
|
||||
// with whatever is going on. This prevents unecessary switching.
|
||||
if d.isBusy() {
|
||||
return nil, errBusy
|
||||
}
|
||||
|
||||
// Fetch the peer using the id or throw an error if the peer couldn't be found
|
||||
p := d.peers[id]
|
||||
if p == nil {
|
||||
return nil, errUnknownPeer
|
||||
}
|
||||
|
||||
// Get the hash from the peer and initiate the downloading progress.
|
||||
err := d.getFromPeer(p, hash, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return d.queue.blocks, nil
|
||||
}
|
||||
|
||||
func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error {
|
||||
d.activePeer = p.id
|
||||
|
||||
glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id)
|
||||
// Start the fetcher. This will block the update entirely
|
||||
// interupts need to be send to the appropriate channels
|
||||
// respectively.
|
||||
if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil {
|
||||
// handle error
|
||||
glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
|
||||
// XXX Reset
|
||||
return err
|
||||
}
|
||||
|
||||
// Start fetching blocks in paralel. The strategy is simple
|
||||
// take any available peers, seserve a chunk for each peer available,
|
||||
// let the peer deliver the chunkn and periodically check if a peer
|
||||
// has timedout. When done downloading, process blocks.
|
||||
if err := d.startFetchingBlocks(p); err != nil {
|
||||
glog.V(logger.Debug).Infoln("Error downloading blocks:", err)
|
||||
// XXX reset
|
||||
return err
|
||||
}
|
||||
|
||||
glog.V(logger.Detail).Infoln("Sync completed")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// XXX Make synchronous
|
||||
func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitial bool) error {
|
||||
atomic.StoreInt32(&d.fetchingHashes, 1)
|
||||
|
|
|
@ -1,79 +0,0 @@
|
|||
package downloader
|
||||
|
||||
import (
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
)
|
||||
|
||||
// THIS IS PENDING AND TO DO CHANGES FOR MAKING THE DOWNLOADER SYNCHRONOUS
|
||||
|
||||
// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given
|
||||
// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the
|
||||
// checks fail an error will be returned. This method is synchronous
|
||||
func (d *Downloader) SynchroniseWithPeer(id string) (types.Blocks, error) {
|
||||
// Check if we're busy
|
||||
if d.isBusy() {
|
||||
return nil, errBusy
|
||||
}
|
||||
|
||||
// Attempt to select a peer. This can either be nothing, which returns, best peer
|
||||
// or selected peer. If no peer could be found an error will be returned
|
||||
var p *peer
|
||||
if len(id) == 0 {
|
||||
p = d.peers[id]
|
||||
if p == nil {
|
||||
return nil, errUnknownPeer
|
||||
}
|
||||
} else {
|
||||
p = d.peers.bestPeer()
|
||||
}
|
||||
|
||||
// Make sure our td is lower than the peer's td
|
||||
if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) {
|
||||
return nil, errLowTd
|
||||
}
|
||||
|
||||
// Get the hash from the peer and initiate the downloading progress.
|
||||
err := d.getFromPeer(p, p.recentHash, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return d.queue.blocks, nil
|
||||
}
|
||||
|
||||
// Synchronise will synchronise using the best peer.
|
||||
func (d *Downloader) Synchronise() (types.Blocks, error) {
|
||||
return d.SynchroniseWithPeer("")
|
||||
}
|
||||
|
||||
func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error {
|
||||
d.activePeer = p.id
|
||||
|
||||
glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id)
|
||||
// Start the fetcher. This will block the update entirely
|
||||
// interupts need to be send to the appropriate channels
|
||||
// respectively.
|
||||
if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil {
|
||||
// handle error
|
||||
glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
|
||||
// XXX Reset
|
||||
return err
|
||||
}
|
||||
|
||||
// Start fetching blocks in paralel. The strategy is simple
|
||||
// take any available peers, seserve a chunk for each peer available,
|
||||
// let the peer deliver the chunkn and periodically check if a peer
|
||||
// has timedout. When done downloading, process blocks.
|
||||
if err := d.startFetchingBlocks(p); err != nil {
|
||||
glog.V(logger.Debug).Infoln("Error downloading blocks:", err)
|
||||
// XXX reset
|
||||
return err
|
||||
}
|
||||
|
||||
glog.V(logger.Detail).Infoln("Sync completed")
|
||||
|
||||
return nil
|
||||
}
|
|
@ -39,6 +39,7 @@ import (
|
|||
"math"
|
||||
"math/big"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core"
|
||||
|
@ -51,6 +52,11 @@ import (
|
|||
"github.com/ethereum/go-ethereum/rlp"
|
||||
)
|
||||
|
||||
const (
|
||||
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
|
||||
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
|
||||
)
|
||||
|
||||
func errResp(code errCode, format string, v ...interface{}) error {
|
||||
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
|
||||
}
|
||||
|
@ -82,6 +88,9 @@ type ProtocolManager struct {
|
|||
eventMux *event.TypeMux
|
||||
txSub event.Subscription
|
||||
minedBlockSub event.Subscription
|
||||
|
||||
newPeerCh chan *peer
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
|
||||
|
@ -93,7 +102,10 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
|
|||
chainman: chainman,
|
||||
downloader: downloader,
|
||||
peers: make(map[string]*peer),
|
||||
newPeerCh: make(chan *peer, 1),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
go manager.peerHandler()
|
||||
|
||||
manager.SubProtocol = p2p.Protocol{
|
||||
Name: "eth",
|
||||
|
@ -101,16 +113,61 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
|
|||
Length: ProtocolLength,
|
||||
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
|
||||
peer := manager.newPeer(protocolVersion, networkId, p, rw)
|
||||
err := manager.handle(peer)
|
||||
//glog.V(logger.Detail).Infof("[%s]: %v\n", peer.id, err)
|
||||
|
||||
return err
|
||||
manager.newPeerCh <- peer
|
||||
|
||||
return manager.handle(peer)
|
||||
},
|
||||
}
|
||||
|
||||
return manager
|
||||
}
|
||||
|
||||
func (pm *ProtocolManager) peerHandler() {
|
||||
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
|
||||
itimer := time.NewTimer(peerCountTimeout)
|
||||
out:
|
||||
for {
|
||||
select {
|
||||
case <-pm.newPeerCh:
|
||||
// Meet the `minDesiredPeerCount` before we select our best peer
|
||||
if len(pm.peers) < minDesiredPeerCount {
|
||||
break
|
||||
}
|
||||
itimer.Stop()
|
||||
|
||||
// Find the best peer
|
||||
peer := getBestPeer(pm.peers)
|
||||
if peer == nil {
|
||||
glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available")
|
||||
return
|
||||
}
|
||||
go pm.synchronise(peer)
|
||||
case <-itimer.C:
|
||||
// The timer will make sure that the downloader keeps an active state
|
||||
// in which it attempts to always check the network for highest td peers
|
||||
// Either select the peer or restart the timer if no peers could
|
||||
// be selected.
|
||||
if peer := getBestPeer(pm.peers); peer != nil {
|
||||
go pm.synchronise(peer)
|
||||
} else {
|
||||
itimer.Reset(5 * time.Second)
|
||||
}
|
||||
case <-pm.quit:
|
||||
break out
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *ProtocolManager) synchronise(peer *peer) {
|
||||
// Get the hashes from the peer (synchronously)
|
||||
_, err := pm.downloader.Synchronise(peer.id, peer.recentHash)
|
||||
if err != nil {
|
||||
// handle error
|
||||
glog.V(logger.Debug).Infoln("error downloading:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *ProtocolManager) Start() {
|
||||
// broadcast transactions
|
||||
pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
|
||||
|
@ -141,7 +198,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
|
|||
pm.peers[p.id] = p
|
||||
pm.pmu.Unlock()
|
||||
|
||||
pm.downloader.RegisterPeer(p.id, p.td, p.currentHash, p.requestHashes, p.requestBlocks)
|
||||
pm.downloader.RegisterPeer(p.id, p.td, p.recentHash, p.requestHashes, p.requestBlocks)
|
||||
defer func() {
|
||||
pm.pmu.Lock()
|
||||
defer pm.pmu.Unlock()
|
||||
|
|
22
eth/peer.go
22
eth/peer.go
|
@ -25,6 +25,16 @@ type getBlockHashesMsgData struct {
|
|||
Amount uint64
|
||||
}
|
||||
|
||||
func getBestPeer(peers map[string]*peer) *peer {
|
||||
var peer *peer
|
||||
for _, cp := range peers {
|
||||
if peer == nil || cp.td.Cmp(peer.td) > 0 {
|
||||
peer = cp
|
||||
}
|
||||
}
|
||||
return peer
|
||||
}
|
||||
|
||||
type peer struct {
|
||||
*p2p.Peer
|
||||
|
||||
|
@ -32,9 +42,9 @@ type peer struct {
|
|||
|
||||
protv, netid int
|
||||
|
||||
currentHash common.Hash
|
||||
id string
|
||||
td *big.Int
|
||||
recentHash common.Hash
|
||||
id string
|
||||
td *big.Int
|
||||
|
||||
genesis, ourHash common.Hash
|
||||
ourTd *big.Int
|
||||
|
@ -43,14 +53,14 @@ type peer struct {
|
|||
blockHashes *set.Set
|
||||
}
|
||||
|
||||
func newPeer(protv, netid int, genesis, currentHash common.Hash, td *big.Int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
|
||||
func newPeer(protv, netid int, genesis, recentHash common.Hash, td *big.Int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
|
||||
id := p.ID()
|
||||
|
||||
return &peer{
|
||||
Peer: p,
|
||||
rw: rw,
|
||||
genesis: genesis,
|
||||
ourHash: currentHash,
|
||||
ourHash: recentHash,
|
||||
ourTd: td,
|
||||
protv: protv,
|
||||
netid: netid,
|
||||
|
@ -145,7 +155,7 @@ func (p *peer) handleStatus() error {
|
|||
// Set the total difficulty of the peer
|
||||
p.td = status.TD
|
||||
// set the best hash of the peer
|
||||
p.currentHash = status.CurrentBlock
|
||||
p.recentHash = status.CurrentBlock
|
||||
|
||||
return <-errc
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue