les: improved header fetcher and server statistics

This commit is contained in:
Zsolt Felfoldi 2016-11-30 06:02:08 +01:00
parent e67500aa15
commit af8a742d00
10 changed files with 820 additions and 489 deletions

View File

@ -23,136 +23,364 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
) )
const (
blockDelayTimeout = time.Second * 10 // timeout for a peer to announce a head that has already been confirmed by others
maxNodeCount = 20 // maximum number of fetcherTreeNode entries remembered for each peer
)
// lightFetcher keeps track of block announcements from connected peers, downloads the announced headers and helps select suitable peers for ODR requests
type lightFetcher struct { type lightFetcher struct {
pm *ProtocolManager pm *ProtocolManager
odr *LesOdr odr *LesOdr
chain BlockChain chain *light.LightChain
headAnnouncedMu sync.Mutex maxConfirmedTd *big.Int
headAnnouncedBy map[common.Hash][]*peer peers map[*peer]*fetcherPeerInfo
currentTd *big.Int lastUpdateStats *updateStatsEntry
lock sync.Mutex // protects access to the fetcher's internal state variables except sent requests
deliverChn chan fetchResponse deliverChn chan fetchResponse
reqMu sync.RWMutex reqMu sync.RWMutex
requested map[uint64]fetchRequest requested map[uint64]fetchRequest
timeoutChn chan uint64 timeoutChn chan uint64
notifyChn chan bool // true if initiated from outside requestChn chan bool // true if initiated from outside
syncing bool syncing bool
syncDone chan struct{} syncDone chan *peer
} }
// fetcherPeerInfo holds fetcher-specific information about each active peer
type fetcherPeerInfo struct {
root, lastAnnounced *fetcherTreeNode
nodeCnt int
confirmedTd *big.Int
bestConfirmed *fetcherTreeNode
nodeByHash map[common.Hash]*fetcherTreeNode
firstUpdateStats *updateStatsEntry
}
// fetcherTreeNode is a node of a tree that holds information about blocks recently
// announced and confirmed by a certain peer. Each new announce message from a peer
// adds nodes to the tree, based on the previous announced head and the reorg depth.
// There are three possible states for a tree node:
// - announced: not downloaded (known) yet, but we know its hash, number and td
// - intermediate: not known, hash and td are empty, they are filled out when it becomes known
// - known: both announced by this peer and downloaded (from any peer).
// This structure makes it possible to always know which peer has a certain block,
// which is necessary for selecting a suitable peer for ODR requests and also for
// canonizing new heads. It also helps to always download the minimum necessary
// amount of headers with a single request.
type fetcherTreeNode struct {
hash common.Hash
number uint64
td *big.Int
known, requested bool
parent *fetcherTreeNode
children []*fetcherTreeNode
}
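For illustration only, here is a minimal self-contained sketch (not part of this change; node, extend and the block numbers are made up) of how an announcement with a given reorg depth extends such a per-peer tree: walk up parent pointers to the reorg common ancestor, then append intermediate child nodes up to the announced head number, much like the loop in announce below does for fetcherTreeNode.

package main

import "fmt"

// node is an illustrative stand-in for fetcherTreeNode, keeping only the
// fields needed to show how the announce tree grows.
type node struct {
	number   uint64
	known    bool
	parent   *node
	children []*node
}

// extend walks reorgDepth steps up from the last announced node to the reorg
// common ancestor, then appends nodes until the announced head number is reached.
func extend(last *node, reorgDepth, headNumber uint64) *node {
	n := last
	for i := uint64(0); i < reorgDepth && n != nil; i++ {
		n = n.parent
	}
	if n == nil {
		return nil // ancestor not in the tree; the fetcher would start a new root and resync
	}
	for n.number < headNumber {
		child := &node{number: n.number + 1, parent: n}
		n.children = append(n.children, child)
		n = child
	}
	return n
}

func main() {
	n98 := &node{number: 98, known: true}
	n99 := &node{number: 99, known: true, parent: n98}
	n100 := &node{number: 100, known: true, parent: n99}
	n98.children = []*node{n99}
	n99.children = []*node{n100}

	newHead := extend(n100, 2, 101) // reorg of depth 2: branch from #98, extend to #101
	fmt.Println(newHead.number)     // 101; the new #99 and #100 stay intermediate until announced or downloaded
}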
// fetchRequest represents a header download request
type fetchRequest struct { type fetchRequest struct {
hash common.Hash hash common.Hash
amount uint64 amount uint64
peer *peer peer *peer
sent mclock.AbsTime
timeout bool
} }
// fetchResponse represents a header download response
type fetchResponse struct { type fetchResponse struct {
reqID uint64 reqID uint64
headers []*types.Header headers []*types.Header
peer *peer peer *peer
} }
// newLightFetcher creates a new light fetcher
func newLightFetcher(pm *ProtocolManager) *lightFetcher { func newLightFetcher(pm *ProtocolManager) *lightFetcher {
f := &lightFetcher{ f := &lightFetcher{
pm: pm, pm: pm,
chain: pm.blockchain, chain: pm.blockchain.(*light.LightChain),
odr: pm.odr, odr: pm.odr,
headAnnouncedBy: make(map[common.Hash][]*peer), peers: make(map[*peer]*fetcherPeerInfo),
deliverChn: make(chan fetchResponse, 100), deliverChn: make(chan fetchResponse, 100),
requested: make(map[uint64]fetchRequest), requested: make(map[uint64]fetchRequest),
timeoutChn: make(chan uint64), timeoutChn: make(chan uint64),
notifyChn: make(chan bool, 100), requestChn: make(chan bool, 100),
syncDone: make(chan struct{}), syncDone: make(chan *peer),
currentTd: big.NewInt(0), maxConfirmedTd: big.NewInt(0),
} }
go f.syncLoop() go f.syncLoop()
return f return f
} }
func (f *lightFetcher) notify(p *peer, head *announceData) { // syncLoop is the main event loop of the light fetcher
var headHash common.Hash func (f *lightFetcher) syncLoop() {
if head == nil { f.pm.wg.Add(1)
// initial notify defer f.pm.wg.Done()
headHash = p.Head()
} else { requestStarted := false
if core.GetTd(f.pm.chainDb, head.Hash, head.Number) != nil { for {
head.haveHeaders = head.Number select {
case <-f.pm.quitSync:
return
// when a new announce is received, request loop keeps running until
// no further requests are necessary or possible
case newAnnounce := <-f.requestChn:
f.lock.Lock()
s := requestStarted
requestStarted = false
if !f.syncing && !(newAnnounce && s) {
if peer, node, amount := f.nextRequest(); node != nil {
requestStarted = true
reqID, started := f.request(peer, node, amount)
if started {
go func() {
time.Sleep(softRequestTimeout)
f.reqMu.Lock()
req, ok := f.requested[reqID]
if ok {
req.timeout = true
f.requested[reqID] = req
} }
//fmt.Println("notify", p.id, head.Number, head.ReorgDepth, head.haveHeaders) f.reqMu.Unlock()
if !p.addNotify(head) { // keep starting new requests while possible
//fmt.Println("addNotify fail") f.requestChn <- false
f.pm.removePeer(p.id) }()
}
}
}
f.lock.Unlock()
case reqID := <-f.timeoutChn:
f.reqMu.Lock()
req, ok := f.requested[reqID]
if ok {
delete(f.requested, reqID)
}
f.reqMu.Unlock()
if ok {
f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), true)
glog.V(logger.Debug).Infof("hard timeout by peer %v", req.peer.id)
go f.pm.removePeer(req.peer.id)
}
case resp := <-f.deliverChn:
f.reqMu.Lock()
req, ok := f.requested[resp.reqID]
if ok && req.peer != resp.peer {
ok = false
}
if ok {
delete(f.requested, resp.reqID)
}
f.reqMu.Unlock()
if ok {
f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), req.timeout)
}
f.lock.Lock()
if !ok || !(f.syncing || f.processResponse(req, resp)) {
glog.V(logger.Debug).Infof("failed processing response by peer %v", resp.peer.id)
go f.pm.removePeer(resp.peer.id)
}
f.lock.Unlock()
case p := <-f.syncDone:
f.lock.Lock()
glog.V(logger.Debug).Infof("done synchronising with peer %v", p.id)
f.checkSyncedHeaders(p)
f.syncing = false
f.lock.Unlock()
} }
headHash = head.Hash
} }
f.headAnnouncedMu.Lock()
f.headAnnouncedBy[headHash] = append(f.headAnnouncedBy[headHash], p)
f.headAnnouncedMu.Unlock()
f.notifyChn <- true
} }
func (f *lightFetcher) gotHeader(header *types.Header) { // addPeer adds a new peer to the fetcher's peer set
f.headAnnouncedMu.Lock() func (f *lightFetcher) addPeer(p *peer) {
defer f.headAnnouncedMu.Unlock() f.lock.Lock()
defer f.lock.Unlock()
hash := header.Hash() f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)}
peerList := f.headAnnouncedBy[hash] }
if peerList == nil {
// removePeer removes a peer from the fetcher's peer set
func (f *lightFetcher) removePeer(p *peer) {
f.lock.Lock()
defer f.lock.Unlock()
// check for potential timed out block delay statistics
f.checkUpdateStats(p, nil)
delete(f.peers, p)
}
// announce processes a new announcement message received from a peer, adding new
// nodes to the peer's block tree and removing old nodes if necessary
func (f *lightFetcher) announce(p *peer, head *announceData) {
f.lock.Lock()
defer f.lock.Unlock()
glog.V(logger.Debug).Infof("received announce from peer %v #%d %016x reorg: %d", p.id, head.Number, head.Hash[:8], head.ReorgDepth)
fp := f.peers[p]
if fp == nil {
glog.V(logger.Debug).Infof("announce: unknown peer")
return return
} }
number := header.Number.Uint64()
td := core.GetTd(f.pm.chainDb, hash, number) if fp.lastAnnounced != nil && head.Td.Cmp(fp.lastAnnounced.td) <= 0 {
for _, peer := range peerList { // announced tds should be strictly monotonic
peer.lock.Lock() glog.V(logger.Debug).Infof("non-monotonic Td from peer %v", p.id)
ok := peer.gotHeader(hash, number, td) go f.pm.removePeer(p.id)
peer.lock.Unlock() return
if !ok { }
//fmt.Println("gotHeader fail")
f.pm.removePeer(peer.id) n := fp.lastAnnounced
for i := uint64(0); i < head.ReorgDepth; i++ {
if n == nil {
break
}
n = n.parent
}
if n != nil {
// n is now the reorg common ancestor, add a new branch of nodes
// check if the node count is too high to add new nodes
locked := false
for uint64(fp.nodeCnt)+head.Number-n.number > maxNodeCount && fp.root != nil {
if !locked {
f.chain.LockChain()
defer f.chain.UnlockChain()
locked = true
}
// if one of root's children is canonical, keep it, delete other branches and root itself
var newRoot *fetcherTreeNode
for i, nn := range fp.root.children {
if core.GetCanonicalHash(f.pm.chainDb, nn.number) == nn.hash {
fp.root.children = append(fp.root.children[:i], fp.root.children[i+1:]...)
nn.parent = nil
newRoot = nn
break
} }
} }
delete(f.headAnnouncedBy, hash) fp.deleteNode(fp.root)
if n == fp.root {
n = newRoot
}
fp.root = newRoot
if newRoot == nil || !f.checkKnownNode(p, newRoot) {
fp.bestConfirmed = nil
fp.confirmedTd = nil
}
if n == nil {
break
}
}
if n != nil {
for n.number < head.Number {
nn := &fetcherTreeNode{number: n.number + 1, parent: n}
n.children = append(n.children, nn)
n = nn
fp.nodeCnt++
}
n.hash = head.Hash
n.td = head.Td
fp.nodeByHash[n.hash] = n
}
}
if n == nil {
// could not find reorg common ancestor or had to delete entire tree, a new root and a resync is needed
if fp.root != nil {
fp.deleteNode(fp.root)
}
n = &fetcherTreeNode{hash: head.Hash, number: head.Number, td: head.Td}
fp.root = n
fp.nodeCnt++
fp.nodeByHash[n.hash] = n
fp.bestConfirmed = nil
fp.confirmedTd = nil
}
f.checkKnownNode(p, n)
p.lock.Lock()
p.headInfo = head
fp.lastAnnounced = n
p.lock.Unlock()
f.checkUpdateStats(p, nil)
f.requestChn <- true
} }
func (f *lightFetcher) nextRequest() (*peer, *announceData) { // peerHasBlock returns true if we can assume the peer knows the given block
var bestPeer *peer // based on its announcements
bestTd := f.currentTd func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bool {
for _, peer := range f.pm.peers.AllPeers() { f.lock.Lock()
peer.lock.RLock() defer f.lock.Unlock()
if !peer.headInfo.requested && (peer.headInfo.Td.Cmp(bestTd) > 0 ||
(bestPeer != nil && peer.headInfo.Td.Cmp(bestTd) == 0 && peer.headInfo.haveHeaders > bestPeer.headInfo.haveHeaders)) { fp := f.peers[p]
bestPeer = peer if fp == nil || fp.root == nil {
bestTd = peer.headInfo.Td return false
} }
peer.lock.RUnlock()
if number >= fp.root.number {
// it is recent enough that if it is known, it should be in the peer's block tree
return fp.nodeByHash[hash] != nil
} }
if bestPeer == nil { f.chain.LockChain()
return nil, nil defer f.chain.UnlockChain()
} // if it's older than the peer's block tree root but it's in the same canonical chain
bestPeer.lock.Lock() // as the root, we can still be sure the peer knows it
res := bestPeer.headInfo return core.GetCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && core.GetCanonicalHash(f.pm.chainDb, number) == hash
res.requested = true
bestPeer.lock.Unlock()
for _, peer := range f.pm.peers.AllPeers() {
if peer != bestPeer {
peer.lock.Lock()
if peer.headInfo.Hash == bestPeer.headInfo.Hash && peer.headInfo.haveHeaders == bestPeer.headInfo.haveHeaders {
peer.headInfo.requested = true
}
peer.lock.Unlock()
}
}
return bestPeer, res
} }
func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types.Header) { // request initiates a header download request from a certain peer
f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer} func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint64, bool) {
fp := f.peers[p]
if fp == nil {
glog.V(logger.Debug).Infof("request: unknown peer")
return 0, false
}
if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) {
f.syncing = true
go func() {
glog.V(logger.Debug).Infof("synchronising with peer %v", p.id)
f.pm.synchronise(p)
f.syncDone <- p
}()
return 0, false
}
reqID := f.odr.getNextReqID()
n.requested = true
cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount))
p.fcServer.SendRequest(reqID, cost)
f.reqMu.Lock()
f.requested[reqID] = fetchRequest{hash: n.hash, amount: amount, peer: p, sent: mclock.Now()}
f.reqMu.Unlock()
go p.RequestHeadersByHash(reqID, cost, n.hash, int(amount), 0, true)
go func() {
time.Sleep(hardRequestTimeout)
f.timeoutChn <- reqID
}()
return reqID, true
} }
// requestAmount calculates the amount of headers to be downloaded starting
// from a certain head backwards
func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 {
amount := uint64(0)
nn := n
for nn != nil && !f.checkKnownNode(p, nn) {
nn = nn.parent
amount++
}
if nn == nil {
amount = n.number
}
return amount
}
// requestedID tells if a certain reqID has been requested by the fetcher
func (f *lightFetcher) requestedID(reqID uint64) bool { func (f *lightFetcher) requestedID(reqID uint64) bool {
f.reqMu.RLock() f.reqMu.RLock()
_, ok := f.requested[reqID] _, ok := f.requested[reqID]
@ -160,36 +388,51 @@ func (f *lightFetcher) requestedID(reqID uint64) bool {
return ok return ok
} }
func (f *lightFetcher) request(p *peer, block *announceData) { // nextRequest selects the peer and announced head to be requested next; the amount
//fmt.Println("request", p.id, block.Number, block.haveHeaders) // of headers to be downloaded starting from the head backwards is also returned
amount := block.Number - block.haveHeaders func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) {
if amount == 0 { var (
return bestHash common.Hash
bestAmount uint64
)
bestTd := f.maxConfirmedTd
for p, fp := range f.peers {
for hash, n := range fp.nodeByHash {
if !f.checkKnownNode(p, n) && !n.requested && (bestTd == nil || n.td.Cmp(bestTd) >= 0) {
amount := f.requestAmount(p, n)
if bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount {
bestHash = hash
bestAmount = amount
bestTd = n.td
} }
if amount > 100 { }
f.syncing = true }
go func() { }
//fmt.Println("f.pm.synchronise(p)") if bestTd == f.maxConfirmedTd {
f.pm.synchronise(p) return nil, nil, 0
//fmt.Println("sync done")
f.syncDone <- struct{}{}
}()
return
} }
reqID := f.odr.getNextReqID() peer := f.pm.serverPool.selectPeer(func(p *peer) (bool, uint64) {
f.reqMu.Lock() fp := f.peers[p]
f.requested[reqID] = fetchRequest{hash: block.Hash, amount: amount, peer: p} if fp == nil || fp.nodeByHash[bestHash] == nil {
f.reqMu.Unlock() return false, 0
cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount)) }
p.fcServer.SendRequest(reqID, cost) return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)))
go p.RequestHeadersByHash(reqID, cost, block.Hash, int(amount), 0, true) })
go func() { var node *fetcherTreeNode
time.Sleep(hardRequestTimeout) if peer != nil {
f.timeoutChn <- reqID node = f.peers[peer].nodeByHash[bestHash]
}() }
return peer, node, bestAmount
} }
// deliverHeaders delivers header download request responses for processing
func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types.Header) {
f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer}
}
// processResponse processes header download request responses
func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool { func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool {
if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash { if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash {
return false return false
@ -201,101 +444,248 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo
if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil { if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil {
return false return false
} }
for _, header := range headers { tds := make([]*big.Int, len(headers))
td := core.GetTd(f.pm.chainDb, header.Hash(), header.Number.Uint64()) for i, header := range headers {
td := f.chain.GetTd(header.Hash(), header.Number.Uint64())
if td == nil { if td == nil {
return false return false
} }
if td.Cmp(f.currentTd) > 0 { tds[i] = td
f.currentTd = td
}
f.gotHeader(header)
} }
f.newHeaders(headers, tds)
return true return true
} }
func (f *lightFetcher) checkSyncedHeaders() { // newHeaders updates the block trees of all active peers according to a newly
//fmt.Println("checkSyncedHeaders()") // downloaded and validated batch or headers
for _, peer := range f.pm.peers.AllPeers() { func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) {
peer.lock.Lock() var maxTd *big.Int
h := peer.firstHeadInfo for p, fp := range f.peers {
remove := false if !f.checkAnnouncedHeaders(fp, headers, tds) {
loop: glog.V(logger.Debug).Infof("announce inconsistency by peer %v", p.id)
for h != nil { go f.pm.removePeer(p.id)
if td := core.GetTd(f.pm.chainDb, h.Hash, h.Number); td != nil {
//fmt.Println(" found", h.Number)
ok := peer.gotHeader(h.Hash, h.Number, td)
if !ok {
remove = true
break loop
} }
if td.Cmp(f.currentTd) > 0 { if fp.confirmedTd != nil && (maxTd == nil || maxTd.Cmp(fp.confirmedTd) > 0) {
f.currentTd = td maxTd = fp.confirmedTd
} }
} }
h = h.next if maxTd != nil {
f.updateMaxConfirmedTd(maxTd)
}
}
// checkAnnouncedHeaders updates peer's block tree if necessary after validating
// a batch of headers. It searches for the latest header in the batch that has a
// matching tree node (if any), and if it has not been marked as known already,
// sets it and its parents to known (even those which are older than the currently
// validated ones). The return value shows whether all hashes, numbers and Tds matched
// correctly to the announced values (otherwise the peer should be dropped).
func (f *lightFetcher) checkAnnouncedHeaders(fp *fetcherPeerInfo, headers []*types.Header, tds []*big.Int) bool {
var (
n *fetcherTreeNode
header *types.Header
td *big.Int
)
for i := len(headers) - 1; ; i-- {
if i < 0 {
if n == nil {
// no more headers and nothing to match
return true
}
// we ran out of recently delivered headers but have not reached a node known by this peer yet, continue matching
td = f.chain.GetTd(header.ParentHash, header.Number.Uint64()-1)
header = f.chain.GetHeader(header.ParentHash, header.Number.Uint64()-1)
} else {
header = headers[i]
td = tds[i]
}
hash := header.Hash()
number := header.Number.Uint64()
if n == nil {
n = fp.nodeByHash[hash]
}
if n != nil {
if n.td == nil {
// node was unannounced
if nn := fp.nodeByHash[hash]; nn != nil {
// if there was already a node with the same hash, continue there and drop this one
nn.children = append(nn.children, n.children...)
n.children = nil
fp.deleteNode(n)
n = nn
} else {
n.hash = hash
n.td = td
fp.nodeByHash[hash] = n
}
}
// check if it matches the header
if n.hash != hash || n.number != number || n.td.Cmp(td) != 0 {
// peer has previously made an invalid announcement
return false
}
if n.known {
// we reached a known node that matched our expectations, return with success
return true
}
n.known = true
if fp.confirmedTd == nil || td.Cmp(fp.confirmedTd) > 0 {
fp.confirmedTd = td
fp.bestConfirmed = n
}
n = n.parent
if n == nil {
return true
} }
peer.lock.Unlock()
if remove {
//fmt.Println("checkSync fail")
f.pm.removePeer(peer.id)
} }
} }
} }
func (f *lightFetcher) syncLoop() { // checkSyncedHeaders updates peer's block tree after synchronisation by marking
f.pm.wg.Add(1) // downloaded headers as known. If none of the announced headers are found after
defer f.pm.wg.Done() // syncing, the peer is dropped.
func (f *lightFetcher) checkSyncedHeaders(p *peer) {
srtoNotify := false fp := f.peers[p]
for { if fp == nil {
select { glog.V(logger.Debug).Infof("checkSyncedHeaders: unknown peer")
case <-f.pm.quitSync:
return return
case ext := <-f.notifyChn: }
//fmt.Println("<-f.notifyChn", f.syncing, ext, srtoNotify) n := fp.lastAnnounced
s := srtoNotify var td *big.Int
srtoNotify = false for n != nil {
if !f.syncing && !(ext && s) { if td = f.chain.GetTd(n.hash, n.number); td != nil {
if p, r := f.nextRequest(); r != nil { break
srtoNotify = true }
go func() { n = n.parent
time.Sleep(softRequestTimeout) }
f.notifyChn <- false // now n is the latest downloaded header after syncing
}() if n == nil {
f.request(p, r) glog.V(logger.Debug).Infof("synchronisation failed with peer %v", p.id)
go f.pm.removePeer(p.id)
} else {
header := f.chain.GetHeader(n.hash, n.number)
f.newHeaders([]*types.Header{header}, []*big.Int{td})
}
}
// checkKnownNode checks if a block tree node is known (downloaded and validated).
// If it was not known previously but found in the database, sets its known flag
func (f *lightFetcher) checkKnownNode(p *peer, n *fetcherTreeNode) bool {
if n.known {
return true
}
td := f.chain.GetTd(n.hash, n.number)
if td == nil {
return false
}
fp := f.peers[p]
if fp == nil {
glog.V(logger.Debug).Infof("checkKnownNode: unknown peer")
return false
}
header := f.chain.GetHeader(n.hash, n.number)
if !f.checkAnnouncedHeaders(fp, []*types.Header{header}, []*big.Int{td}) {
glog.V(logger.Debug).Infof("announce inconsistency by peer %v", p.id)
go f.pm.removePeer(p.id)
}
if fp.confirmedTd != nil {
f.updateMaxConfirmedTd(fp.confirmedTd)
}
return n.known
}
// deleteNode deletes a node and its child subtrees from a peer's block tree
func (fp *fetcherPeerInfo) deleteNode(n *fetcherTreeNode) {
if n.parent != nil {
for i, nn := range n.parent.children {
if nn == n {
n.parent.children = append(n.parent.children[:i], n.parent.children[i+1:]...)
break
} }
} }
case reqID := <-f.timeoutChn:
f.reqMu.Lock()
req, ok := f.requested[reqID]
if ok {
delete(f.requested, reqID)
} }
f.reqMu.Unlock() for {
if ok { if n.td != nil {
//fmt.Println("hard timeout") delete(fp.nodeByHash, n.hash)
f.pm.removePeer(req.peer.id)
} }
case resp := <-f.deliverChn: fp.nodeCnt--
//fmt.Println("<-f.deliverChn", f.syncing) if len(n.children) == 0 {
f.reqMu.Lock() return
req, ok := f.requested[resp.reqID]
if ok && req.peer != resp.peer {
ok = false
} }
if ok { for i, nn := range n.children {
delete(f.requested, resp.reqID) if i == 0 {
n = nn
} else {
fp.deleteNode(nn)
} }
f.reqMu.Unlock() }
if !ok || !(f.syncing || f.processResponse(req, resp)) { }
//fmt.Println("processResponse fail") }
f.pm.removePeer(resp.peer.id)
} // updateStatsEntry items form a linked list that is expanded with a new item every time a new head with a higher Td
case <-f.syncDone: // than the previous one has been downloaded and validated. The list contains a series of maximum confirmed Td values
//fmt.Println("<-f.syncDone", f.syncing) // and the time these values have been confirmed, both increasing monotonically. A maximum confirmed Td is calculated
f.checkSyncedHeaders() // both globally for all peers and also for each individual peer (meaning that the given peer has announced the head
f.syncing = false // and it has also been downloaded from any peer, either before or after the given announcement).
// The linked list has a global tail where new confirmed Td entries are added and a separate head for each peer,
// pointing to the next Td entry that is higher than the peer's max confirmed Td (nil if it has already confirmed
// the current global head).
type updateStatsEntry struct {
time mclock.AbsTime
td *big.Int
next *updateStatsEntry
}
// updateMaxConfirmedTd updates the block delay statistics of active peers. Whenever a new highest Td is confirmed,
// adds it to the end of a linked list together with the time it has been confirmed. Then checks which peers have
// already confirmed a head with the same or higher Td (which counts as zero block delay) and updates their statistics.
// Those who have not confirmed such a head by now will be updated by a subsequent checkUpdateStats call with a
// positive block delay value.
func (f *lightFetcher) updateMaxConfirmedTd(td *big.Int) {
if f.maxConfirmedTd == nil || td.Cmp(f.maxConfirmedTd) > 0 {
f.maxConfirmedTd = td
newEntry := &updateStatsEntry{
time: mclock.Now(),
td: td,
}
if f.lastUpdateStats != nil {
f.lastUpdateStats.next = newEntry
}
f.lastUpdateStats = newEntry
for p, _ := range f.peers {
f.checkUpdateStats(p, newEntry)
}
}
}
// checkUpdateStats checks those peers who have not confirmed a certain highest Td (or a larger one) by the time it
// has been confirmed by another peer. If they have confirmed such a head by now, their stats are updated with the
// block delay which is (this peer's confirmation time)-(first confirmation time). After blockDelayTimeout has passed,
// the stats are updated with blockDelayTimeout value. In either case, the confirmed or timed out updateStatsEntry
// items are removed from the head of the linked list.
// If a new entry has been added to the global tail, it is passed as a parameter here even though this function
// assumes that it has already been added, so that if the peer's list is empty (all heads confirmed, head is nil),
// it can set the new head to newEntry.
func (f *lightFetcher) checkUpdateStats(p *peer, newEntry *updateStatsEntry) {
now := mclock.Now()
fp := f.peers[p]
if fp == nil {
glog.V(logger.Debug).Infof("checkUpdateStats: unknown peer")
return
}
if newEntry != nil && fp.firstUpdateStats == nil {
fp.firstUpdateStats = newEntry
}
for fp.firstUpdateStats != nil && fp.firstUpdateStats.time <= now-mclock.AbsTime(blockDelayTimeout) {
f.pm.serverPool.adjustBlockDelay(p.poolEntry, blockDelayTimeout)
fp.firstUpdateStats = fp.firstUpdateStats.next
}
if fp.confirmedTd != nil {
for fp.firstUpdateStats != nil && fp.firstUpdateStats.td.Cmp(fp.confirmedTd) <= 0 {
f.pm.serverPool.adjustBlockDelay(p.poolEntry, time.Duration(now-fp.firstUpdateStats.time))
fp.firstUpdateStats = fp.firstUpdateStats.next
} }
} }
} }
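As a worked example of the statistic described above (a sketch following the stated semantics, not code from this change): the delay charged to a peer for a given head is the time between the first confirmation of that head by any peer and this peer's own confirmation, zero if it confirmed first, and capped at blockDelayTimeout if it never confirms in time.

package main

import (
	"fmt"
	"time"
)

const blockDelayTimeout = 10 * time.Second // same value as in the fetcher constants

// blockDelay is an illustrative helper returning the delay credited to a peer
// relative to the first confirmation of a head by any peer.
func blockDelay(firstConfirmed, peerConfirmed time.Time) time.Duration {
	d := peerConfirmed.Sub(firstConfirmed)
	if d < 0 {
		d = 0 // this peer confirmed first (or simultaneously): zero delay
	}
	if d > blockDelayTimeout {
		d = blockDelayTimeout // peers that never confirm in time are charged the timeout
	}
	return d
}

func main() {
	t0 := time.Now()
	fmt.Println(blockDelay(t0, t0.Add(2*time.Second))) // 2s
	fmt.Println(blockDelay(t0, t0.Add(time.Minute)))   // 10s, capped at blockDelayTimeout
}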

View File

@ -24,10 +24,8 @@ import (
"math/big" "math/big"
"net" "net"
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -60,7 +58,7 @@ const (
MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
MaxTxSend = 64 // Amount of transactions to be send per request MaxTxSend = 64 // Amount of transactions to be send per request
disableClientRemovePeer = true disableClientRemovePeer = false
) )
// errIncompatibleConfig is returned if the requested protocols and configs are // errIncompatibleConfig is returned if the requested protocols and configs are
@ -157,44 +155,27 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
Length: ProtocolLengths[i], Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
var entry *poolEntry var entry *poolEntry
peer := manager.newPeer(int(version), networkId, p, rw)
if manager.serverPool != nil { if manager.serverPool != nil {
addr := p.RemoteAddr().(*net.TCPAddr) addr := p.RemoteAddr().(*net.TCPAddr)
entry = manager.serverPool.connect(p.ID(), addr.IP, uint16(addr.Port)) entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
if entry == nil { if entry == nil {
return fmt.Errorf("unwanted connection") return fmt.Errorf("unwanted connection")
} }
} }
peer := manager.newPeer(int(version), networkId, p, rw)
peer.poolEntry = entry peer.poolEntry = entry
select { select {
case manager.newPeerCh <- peer: case manager.newPeerCh <- peer:
manager.wg.Add(1) manager.wg.Add(1)
defer manager.wg.Done() defer manager.wg.Done()
start := mclock.Now()
err := manager.handle(peer) err := manager.handle(peer)
if entry != nil { if entry != nil {
connTime := time.Duration(mclock.Now() - start) manager.serverPool.disconnect(entry)
stopped := false
select {
case <-manager.quitSync:
stopped = true
default:
}
//fmt.Println("connTime", peer.id, connTime, stopped, err)
quality := float64(1)
setQuality := true
if connTime < time.Minute*10 {
quality = 0
if stopped {
setQuality = false
}
}
manager.serverPool.disconnect(entry, quality, setQuality)
} }
return err return err
case <-manager.quitSync: case <-manager.quitSync:
if entry != nil { if entry != nil {
manager.serverPool.disconnect(entry, 0, false) manager.serverPool.disconnect(entry)
} }
return p2p.DiscQuitting return p2p.DiscQuitting
} }
@ -224,7 +205,6 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash, manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash, nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer) blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
manager.fetcher = newLightFetcher(manager)
} }
if odr != nil { if odr != nil {
@ -254,10 +234,12 @@ func (pm *ProtocolManager) removePeer(id string) {
glog.V(logger.Debug).Infof("LES: unregister peer %v", id) glog.V(logger.Debug).Infof("LES: unregister peer %v", id)
if pm.lightSync { if pm.lightSync {
pm.downloader.UnregisterPeer(id) pm.downloader.UnregisterPeer(id)
pm.odr.UnregisterPeer(peer)
if pm.txrelay != nil { if pm.txrelay != nil {
pm.txrelay.removePeer(id) pm.txrelay.removePeer(id)
} }
if pm.fetcher != nil {
pm.fetcher.removePeer(peer)
}
} }
if err := pm.peers.Unregister(id); err != nil { if err := pm.peers.Unregister(id); err != nil {
glog.V(logger.Error).Infoln("Removal failed:", err) glog.V(logger.Error).Infoln("Removal failed:", err)
@ -276,8 +258,10 @@ func (pm *ProtocolManager) Start(srvr *p2p.Server) {
lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
if pm.lightSync { if pm.lightSync {
// start sync handler // start sync handler
if srvr != nil { if srvr != nil { // srvr is nil during testing
pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg) pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
pm.odr.serverPool = pm.serverPool
pm.fetcher = newLightFetcher(pm)
} }
go pm.syncer() go pm.syncer()
} else { } else {
@ -369,12 +353,17 @@ func (pm *ProtocolManager) handle(p *peer) error {
requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil { requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
return err return err
} }
pm.odr.RegisterPeer(p)
if pm.txrelay != nil { if pm.txrelay != nil {
pm.txrelay.addPeer(p) pm.txrelay.addPeer(p)
} }
pm.fetcher.notify(p, nil) p.lock.Lock()
head := p.headInfo
p.lock.Unlock()
if pm.fetcher != nil {
pm.fetcher.addPeer(p)
pm.fetcher.announce(p, head)
}
if p.poolEntry != nil { if p.poolEntry != nil {
pm.serverPool.registered(p.poolEntry) pm.serverPool.registered(p.poolEntry)
@ -460,7 +449,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "%v: %v", msg, err) return errResp(ErrDecode, "%v: %v", msg, err)
} }
glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth) glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth)
pm.fetcher.notify(p, &req) if pm.fetcher != nil {
go pm.fetcher.announce(p, &req)
}
case GetBlockHeadersMsg: case GetBlockHeadersMsg:
glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id) glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id)
@ -558,7 +549,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err) return errResp(ErrDecode, "msg %v: %v", msg, err)
} }
p.fcServer.GotReply(resp.ReqID, resp.BV) p.fcServer.GotReply(resp.ReqID, resp.BV)
if pm.fetcher.requestedID(resp.ReqID) { if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers) pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
} else { } else {
err := pm.downloader.DeliverHeaders(p.id, resp.Headers) err := pm.downloader.DeliverHeaders(p.id, resp.Headers)

View File

@ -25,6 +25,7 @@ import (
"math/big" "math/big"
"sync" "sync"
"testing" "testing"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
@ -334,3 +335,13 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu
func (p *testPeer) close() { func (p *testPeer) close() {
p.app.Close() p.app.Close()
} }
type testServerPool peer
func (p *testServerPool) selectPeer(func(*peer) (bool, uint64)) *peer {
return (*peer)(p)
}
func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) {
}

View File

@ -37,6 +37,11 @@ var (
// peerDropFn is a callback type for dropping a peer detected as malicious. // peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string) type peerDropFn func(id string)
type odrPeerSelector interface {
selectPeer(func(*peer) (bool, uint64)) *peer
adjustResponseTime(*poolEntry, time.Duration, bool)
}
type LesOdr struct { type LesOdr struct {
light.OdrBackend light.OdrBackend
db ethdb.Database db ethdb.Database
@ -44,7 +49,7 @@ type LesOdr struct {
removePeer peerDropFn removePeer peerDropFn
mlock, clock sync.Mutex mlock, clock sync.Mutex
sentReqs map[uint64]*sentReq sentReqs map[uint64]*sentReq
peers *odrPeerSet serverPool odrPeerSelector
lastReqID uint64 lastReqID uint64
} }
@ -52,7 +57,6 @@ func NewLesOdr(db ethdb.Database) *LesOdr {
return &LesOdr{ return &LesOdr{
db: db, db: db,
stop: make(chan struct{}), stop: make(chan struct{}),
peers: newOdrPeerSet(),
sentReqs: make(map[uint64]*sentReq), sentReqs: make(map[uint64]*sentReq),
} }
} }
@ -77,16 +81,6 @@ type sentReq struct {
answered chan struct{} // closed and set to nil when any peer answers it answered chan struct{} // closed and set to nil when any peer answers it
} }
// RegisterPeer registers a new LES peer to the ODR capable peer set
func (self *LesOdr) RegisterPeer(p *peer) error {
return self.peers.register(p)
}
// UnregisterPeer removes a peer from the ODR capable peer set
func (self *LesOdr) UnregisterPeer(p *peer) {
self.peers.unregister(p)
}
const ( const (
MsgBlockBodies = iota MsgBlockBodies = iota
MsgCode MsgCode
@ -142,29 +136,26 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha
select { select {
case <-delivered: case <-delivered:
servTime := uint64(mclock.Now() - stime) if self.serverPool != nil {
self.peers.updateTimeout(peer, false) self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), false)
self.peers.updateServTime(peer, servTime) }
return return
case <-time.After(softRequestTimeout): case <-time.After(softRequestTimeout):
close(timeout) close(timeout)
if self.peers.updateTimeout(peer, true) {
self.removePeer(peer.id)
}
case <-self.stop: case <-self.stop:
return return
} }
select { select {
case <-delivered: case <-delivered:
servTime := uint64(mclock.Now() - stime)
self.peers.updateServTime(peer, servTime)
return
case <-time.After(hardRequestTimeout): case <-time.After(hardRequestTimeout):
self.removePeer(peer.id) go self.removePeer(peer.id)
case <-self.stop: case <-self.stop:
return return
} }
if self.serverPool != nil {
self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), true)
}
} }
// networkRequest sends a request to known peers until an answer is received // networkRequest sends a request to known peers until an answer is received
@ -193,7 +184,13 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
exclude := make(map[*peer]struct{}) exclude := make(map[*peer]struct{})
for { for {
if peer := self.peers.bestPeer(lreq, exclude); peer == nil { var p *peer
if self.serverPool != nil {
p = self.serverPool.selectPeer(func(p *peer) (bool, uint64) {
return true, p.fcServer.CanSend(lreq.GetCost(p))
})
}
if p == nil {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
@ -202,17 +199,17 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
case <-time.After(retryPeers): case <-time.After(retryPeers):
} }
} else { } else {
exclude[peer] = struct{}{} exclude[p] = struct{}{}
delivered := make(chan struct{}) delivered := make(chan struct{})
timeout := make(chan struct{}) timeout := make(chan struct{})
req.lock.Lock() req.lock.Lock()
req.sentTo[peer] = delivered req.sentTo[p] = delivered
req.lock.Unlock() req.lock.Unlock()
reqWg.Add(1) reqWg.Add(1)
cost := lreq.GetCost(peer) cost := lreq.GetCost(p)
peer.fcServer.SendRequest(reqID, cost) p.fcServer.SendRequest(reqID, cost)
go self.requestPeer(req, peer, delivered, timeout, reqWg) go self.requestPeer(req, p, delivered, timeout, reqWg)
lreq.Request(reqID, peer) lreq.Request(reqID, p)
select { select {
case <-ctx.Done(): case <-ctx.Done():

View File

@ -1,120 +0,0 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package les
import (
"sync"
)
const dropTimeoutRatio = 20
type odrPeerInfo struct {
reqTimeSum, reqTimeCnt, reqCnt, timeoutCnt uint64
}
// odrPeerSet represents the collection of active peer participating in the block
// download procedure.
type odrPeerSet struct {
peers map[*peer]*odrPeerInfo
lock sync.RWMutex
}
// newPeerSet creates a new peer set to track the active download sources.
func newOdrPeerSet() *odrPeerSet {
return &odrPeerSet{
peers: make(map[*peer]*odrPeerInfo),
}
}
// Register injects a new peer into the working set, or returns an error if the
// peer is already known.
func (ps *odrPeerSet) register(p *peer) error {
ps.lock.Lock()
defer ps.lock.Unlock()
if _, ok := ps.peers[p]; ok {
return errAlreadyRegistered
}
ps.peers[p] = &odrPeerInfo{}
return nil
}
// Unregister removes a remote peer from the active set, disabling any further
// actions to/from that particular entity.
func (ps *odrPeerSet) unregister(p *peer) error {
ps.lock.Lock()
defer ps.lock.Unlock()
if _, ok := ps.peers[p]; !ok {
return errNotRegistered
}
delete(ps.peers, p)
return nil
}
func (ps *odrPeerSet) peerPriority(p *peer, info *odrPeerInfo, req LesOdrRequest) uint64 {
tm := p.fcServer.CanSend(req.GetCost(p))
if info.reqTimeCnt > 0 {
tm += info.reqTimeSum / info.reqTimeCnt
}
return tm
}
func (ps *odrPeerSet) bestPeer(req LesOdrRequest, exclude map[*peer]struct{}) *peer {
var best *peer
var bpv uint64
ps.lock.Lock()
defer ps.lock.Unlock()
for p, info := range ps.peers {
if _, ok := exclude[p]; !ok {
pv := ps.peerPriority(p, info, req)
if best == nil || pv < bpv {
best = p
bpv = pv
}
}
}
return best
}
func (ps *odrPeerSet) updateTimeout(p *peer, timeout bool) (drop bool) {
ps.lock.Lock()
defer ps.lock.Unlock()
if info, ok := ps.peers[p]; ok {
info.reqCnt++
if timeout {
// check ratio before increase to allow an extra timeout
if info.timeoutCnt*dropTimeoutRatio >= info.reqCnt {
return true
}
info.timeoutCnt++
}
}
return false
}
func (ps *odrPeerSet) updateServTime(p *peer, servTime uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
if info, ok := ps.peers[p]; ok {
info.reqTimeSum += servTime
info.reqTimeCnt++
}
}

View File

@ -160,6 +160,8 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
pm, db, odr := newTestProtocolManagerMust(t, false, 4, testChainGen) pm, db, odr := newTestProtocolManagerMust(t, false, 4, testChainGen)
lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil) lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
pool := (*testServerPool)(lpeer)
odr.serverPool = pool
select { select {
case <-time.After(time.Millisecond * 100): case <-time.After(time.Millisecond * 100):
case err := <-err1: case err := <-err1:
@ -188,13 +190,13 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
} }
// temporarily remove peer to test odr fails // temporarily remove peer to test odr fails
odr.UnregisterPeer(lpeer) odr.serverPool = nil
// expect retrievals to fail (except genesis block) without a les peer // expect retrievals to fail (except genesis block) without a les peer
test(expFail) test(expFail)
odr.RegisterPeer(lpeer) odr.serverPool = pool
// expect all retrievals to pass // expect all retrievals to pass
test(5) test(5)
odr.UnregisterPeer(lpeer) odr.serverPool = nil
// still expect all retrievals to pass, now data should be cached locally // still expect all retrievals to pass, now data should be cached locally
test(5) test(5)
} }

View File

@ -51,8 +51,7 @@ type peer struct {
id string id string
firstHeadInfo, headInfo *announceData headInfo *announceData
headInfoLen int
lock sync.RWMutex lock sync.RWMutex
announceChn chan announceData announceChn chan announceData
@ -111,67 +110,6 @@ func (p *peer) headBlockInfo() blockInfo {
return blockInfo{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td} return blockInfo{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td}
} }
func (p *peer) addNotify(announce *announceData) bool {
p.lock.Lock()
defer p.lock.Unlock()
if announce.Td.Cmp(p.headInfo.Td) < 1 {
return false
}
if p.headInfoLen >= maxHeadInfoLen {
//return false
p.firstHeadInfo = p.firstHeadInfo.next
p.headInfoLen--
}
if announce.haveHeaders == 0 {
hh := p.headInfo.Number - announce.ReorgDepth
if p.headInfo.haveHeaders < hh {
hh = p.headInfo.haveHeaders
}
announce.haveHeaders = hh
}
p.headInfo.next = announce
p.headInfo = announce
p.headInfoLen++
return true
}
func (p *peer) gotHeader(hash common.Hash, number uint64, td *big.Int) bool {
h := p.firstHeadInfo
ptr := 0
for h != nil {
if h.Hash == hash {
if h.Number != number || h.Td.Cmp(td) != 0 {
return false
}
h.headKnown = true
h.haveHeaders = h.Number
p.firstHeadInfo = h
p.headInfoLen -= ptr
last := h
h = h.next
// propagate haveHeaders through the chain
for h != nil {
hh := last.Number - h.ReorgDepth
if last.haveHeaders < hh {
hh = last.haveHeaders
}
if hh > h.haveHeaders {
h.haveHeaders = hh
} else {
return true
}
last = h
h = h.next
}
return true
}
h = h.next
ptr++
}
return true
}
// Td retrieves the current total difficulty of a peer. // Td retrieves the current total difficulty of a peer.
func (p *peer) Td() *big.Int { func (p *peer) Td() *big.Int {
p.lock.RLock() p.lock.RLock()
@ -455,9 +393,7 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis
p.fcCosts = MRC.decode() p.fcCosts = MRC.decode()
} }
p.firstHeadInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum} p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum}
p.headInfo = p.firstHeadInfo
p.headInfoLen = 1
return nil return nil
} }

View File

@ -71,6 +71,8 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen) pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen)
lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil) lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
pool := (*testServerPool)(lpeer)
odr.serverPool = pool
select { select {
case <-time.After(time.Millisecond * 100): case <-time.After(time.Millisecond * 100):
case err := <-err1: case err := <-err1:
@ -100,11 +102,10 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
} }
// temporarily remove peer to test odr fails // temporarily remove peer to test odr fails
odr.UnregisterPeer(lpeer) odr.serverPool = nil
// expect retrievals to fail (except genesis block) without a les peer // expect retrievals to fail (except genesis block) without a les peer
test(0) test(0)
odr.RegisterPeer(lpeer) odr.serverPool = pool
// expect all retrievals to pass // expect all retrievals to pass
test(5) test(5)
odr.UnregisterPeer(lpeer)
} }

View File

@ -59,6 +59,9 @@ const (
targetKnownSelect = 3 targetKnownSelect = 3
// after dialTimeout, consider the server unavailable and adjust statistics // after dialTimeout, consider the server unavailable and adjust statistics
dialTimeout = time.Second * 30 dialTimeout = time.Second * 30
// targetConnTime is the minimum expected connection duration before a server
// drops a client without any specific reason
targetConnTime = time.Minute * 10
// new entry selection weight calculation based on most recent discovery time: // new entry selection weight calculation based on most recent discovery time:
// unity until discoverExpireStart, then exponential decay with discoverExpireConst // unity until discoverExpireStart, then exponential decay with discoverExpireConst
discoverExpireStart = time.Minute * 20 discoverExpireStart = time.Minute * 20
@ -75,6 +78,17 @@ const (
// node address selection weight is dropped by a factor of exp(-addrFailDropLn) after // node address selection weight is dropped by a factor of exp(-addrFailDropLn) after
// each unsuccessful connection (restored after a successful one) // each unsuccessful connection (restored after a successful one)
addrFailDropLn = math.Ln2 addrFailDropLn = math.Ln2
// responseScoreTC and delayScoreTC are exponential decay time constants for
// calculating selection chances from response times and block delay times
responseScoreTC = time.Millisecond * 100
delayScoreTC = time.Second * 5
timeoutPow = 10
// peerSelectMinWeight is added to calculated weights at request peer selection
// to give poorly performing peers a little chance of coming back
peerSelectMinWeight = 0.005
// initStatsWeight is used to initialize previously unknown peers with good
// statistics to give a chance to prove themselves
initStatsWeight = 1
) )
// serverPool implements a pool for storing and selecting newly discovered and already // serverPool implements a pool for storing and selecting newly discovered and already
@ -95,6 +109,7 @@ type serverPool struct {
entries map[discover.NodeID]*poolEntry entries map[discover.NodeID]*poolEntry
lock sync.Mutex lock sync.Mutex
timeout, enableRetry chan *poolEntry timeout, enableRetry chan *poolEntry
adjustStats chan poolStatAdjust
knownQueue, newQueue poolEntryQueue knownQueue, newQueue poolEntryQueue
knownSelect, newSelect *weightedRandomSelect knownSelect, newSelect *weightedRandomSelect
@ -112,6 +127,7 @@ func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic
wg: wg, wg: wg,
entries: make(map[discover.NodeID]*poolEntry), entries: make(map[discover.NodeID]*poolEntry),
timeout: make(chan *poolEntry, 1), timeout: make(chan *poolEntry, 1),
adjustStats: make(chan poolStatAdjust, 100),
enableRetry: make(chan *poolEntry, 1), enableRetry: make(chan *poolEntry, 1),
knownSelect: newWeightedRandomSelect(), knownSelect: newWeightedRandomSelect(),
newSelect: newWeightedRandomSelect(), newSelect: newWeightedRandomSelect(),
@ -139,18 +155,19 @@ func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic
// Otherwise, the connection should be rejected. // Otherwise, the connection should be rejected.
// Note that whenever a connection has been accepted and a pool entry has been returned, // Note that whenever a connection has been accepted and a pool entry has been returned,
// disconnect should also always be called. // disconnect should also always be called.
func (pool *serverPool) connect(id discover.NodeID, ip net.IP, port uint16) *poolEntry { func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
pool.lock.Lock() pool.lock.Lock()
defer pool.lock.Unlock() defer pool.lock.Unlock()
entry := pool.entries[id] entry := pool.entries[p.ID()]
if entry == nil { if entry == nil {
return nil return nil
} }
glog.V(logger.Debug).Infof("connecting to %v, state: %v", id.String(), entry.state) glog.V(logger.Debug).Infof("connecting to %v, state: %v", p.id, entry.state)
if entry.state != psDialed { if entry.state != psDialed {
return nil return nil
} }
pool.connWg.Add(1) pool.connWg.Add(1)
entry.peer = p
entry.state = psConnected entry.state = psConnected
addr := &poolEntryAddress{ addr := &poolEntryAddress{
ip: ip, ip: ip,
@ -172,42 +189,111 @@ func (pool *serverPool) registered(entry *poolEntry) {
defer pool.lock.Unlock() defer pool.lock.Unlock()
entry.state = psRegistered entry.state = psRegistered
entry.regTime = mclock.Now()
if !entry.known { if !entry.known {
pool.newQueue.remove(entry) pool.newQueue.remove(entry)
entry.known = true entry.known = true
} }
pool.knownQueue.setLatest(entry) pool.knownQueue.setLatest(entry)
entry.shortRetry = shortRetryCnt entry.shortRetry = shortRetryCnt
entry.connectStats.add(1)
} }
// disconnect should be called when ending a connection. Service quality statistics // disconnect should be called when ending a connection. Service quality statistics
// can be updated optionally (not updated if no registration happened, in this case // can be updated optionally (not updated if no registration happened, in this case
// only connection statistics are updated, just like in case of timeout) // only connection statistics are updated, just like in case of timeout)
func (pool *serverPool) disconnect(entry *poolEntry, quality float64, setQuality bool) { func (pool *serverPool) disconnect(entry *poolEntry) {
glog.V(logger.Debug).Infof("disconnected %v", entry.id.String()) glog.V(logger.Debug).Infof("disconnected %v", entry.id.String())
pool.lock.Lock() pool.lock.Lock()
defer pool.lock.Unlock() defer pool.lock.Unlock()
if entry.state != psRegistered { if entry.state == psRegistered {
setQuality = false connTime := mclock.Now() - entry.regTime
connAdjust := float64(connTime) / float64(targetConnTime)
if connAdjust > 1 {
connAdjust = 1
} }
stopped := false
select {
case <-pool.quit:
stopped = true
default:
}
if stopped {
entry.connectStats.add(1, connAdjust)
} else {
entry.connectStats.add(connAdjust, 1)
}
}
entry.state = psNotConnected entry.state = psNotConnected
if entry.knownSelected { if entry.knownSelected {
pool.knownSelected-- pool.knownSelected--
} else { } else {
pool.newSelected-- pool.newSelected--
} }
if setQuality {
glog.V(logger.Debug).Infof("update quality %v %v", quality, entry.id.String())
entry.qualityStats.add(quality)
} else {
glog.V(logger.Debug).Infof("do not update quality")
}
pool.setRetryDial(entry) pool.setRetryDial(entry)
pool.connWg.Done() pool.connWg.Done()
} }
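A small standalone sketch of the connection statistic computed above (illustrative only, not the library API): the success value recorded for a finished connection is the connection time relative to targetConnTime, capped at 1, so servers that drop a client early are penalised proportionally; when the pool itself is shutting down, the short connection is recorded with reduced weight instead of being held against the server.

package main

import (
	"fmt"
	"time"
)

const targetConnTime = 10 * time.Minute // same constant as above

// connAdjust mirrors the calculation in disconnect: the fraction of the
// target connection time actually spent connected, capped at 1.
func connAdjust(connTime time.Duration) float64 {
	adj := float64(connTime) / float64(targetConnTime)
	if adj > 1 {
		adj = 1
	}
	return adj
}

func main() {
	fmt.Println(connAdjust(5 * time.Minute))  // 0.5
	fmt.Println(connAdjust(30 * time.Minute)) // 1
}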
const (
pseBlockDelay = iota
pseResponseTime
pseResponseTimeout
)
// poolStatAdjust records are sent to adjust peer block delay/response time statistics
type poolStatAdjust struct {
adjustType int
entry *poolEntry
time time.Duration
}
// adjustBlockDelay adjusts the block announce delay statistics of a node
func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) {
pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time}
}
// adjustResponseTime adjusts the request response time statistics of a node
func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) {
if timeout {
pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time}
} else {
pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time}
}
}
type selectPeerItem struct {
peer *peer
weight int64
}
func (sp selectPeerItem) Weight() int64 {
return sp.weight
}
// selectPeer selects a suitable peer for a request
func (pool *serverPool) selectPeer(canSend func(*peer) (bool, uint64)) *peer {
pool.lock.Lock()
defer pool.lock.Unlock()
sel := newWeightedRandomSelect()
for _, entry := range pool.entries {
if entry.state == psRegistered {
p := entry.peer
ok, cost := canSend(p)
if ok {
w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(entry.responseStats.recentAvg()+float64(cost))/float64(responseScoreTC))*math.Pow((1-entry.timeoutStats.recentAvg()), timeoutPow)))
sel.update(selectPeerItem{peer: p, weight: w})
}
}
}
choice := sel.choose()
if choice == nil {
return nil
}
return choice.(selectPeerItem).peer
}
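The weight computed above can be restated as a standalone helper (a sketch; the constants are the ones defined earlier, and cost stands for the value returned by the canSend callback, treated here in the same nanosecond units as the response times, which is an assumption of this sketch): fast recent responses and few timeouts push the weight towards 1e9, while peerSelectMinWeight keeps even poorly performing peers occasionally selectable.

package main

import (
	"fmt"
	"math"
	"time"
)

const (
	responseScoreTC     = float64(100 * time.Millisecond)
	timeoutPow          = 10
	peerSelectMinWeight = 0.005
)

// selectionWeight restates the formula used in selectPeer: recentResponse and
// cost are in nanoseconds, recentTimeoutRatio is the recent timeout average
// between 0 and 1.
func selectionWeight(recentResponse, cost, recentTimeoutRatio float64) int64 {
	w := peerSelectMinWeight +
		math.Exp(-(recentResponse+cost)/responseScoreTC)*
			math.Pow(1-recentTimeoutRatio, timeoutPow)
	return int64(1000000000 * w)
}

func main() {
	fmt.Println(selectionWeight(float64(30*time.Millisecond), 0, 0))    // fast, reliable peer: high weight
	fmt.Println(selectionWeight(float64(300*time.Millisecond), 0, 0.2)) // slow peer with recent timeouts: low weight
}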
// eventLoop handles pool events and mutex locking for all internal functions // eventLoop handles pool events and mutex locking for all internal functions
func (pool *serverPool) eventLoop() { func (pool *serverPool) eventLoop() {
lookupCnt := 0 lookupCnt := 0
@ -230,6 +316,19 @@ func (pool *serverPool) eventLoop() {
} }
pool.lock.Unlock() pool.lock.Unlock()
case adj := <-pool.adjustStats:
pool.lock.Lock()
switch adj.adjustType {
case pseBlockDelay:
adj.entry.delayStats.add(float64(adj.time), 1)
case pseResponseTime:
adj.entry.responseStats.add(float64(adj.time), 1)
adj.entry.timeoutStats.add(0, 1)
case pseResponseTimeout:
adj.entry.timeoutStats.add(1, 1)
}
pool.lock.Unlock()
case node := <-pool.discNodes: case node := <-pool.discNodes:
pool.lock.Lock() pool.lock.Lock()
now := mclock.Now() now := mclock.Now()
@ -244,6 +343,11 @@ func (pool *serverPool) eventLoop() {
shortRetry: shortRetryCnt, shortRetry: shortRetryCnt,
} }
pool.entries[id] = entry pool.entries[id] = entry
// initialize previously unknown peers with good statistics to give a chance to prove themselves
entry.connectStats.add(1, initStatsWeight)
entry.delayStats.add(0, initStatsWeight)
entry.responseStats.add(0, initStatsWeight)
entry.timeoutStats.add(0, initStatsWeight)
} }
entry.lastDiscovered = now entry.lastDiscovered = now
addr := &poolEntryAddress{ addr := &poolEntryAddress{
@ -298,9 +402,8 @@ func (pool *serverPool) loadNodes() {
glog.V(logger.Debug).Infof("node list decode error: %v", err) glog.V(logger.Debug).Infof("node list decode error: %v", err)
return return
} }
glog.V(logger.Debug).Infof("loaded node list")
for _, e := range list { for _, e := range list {
glog.V(logger.Debug).Infof(" adding node %v fails: %v connStats sum: %v cnt: %v qualityStats sum: %v cnt: %v", e.id.String()+"@"+e.lastConnected.strKey(), e.lastConnected.fails, e.connectStats.sum, e.connectStats.cnt, e.qualityStats.sum, e.qualityStats.cnt) glog.V(logger.Debug).Infof("loaded server stats %016x fails: %v connStats: %v / %v delayStats: %v / %v responseStats: %v / %v timeoutStats: %v / %v", e.id[0:8], e.lastConnected.fails, e.connectStats.avg, e.connectStats.weight, time.Duration(e.delayStats.avg), e.delayStats.weight, time.Duration(e.responseStats.avg), e.responseStats.weight, e.timeoutStats.avg, e.timeoutStats.weight)
pool.entries[e.id] = e pool.entries[e.id] = e
pool.knownQueue.setLatest(e) pool.knownQueue.setLatest(e)
pool.knownSelect.update((*knownEntry)(e)) pool.knownSelect.update((*knownEntry)(e))
@ -433,7 +536,7 @@ func (pool *serverPool) checkDialTimeout(entry *poolEntry) {
} else { } else {
pool.newSelected-- pool.newSelected--
} }
entry.connectStats.add(0) entry.connectStats.add(0, 1)
entry.dialed.fails++ entry.dialed.fails++
pool.setRetryDial(entry) pool.setRetryDial(entry)
} }
@ -447,6 +550,7 @@ const (
// poolEntry represents a server node and stores its current state and statistics. // poolEntry represents a server node and stores its current state and statistics.
type poolEntry struct { type poolEntry struct {
peer *peer
id discover.NodeID id discover.NodeID
addr map[string]*poolEntryAddress addr map[string]*poolEntryAddress
lastConnected, dialed *poolEntryAddress lastConnected, dialed *poolEntryAddress
@ -454,8 +558,10 @@ type poolEntry struct {
lastDiscovered mclock.AbsTime lastDiscovered mclock.AbsTime
known, knownSelected bool known, knownSelected bool
connectStats, qualityStats poolStats connectStats, delayStats poolStats
responseStats, timeoutStats poolStats
state int state int
regTime mclock.AbsTime
queueIdx int queueIdx int
removed bool removed bool
@ -464,7 +570,7 @@ type poolEntry struct {
} }
func (e *poolEntry) EncodeRLP(w io.Writer) error { func (e *poolEntry) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.qualityStats}) return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.delayStats, &e.responseStats, &e.timeoutStats})
} }
func (e *poolEntry) DecodeRLP(s *rlp.Stream) error { func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
@ -473,7 +579,7 @@ func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
IP net.IP IP net.IP
Port uint16 Port uint16
Fails uint Fails uint
CStat, QStat poolStats CStat, DStat, RStat, TStat poolStats
} }
if err := s.Decode(&entry); err != nil { if err := s.Decode(&entry); err != nil {
return err return err
@ -486,7 +592,9 @@ func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
e.addrSelect.update(addr) e.addrSelect.update(addr)
e.lastConnected = addr e.lastConnected = addr
e.connectStats = entry.CStat e.connectStats = entry.CStat
e.qualityStats = entry.QStat e.delayStats = entry.DStat
e.responseStats = entry.RStat
e.timeoutStats = entry.TStat
e.shortRetry = shortRetryCnt e.shortRetry = shortRetryCnt
e.known = true e.known = true
return nil return nil
@ -516,7 +624,7 @@ func (e *knownEntry) Weight() int64 {
if e.state != psNotConnected || !e.known || e.delayedRetry { if e.state != psNotConnected || !e.known || e.delayedRetry {
return 0 return 0
} }
return int64(1000000000 * e.connectStats.recentAvg() * (e.qualityStats.recentAvg() + 0.001) * math.Exp(-float64(e.lastConnected.fails)*failDropLn)) return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow((1-e.timeoutStats.recentAvg()), timeoutPow))
} }
// poolEntryAddress is a separate object because currently it is necessary to remember // poolEntryAddress is a separate object because currently it is necessary to remember
@ -544,18 +652,17 @@ func (a *poolEntryAddress) strKey() string {
// pstatRecentAdjust with each update and also returned exponentially to the // pstatRecentAdjust with each update and also returned exponentially to the
// average with the time constant pstatReturnToMeanTC // average with the time constant pstatReturnToMeanTC
type poolStats struct { type poolStats struct {
sum, avg, recent float64 sum, weight, avg, recent float64
cnt uint
lastRecalc mclock.AbsTime lastRecalc mclock.AbsTime
} }
// init initializes stats with a long term sum/update count pair retrieved from the database // init initializes stats with a long term sum/update count pair retrieved from the database
func (s *poolStats) init(sum float64, cnt uint) { func (s *poolStats) init(sum, weight float64) {
s.sum = sum s.sum = sum
s.cnt = cnt s.weight = weight
var avg float64 var avg float64
if cnt > 0 { if weight > 0 {
avg = s.sum / float64(cnt) avg = s.sum / weight
} }
s.avg = avg s.avg = avg
s.recent = avg s.recent = avg
@ -566,16 +673,22 @@ func (s *poolStats) init(sum float64, cnt uint) {
func (s *poolStats) recalc() { func (s *poolStats) recalc() {
now := mclock.Now() now := mclock.Now()
s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC)) s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC))
if s.cnt > 0 { if s.sum == 0 {
s.avg = s.sum / float64(s.cnt) s.avg = 0
} else {
if s.sum > s.weight*1e30 {
s.avg = 1e30
} else {
s.avg = s.sum / s.weight
}
} }
s.lastRecalc = now s.lastRecalc = now
} }
// add updates the stats with a new value // add updates the stats with a new value
func (s *poolStats) add(val float64) { func (s *poolStats) add(value, weight float64) {
s.cnt++ s.weight += weight
s.sum += val s.sum += value * weight
s.recalc() s.recalc()
} }
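As a minimal illustration of the new weighted statistics (a simplified sketch that omits the decaying "recent" value handled by recalc): each sample carries a weight, the long-term average is sum/weight, and the optimistic initStatsWeight sample given to newly discovered peers is gradually diluted as real samples arrive.

package main

import "fmt"

// stats is a simplified stand-in for poolStats: weighted samples only.
type stats struct {
	sum, weight float64
}

func (s *stats) add(value, weight float64) {
	s.sum += value * weight
	s.weight += weight
}

func (s *stats) avg() float64 {
	if s.weight == 0 {
		return 0
	}
	return s.sum / s.weight
}

func main() {
	var timeouts stats
	timeouts.add(0, 1) // optimistic initial sample (initStatsWeight)
	timeouts.add(1, 1) // one real request timed out
	timeouts.add(0, 1) // one real request succeeded
	fmt.Println(timeouts.avg()) // 0.333...: timeout ratio diluted by the initial sample
}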
@ -586,18 +699,17 @@ func (s *poolStats) recentAvg() float64 {
} }
func (s *poolStats) EncodeRLP(w io.Writer) error { func (s *poolStats) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), s.cnt}) return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)})
} }
func (s *poolStats) DecodeRLP(st *rlp.Stream) error { func (s *poolStats) DecodeRLP(st *rlp.Stream) error {
var stats struct { var stats struct {
SumUint uint64 SumUint, WeightUint uint64
Cnt uint
} }
if err := st.Decode(&stats); err != nil { if err := st.Decode(&stats); err != nil {
return err return err
} }
s.init(math.Float64frombits(stats.SumUint), stats.Cnt) s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint))
return nil return nil
} }

View File

@ -505,3 +505,14 @@ func (self *LightChain) SyncCht(ctx context.Context) bool {
} }
return false return false
} }
// LockChain locks the chain mutex for reading so that multiple canonical hashes can be
// retrieved while it is guaranteed that they belong to the same version of the chain
func (self *LightChain) LockChain() {
self.chainmu.RLock()
}
// UnlockChain unlocks the chain mutex
func (self *LightChain) UnlockChain() {
self.chainmu.RUnlock()
}