diff --git a/core/blockchain.go b/core/blockchain.go
index 207c21a65..a57832df0 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1313,6 +1313,11 @@ Error: %v
// of the header retrieval mechanisms already need to verify nonces, as well as
// because nonces can be verified sparsely, not needing to check each.
func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
+ start := time.Now()
+ if i, err := self.hc.ValidateHeaderChain(chain, checkFreq); err != nil {
+ return i, err
+ }
+
// Make sure only one thread manipulates the chain at once
self.chainmu.Lock()
defer self.chainmu.Unlock()
@@ -1328,7 +1333,7 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
return err
}
- return self.hc.InsertHeaderChain(chain, checkFreq, whFunc)
+ return self.hc.InsertHeaderChain(chain, whFunc, start)
}
// writeHeader writes a header into the local chain, given that its parent is
diff --git a/core/headerchain.go b/core/headerchain.go
index a3d622087..57da9771b 100644
--- a/core/headerchain.go
+++ b/core/headerchain.go
@@ -219,7 +219,8 @@ type WhCallback func(*types.Header) error
// should be done or not. The reason behind the optional check is because some
// of the header retrieval mechanisms already need to verfy nonces, as well as
// because nonces can be verified sparsely, not needing to check each.
-func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, checkFreq int, writeHeader WhCallback) (int, error) {
+
+func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
// Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ {
if chain[i].Number.Uint64() != chain[i-1].Number.Uint64()+1 || chain[i].ParentHash != chain[i-1].Hash() {
@@ -231,9 +232,6 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, checkFreq int, w
chain[i-1].Hash().Bytes()[:4], i, chain[i].Number, chain[i].Hash().Bytes()[:4], chain[i].ParentHash[:4])
}
}
- // Collect some import statistics to report on
- stats := struct{ processed, ignored int }{}
- start := time.Now()
// Generate the list of headers that should be POW verified
verify := make([]bool, len(chain))
@@ -309,6 +307,13 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, checkFreq int, w
}
}
}
+
+ return 0, nil
+}
+
+func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, writeHeader WhCallback, start time.Time) (int, error) {
+ // Collect some import statistics to report on
+ stats := struct{ processed, ignored int }{}
// All headers passed verification, import them into the database
for i, header := range chain {
// Short circuit insertion if shutting down
diff --git a/les/backend.go b/les/backend.go
index 404728c0e..3cab75f33 100644
--- a/les/backend.go
+++ b/les/backend.go
@@ -107,6 +107,8 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.LightMode, config.NetworkId, eth.eventMux, eth.pow, eth.blockchain, nil, chainDb, odr, relay); err != nil {
return nil, err
}
+ relay.ps = eth.protocolManager.peers
+ relay.reqDist = eth.protocolManager.reqDist
eth.ApiBackend = &LesApiBackend{eth, nil}
eth.ApiBackend.gpo = gasprice.NewLightPriceOracle(eth.ApiBackend)
diff --git a/les/distributor.go b/les/distributor.go
new file mode 100644
index 000000000..c59b36146
--- /dev/null
+++ b/les/distributor.go
@@ -0,0 +1,259 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+// Package light implements on-demand retrieval capable state and chain objects
+// for the Ethereum Light Client.
+package les
+
+import (
+ "container/list"
+ "errors"
+ "sync"
+ "time"
+)
+
+// ErrNoPeers is returned if no peers capable of serving a queued request are available
+var ErrNoPeers = errors.New("no suitable peers available")
+
+// requestDistributor implements a mechanism that distributes requests to
+// suitable peers, obeying flow control rules and prioritizing them in creation
+// order (even when a resend is necessary).
+type requestDistributor struct {
+ reqQueue *list.List
+ lastReqOrder uint64
+ stopChn, loopChn chan struct{}
+ loopNextSent bool
+ lock sync.Mutex
+
+ getAllPeers func() map[distPeer]struct{}
+}
+
+// distPeer is an LES server peer interface for the request distributor.
+// waitBefore returns either the necessary waiting time before sending a request
+// with the given upper estimated cost or the estimated remaining relative buffer
+// value after sending such a request (in which case the request can be sent
+// immediately). At least one of these values is always zero.
+type distPeer interface {
+ waitBefore(uint64) (time.Duration, float64)
+ canQueue() bool
+ queueSend(f func())
+}
+
+// distReq is the request abstraction used by the distributor. It is based on
+// three callback functions:
+// - getCost returns the upper estimate of the cost of sending the request to a given peer
+// - canSend tells if the server peer is suitable to serve the request
+// - request prepares sending the request to the given peer and returns a function that
+// does the actual sending. Request order should be preserved but the callback itself should not
+// block until it is sent because other peers might still be able to receive requests while
+// one of them is blocking. Instead, the returned function is put in the peer's send queue.
+type distReq struct {
+ getCost func(distPeer) uint64
+ canSend func(distPeer) bool
+ request func(distPeer) func()
+
+ reqOrder uint64
+ sentChn chan distPeer
+ element *list.Element
+}
+
+// newRequestDistributor creates a new request distributor
+func newRequestDistributor(getAllPeers func() map[distPeer]struct{}, stopChn chan struct{}) *requestDistributor {
+ r := &requestDistributor{
+ reqQueue: list.New(),
+ loopChn: make(chan struct{}, 2),
+ stopChn: stopChn,
+ getAllPeers: getAllPeers,
+ }
+ go r.loop()
+ return r
+}
+
+// distMaxWait is the maximum waiting time after which further necessary waiting
+// times are recalculated based on new feedback from the servers
+const distMaxWait = time.Millisecond * 10
+
+// main event loop
+func (d *requestDistributor) loop() {
+ for {
+ select {
+ case <-d.stopChn:
+ d.lock.Lock()
+ elem := d.reqQueue.Front()
+ for elem != nil {
+ close(elem.Value.(*distReq).sentChn)
+ elem = elem.Next()
+ }
+ d.lock.Unlock()
+ return
+ case <-d.loopChn:
+ d.lock.Lock()
+ d.loopNextSent = false
+ loop:
+ for {
+ peer, req, wait := d.nextRequest()
+ if req != nil && wait == 0 {
+ chn := req.sentChn // save sentChn because remove sets it to nil
+ d.remove(req)
+ send := req.request(peer)
+ if send != nil {
+ peer.queueSend(send)
+ }
+ chn <- peer
+ close(chn)
+ } else {
+ if wait == 0 {
+ // no request to send and nothing to wait for; the next
+ // queued request will wake up the loop
+ break loop
+ }
+ d.loopNextSent = true // a "next" signal has been sent, do not send another one until this one has been received
+ if wait > distMaxWait {
+ // waiting times may be reduced by incoming request replies, if it is too long, recalculate it periodically
+ wait = distMaxWait
+ }
+ go func() {
+ time.Sleep(wait)
+ d.loopChn <- struct{}{}
+ }()
+ break loop
+ }
+ }
+ d.lock.Unlock()
+ }
+ }
+}
+
+// selectPeerItem represents a peer to be selected for a request by weightedRandomSelect
+type selectPeerItem struct {
+ peer distPeer
+ req *distReq
+ weight int64
+}
+
+// Weight implements wrsItem interface
+func (sp selectPeerItem) Weight() int64 {
+ return sp.weight
+}
+
+// nextRequest returns the next possible request from any peer, along with the
+// associated peer and necessary waiting time
+func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
+ peers := d.getAllPeers()
+
+ elem := d.reqQueue.Front()
+ var (
+ bestPeer distPeer
+ bestReq *distReq
+ bestWait time.Duration
+ sel *weightedRandomSelect
+ )
+
+ for (len(peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
+ req := elem.Value.(*distReq)
+ canSend := false
+ for peer, _ := range peers {
+ if peer.canQueue() && req.canSend(peer) {
+ canSend = true
+ cost := req.getCost(peer)
+ wait, bufRemain := peer.waitBefore(cost)
+ if wait == 0 {
+ if sel == nil {
+ sel = newWeightedRandomSelect()
+ }
+ sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1})
+ } else {
+ if bestReq == nil || wait < bestWait {
+ bestPeer = peer
+ bestReq = req
+ bestWait = wait
+ }
+ }
+ delete(peers, peer)
+ }
+ }
+ next := elem.Next()
+ if !canSend && elem == d.reqQueue.Front() {
+ close(req.sentChn)
+ d.remove(req)
+ }
+ elem = next
+ }
+
+ if sel != nil {
+ c := sel.choose().(selectPeerItem)
+ return c.peer, c.req, 0
+ }
+ return bestPeer, bestReq, bestWait
+}
+
+// queue adds a request to the distribution queue, returns a channel where the
+// receiving peer is sent once the request has been sent (request callback returned).
+// If the request is cancelled or timed out without suitable peers, the channel is
+// closed without sending any peer references to it.
+func (d *requestDistributor) queue(r *distReq) chan distPeer {
+ d.lock.Lock()
+ defer d.lock.Unlock()
+
+ if r.reqOrder == 0 {
+ d.lastReqOrder++
+ r.reqOrder = d.lastReqOrder
+ }
+
+ back := d.reqQueue.Back()
+ if back == nil || r.reqOrder > back.Value.(*distReq).reqOrder {
+ r.element = d.reqQueue.PushBack(r)
+ } else {
+ before := d.reqQueue.Front()
+ for before.Value.(*distReq).reqOrder < r.reqOrder {
+ before = before.Next()
+ }
+ r.element = d.reqQueue.InsertBefore(r, before)
+ }
+
+ if !d.loopNextSent {
+ d.loopNextSent = true
+ d.loopChn <- struct{}{}
+ }
+
+ r.sentChn = make(chan distPeer, 1)
+ return r.sentChn
+}
+
+// cancel removes a request from the queue if it has not been sent yet (returns
+// false if it has been sent already). It is guaranteed that the callback functions
+// will not be called after cancel returns.
+func (d *requestDistributor) cancel(r *distReq) bool {
+ d.lock.Lock()
+ defer d.lock.Unlock()
+
+ if r.sentChn == nil {
+ return false
+ }
+
+ close(r.sentChn)
+ d.remove(r)
+ return true
+}
+
+// remove removes a request from the queue
+func (d *requestDistributor) remove(r *distReq) {
+ r.sentChn = nil
+ if r.element != nil {
+ d.reqQueue.Remove(r.element)
+ r.element = nil
+ }
+}
diff --git a/les/distributor_test.go b/les/distributor_test.go
new file mode 100644
index 000000000..f2eb80729
--- /dev/null
+++ b/les/distributor_test.go
@@ -0,0 +1,192 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+// Package light implements on-demand retrieval capable state and chain objects
+// for the Ethereum Light Client.
+package les
+
+import (
+ "math/rand"
+ "sync"
+ "testing"
+ "time"
+)
+
+type testDistReq struct {
+ cost, procTime, order uint64
+ canSendTo map[*testDistPeer]struct{}
+}
+
+func (r *testDistReq) getCost(dp distPeer) uint64 {
+ return r.cost
+}
+
+func (r *testDistReq) canSend(dp distPeer) bool {
+ _, ok := r.canSendTo[dp.(*testDistPeer)]
+ return ok
+}
+
+func (r *testDistReq) request(dp distPeer) func() {
+ return func() { dp.(*testDistPeer).send(r) }
+}
+
+type testDistPeer struct {
+ sent []*testDistReq
+ sumCost uint64
+ lock sync.RWMutex
+}
+
+func (p *testDistPeer) send(r *testDistReq) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ p.sent = append(p.sent, r)
+ p.sumCost += r.cost
+}
+
+func (p *testDistPeer) worker(t *testing.T, checkOrder bool, stop chan struct{}) {
+ var last uint64
+ for {
+ wait := time.Millisecond
+ p.lock.Lock()
+ if len(p.sent) > 0 {
+ rq := p.sent[0]
+ wait = time.Duration(rq.procTime)
+ p.sumCost -= rq.cost
+ if checkOrder {
+ if rq.order <= last {
+ t.Errorf("Requests processed in wrong order")
+ }
+ last = rq.order
+ }
+ p.sent = p.sent[1:]
+ }
+ p.lock.Unlock()
+ select {
+ case <-stop:
+ return
+ case <-time.After(wait):
+ }
+ }
+}
+
+const (
+ testDistBufLimit = 10000000
+ testDistMaxCost = 1000000
+ testDistPeerCount = 5
+ testDistReqCount = 50000
+ testDistMaxResendCount = 3
+)
+
+func (p *testDistPeer) waitBefore(cost uint64) (time.Duration, float64) {
+ p.lock.RLock()
+ sumCost := p.sumCost + cost
+ p.lock.RUnlock()
+ if sumCost < testDistBufLimit {
+ return 0, float64(testDistBufLimit-sumCost) / float64(testDistBufLimit)
+ } else {
+ return time.Duration(sumCost - testDistBufLimit), 0
+ }
+}
+
+func (p *testDistPeer) canQueue() bool {
+ return true
+}
+
+func (p *testDistPeer) queueSend(f func()) {
+ f()
+}
+
+func TestRequestDistributor(t *testing.T) {
+ testRequestDistributor(t, false)
+}
+
+func TestRequestDistributorResend(t *testing.T) {
+ testRequestDistributor(t, true)
+}
+
+func testRequestDistributor(t *testing.T, resend bool) {
+ stop := make(chan struct{})
+ defer close(stop)
+
+ var peers [testDistPeerCount]*testDistPeer
+ for i, _ := range peers {
+ peers[i] = &testDistPeer{}
+ go peers[i].worker(t, !resend, stop)
+ }
+
+ dist := newRequestDistributor(func() map[distPeer]struct{} {
+ m := make(map[distPeer]struct{})
+ for _, peer := range peers {
+ m[peer] = struct{}{}
+ }
+ return m
+ }, stop)
+
+ var wg sync.WaitGroup
+
+ for i := 1; i <= testDistReqCount; i++ {
+ cost := uint64(rand.Int63n(testDistMaxCost))
+ procTime := uint64(rand.Int63n(int64(cost + 1)))
+ rq := &testDistReq{
+ cost: cost,
+ procTime: procTime,
+ order: uint64(i),
+ canSendTo: make(map[*testDistPeer]struct{}),
+ }
+ for _, peer := range peers {
+ if rand.Intn(2) != 0 {
+ rq.canSendTo[peer] = struct{}{}
+ }
+ }
+
+ wg.Add(1)
+ req := &distReq{
+ getCost: rq.getCost,
+ canSend: rq.canSend,
+ request: rq.request,
+ }
+ chn := dist.queue(req)
+ go func() {
+ cnt := 1
+ if resend && len(rq.canSendTo) != 0 {
+ cnt = rand.Intn(testDistMaxResendCount) + 1
+ }
+ for i := 0; i < cnt; i++ {
+ if i != 0 {
+ chn = dist.queue(req)
+ }
+ p := <-chn
+ if p == nil {
+ if len(rq.canSendTo) != 0 {
+ t.Errorf("Request that could have been sent was dropped")
+ }
+ } else {
+ peer := p.(*testDistPeer)
+ if _, ok := rq.canSendTo[peer]; !ok {
+ t.Errorf("Request sent to wrong peer")
+ }
+ }
+ }
+ wg.Done()
+ }()
+ if rand.Intn(1000) == 0 {
+ time.Sleep(time.Duration(rand.Intn(5000000)))
+ }
+ }
+
+ wg.Wait()
+}
diff --git a/les/execqueue.go b/les/execqueue.go
new file mode 100644
index 000000000..ac779003b
--- /dev/null
+++ b/les/execqueue.go
@@ -0,0 +1,71 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package les
+
+import (
+ "sync/atomic"
+)
+
+// ExecQueue implements a queue that executes function calls in a single thread,
+// in the same order as they have been queued.
+type execQueue struct {
+ chn chan func()
+ cnt, stop, capacity int32
+}
+
+// NewExecQueue creates a new execution queue.
+func newExecQueue(capacity int32) *execQueue {
+ q := &execQueue{
+ chn: make(chan func(), capacity),
+ capacity: capacity,
+ }
+ go q.loop()
+ return q
+}
+
+func (q *execQueue) loop() {
+ for f := range q.chn {
+ atomic.AddInt32(&q.cnt, -1)
+ if atomic.LoadInt32(&q.stop) != 0 {
+ return
+ }
+ f()
+ }
+}
+
+// CanQueue returns true if more function calls can be added to the execution queue.
+func (q *execQueue) canQueue() bool {
+ return atomic.LoadInt32(&q.stop) == 0 && atomic.LoadInt32(&q.cnt) < q.capacity
+}
+
+// Queue adds a function call to the execution queue. Returns true if successful.
+func (q *execQueue) queue(f func()) bool {
+ if atomic.LoadInt32(&q.stop) != 0 {
+ return false
+ }
+ if atomic.AddInt32(&q.cnt, 1) > q.capacity {
+ atomic.AddInt32(&q.cnt, -1)
+ return false
+ }
+ q.chn <- f
+ return true
+}
+
+// Stop stops the exec queue.
+func (q *execQueue) quit() {
+ atomic.StoreInt32(&q.stop, 1)
+}
diff --git a/les/fetcher.go b/les/fetcher.go
index f9e517d25..353e91932 100644
--- a/les/fetcher.go
+++ b/les/fetcher.go
@@ -135,35 +135,38 @@ func (f *lightFetcher) syncLoop() {
f.lock.Lock()
s := requesting
requesting = false
+ var (
+ rq *distReq
+ reqID uint64
+ )
if !f.syncing && !(newAnnounce && s) {
- reqID := getNextReqID()
- if peer, node, amount, retry := f.nextRequest(reqID); node != nil {
- requesting = true
- if reqID, ok := f.request(peer, reqID, node, amount); ok {
- go func() {
- time.Sleep(softRequestTimeout)
- f.reqMu.Lock()
- req, ok := f.requested[reqID]
- if ok {
- req.timeout = true
- f.requested[reqID] = req
- }
- f.reqMu.Unlock()
- // keep starting new requests while possible
- f.requestChn <- false
- }()
- }
- } else {
- if retry {
- requesting = true
- go func() {
- time.Sleep(time.Millisecond * 100)
- f.requestChn <- false
- }()
- }
+ rq, reqID = f.nextRequest()
+ }
+ syncing := f.syncing
+ f.lock.Unlock()
+
+ if rq != nil {
+ requesting = true
+ _, ok := <-f.pm.reqDist.queue(rq)
+ if !ok {
+ f.requestChn <- false
+ }
+
+ if !syncing {
+ go func() {
+ time.Sleep(softRequestTimeout)
+ f.reqMu.Lock()
+ req, ok := f.requested[reqID]
+ if ok {
+ req.timeout = true
+ f.requested[reqID] = req
+ }
+ f.reqMu.Unlock()
+ // keep starting new requests while possible
+ f.requestChn <- false
+ }()
}
}
- f.lock.Unlock()
case reqID := <-f.timeoutChn:
f.reqMu.Lock()
req, ok := f.requested[reqID]
@@ -334,6 +337,12 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo
f.lock.Lock()
defer f.lock.Unlock()
+ if f.syncing {
+ // always return true when syncing
+ // false positives are acceptable, a more sophisticated condition can be implemented later
+ return true
+ }
+
fp := f.peers[p]
if fp == nil || fp.root == nil {
return false
@@ -346,43 +355,13 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo
f.chain.LockChain()
defer f.chain.UnlockChain()
// if it's older than the peer's block tree root but it's in the same canonical chain
- // than the root, we can still be sure the peer knows it
+ // as the root, we can still be sure the peer knows it
+ //
+ // when syncing, just check if it is part of the known chain, there is nothing better we
+ // can do since we do not know the most recent block hash yet
return core.GetCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && core.GetCanonicalHash(f.pm.chainDb, number) == hash
}
-// request initiates a header download request from a certain peer
-func (f *lightFetcher) request(p *peer, reqID uint64, n *fetcherTreeNode, amount uint64) (uint64, bool) {
- fp := f.peers[p]
- if fp == nil {
- p.Log().Debug("Requesting from unknown peer")
- p.fcServer.DeassignRequest(reqID)
- return 0, false
- }
- if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) {
- f.syncing = true
- go func() {
- p.Log().Debug("Synchronisation started")
- f.pm.synchronise(p)
- f.syncDone <- p
- }()
- p.fcServer.DeassignRequest(reqID)
- return 0, false
- }
-
- n.requested = true
- cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount))
- p.fcServer.SendRequest(reqID, cost)
- f.reqMu.Lock()
- f.requested[reqID] = fetchRequest{hash: n.hash, amount: amount, peer: p, sent: mclock.Now()}
- f.reqMu.Unlock()
- go p.RequestHeadersByHash(reqID, cost, n.hash, int(amount), 0, true)
- go func() {
- time.Sleep(hardRequestTimeout)
- f.timeoutChn <- reqID
- }()
- return reqID, true
-}
-
// requestAmount calculates the amount of headers to be downloaded starting
// from a certain head backwards
func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 {
@@ -408,12 +387,13 @@ func (f *lightFetcher) requestedID(reqID uint64) bool {
// nextRequest selects the peer and announced head to be requested next, amount
// to be downloaded starting from the head backwards is also returned
-func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint64, bool) {
+func (f *lightFetcher) nextRequest() (*distReq, uint64) {
var (
bestHash common.Hash
bestAmount uint64
)
bestTd := f.maxConfirmedTd
+ bestSyncing := false
for p, fp := range f.peers {
for hash, n := range fp.nodeByHash {
@@ -423,29 +403,83 @@ func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint6
bestHash = hash
bestAmount = amount
bestTd = n.td
+ bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root)
}
}
}
}
if bestTd == f.maxConfirmedTd {
- return nil, nil, 0, false
+ return nil, 0
}
- peer, _, locked := f.pm.serverPool.selectPeer(reqID, func(p *peer) (bool, time.Duration) {
- fp := f.peers[p]
- if fp == nil || fp.nodeByHash[bestHash] == nil {
- return false, 0
+ f.syncing = bestSyncing
+
+ var rq *distReq
+ reqID := getNextReqID()
+ if f.syncing {
+ rq = &distReq{
+ getCost: func(dp distPeer) uint64 {
+ return 0
+ },
+ canSend: func(dp distPeer) bool {
+ p := dp.(*peer)
+ fp := f.peers[p]
+ return fp != nil && fp.nodeByHash[bestHash] != nil
+ },
+ request: func(dp distPeer) func() {
+ go func() {
+ p := dp.(*peer)
+ p.Log().Debug("Synchronisation started")
+ f.pm.synchronise(p)
+ f.syncDone <- p
+ }()
+ return nil
+ },
+ }
+ } else {
+ rq = &distReq{
+ getCost: func(dp distPeer) uint64 {
+ p := dp.(*peer)
+ return p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))
+ },
+ canSend: func(dp distPeer) bool {
+ p := dp.(*peer)
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ fp := f.peers[p]
+ if fp == nil {
+ return false
+ }
+ n := fp.nodeByHash[bestHash]
+ return n != nil && !n.requested
+ },
+ request: func(dp distPeer) func() {
+ p := dp.(*peer)
+ f.lock.Lock()
+ fp := f.peers[p]
+ if fp != nil {
+ n := fp.nodeByHash[bestHash]
+ if n != nil {
+ n.requested = true
+ }
+ }
+ f.lock.Unlock()
+
+ cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))
+ p.fcServer.QueueRequest(reqID, cost)
+ f.reqMu.Lock()
+ f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()}
+ f.reqMu.Unlock()
+ go func() {
+ time.Sleep(hardRequestTimeout)
+ f.timeoutChn <- reqID
+ }()
+ return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) }
+ },
}
- return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)))
- })
- if !locked {
- return nil, nil, 0, true
}
- var node *fetcherTreeNode
- if peer != nil {
- node = f.peers[peer].nodeByHash[bestHash]
- }
- return peer, node, bestAmount, false
+ return rq, reqID
}
// deliverHeaders delivers header download request responses for processing
diff --git a/les/flowcontrol/control.go b/les/flowcontrol/control.go
index e45537cf5..e40e69346 100644
--- a/les/flowcontrol/control.go
+++ b/les/flowcontrol/control.go
@@ -94,14 +94,12 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) {
}
type ServerNode struct {
- bufEstimate uint64
- lastTime mclock.AbsTime
- params *ServerParams
- sumCost uint64 // sum of req costs sent to this server
- pending map[uint64]uint64 // value = sumCost after sending the given req
- assignedRequest uint64 // when != 0, only the request with the given ID can be sent to this peer
- assignToken chan struct{} // send to this channel before assigning, read from it after deassigning
- lock sync.RWMutex
+ bufEstimate uint64
+ lastTime mclock.AbsTime
+ params *ServerParams
+ sumCost uint64 // sum of req costs sent to this server
+ pending map[uint64]uint64 // value = sumCost after sending the given req
+ lock sync.RWMutex
}
func NewServerNode(params *ServerParams) *ServerNode {
@@ -110,7 +108,6 @@ func NewServerNode(params *ServerParams) *ServerNode {
lastTime: mclock.Now(),
params: params,
pending: make(map[uint64]uint64),
- assignToken: make(chan struct{}, 1),
}
}
@@ -127,94 +124,37 @@ func (peer *ServerNode) recalcBLE(time mclock.AbsTime) {
}
// safetyMargin is added to the flow control waiting time when estimated buffer value is low
-const safetyMargin = time.Millisecond * 200
+const safetyMargin = time.Millisecond
-func (peer *ServerNode) canSend(maxCost uint64) time.Duration {
+func (peer *ServerNode) canSend(maxCost uint64) (time.Duration, float64) {
+ peer.recalcBLE(mclock.Now())
maxCost += uint64(safetyMargin) * peer.params.MinRecharge / uint64(fcTimeConst)
if maxCost > peer.params.BufLimit {
maxCost = peer.params.BufLimit
}
if peer.bufEstimate >= maxCost {
- return 0
+ return 0, float64(peer.bufEstimate-maxCost) / float64(peer.params.BufLimit)
}
- return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge)
+ return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge), 0
}
// CanSend returns the minimum waiting time required before sending a request
-// with the given maximum estimated cost
-func (peer *ServerNode) CanSend(maxCost uint64) time.Duration {
+// with the given maximum estimated cost. Second return value is the relative
+// estimated buffer level after sending the request (divided by BufLimit).
+func (peer *ServerNode) CanSend(maxCost uint64) (time.Duration, float64) {
peer.lock.RLock()
defer peer.lock.RUnlock()
return peer.canSend(maxCost)
}
-// AssignRequest tries to assign the server node to the given request, guaranteeing
-// that once it returns true, no request will be sent to the node before this one
-func (peer *ServerNode) AssignRequest(reqID uint64) bool {
- select {
- case peer.assignToken <- struct{}{}:
- default:
- return false
- }
- peer.lock.Lock()
- peer.assignedRequest = reqID
- peer.lock.Unlock()
- return true
-}
-
-// MustAssignRequest waits until the node can be assigned to the given request.
-// It is always guaranteed that assignments are released in a short amount of time.
-func (peer *ServerNode) MustAssignRequest(reqID uint64) {
- peer.assignToken <- struct{}{}
- peer.lock.Lock()
- peer.assignedRequest = reqID
- peer.lock.Unlock()
-}
-
-// DeassignRequest releases a request assignment in case the planned request
-// is not being sent.
-func (peer *ServerNode) DeassignRequest(reqID uint64) {
- peer.lock.Lock()
- if peer.assignedRequest == reqID {
- peer.assignedRequest = 0
- <-peer.assignToken
- }
- peer.lock.Unlock()
-}
-
-// IsAssigned returns true if the server node has already been assigned to a request
-// (note that this function returning false does not guarantee that you can assign a request
-// immediately afterwards, its only purpose is to help peer selection)
-func (peer *ServerNode) IsAssigned() bool {
- peer.lock.RLock()
- locked := peer.assignedRequest != 0
- peer.lock.RUnlock()
- return locked
-}
-
-// blocks until request can be sent
-func (peer *ServerNode) SendRequest(reqID, maxCost uint64) {
+// QueueRequest should be called when the request has been assigned to the given
+// server node, before putting it in the send queue. It is mandatory that requests
+// are sent in the same order as the QueueRequest calls are made.
+func (peer *ServerNode) QueueRequest(reqID, maxCost uint64) {
peer.lock.Lock()
defer peer.lock.Unlock()
- if peer.assignedRequest != reqID {
- peer.lock.Unlock()
- peer.MustAssignRequest(reqID)
- peer.lock.Lock()
- }
-
- peer.recalcBLE(mclock.Now())
- wait := peer.canSend(maxCost)
- for wait > 0 {
- peer.lock.Unlock()
- time.Sleep(wait)
- peer.lock.Lock()
- peer.recalcBLE(mclock.Now())
- wait = peer.canSend(maxCost)
- }
- peer.assignedRequest = 0
- <-peer.assignToken
peer.bufEstimate -= maxCost
peer.sumCost += maxCost
if reqID >= 0 {
@@ -222,6 +162,8 @@ func (peer *ServerNode) SendRequest(reqID, maxCost uint64) {
}
}
+// GotReply adjusts estimated buffer value according to the value included in
+// the latest request reply.
func (peer *ServerNode) GotReply(reqID, bv uint64) {
peer.lock.Lock()
@@ -235,6 +177,10 @@ func (peer *ServerNode) GotReply(reqID, bv uint64) {
return
}
delete(peer.pending, reqID)
- peer.bufEstimate = bv - (peer.sumCost - sc)
+ cc := peer.sumCost - sc
+ peer.bufEstimate = 0
+ if bv > cc {
+ peer.bufEstimate = bv - cc
+ }
peer.lastTime = mclock.Now()
}
diff --git a/les/handler.go b/les/handler.go
index 4271da8b8..ece2060ee 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -102,6 +102,7 @@ type ProtocolManager struct {
odr *LesOdr
server *LesServer
serverPool *serverPool
+ reqDist *requestDistributor
downloader *downloader.Downloader
fetcher *lightFetcher
@@ -203,8 +204,17 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
}
+ manager.reqDist = newRequestDistributor(func() map[distPeer]struct{} {
+ m := make(map[distPeer]struct{})
+ peers := manager.peers.AllPeers()
+ for _, peer := range peers {
+ m[peer] = struct{}{}
+ }
+ return m
+ }, manager.quitSync)
if odr != nil {
odr.removePeer = removePeer
+ odr.reqDist = manager.reqDist
}
/*validator := func(block *types.Block, parent *types.Block) error {
@@ -334,17 +344,49 @@ func (pm *ProtocolManager) handle(p *peer) error {
if pm.lightSync {
requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
reqID := getNextReqID()
- cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
- p.fcServer.MustAssignRequest(reqID)
- p.fcServer.SendRequest(reqID, cost)
- return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse)
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ peer := dp.(*peer)
+ return peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ },
+ canSend: func(dp distPeer) bool {
+ return dp.(*peer) == p
+ },
+ request: func(dp distPeer) func() {
+ peer := dp.(*peer)
+ cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ peer.fcServer.QueueRequest(reqID, cost)
+ return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
+ },
+ }
+ _, ok := <-pm.reqDist.queue(rq)
+ if !ok {
+ return ErrNoPeers
+ }
+ return nil
}
requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
reqID := getNextReqID()
- cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
- p.fcServer.MustAssignRequest(reqID)
- p.fcServer.SendRequest(reqID, cost)
- return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse)
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ peer := dp.(*peer)
+ return peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ },
+ canSend: func(dp distPeer) bool {
+ return dp.(*peer) == p
+ },
+ request: func(dp distPeer) func() {
+ peer := dp.(*peer)
+ cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ peer.fcServer.QueueRequest(reqID, cost)
+ return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
+ },
+ }
+ _, ok := <-pm.reqDist.queue(rq)
+ if !ok {
+ return ErrNoPeers
+ }
+ return nil
}
if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd,
requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
@@ -884,7 +926,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
if deliverMsg != nil {
- return pm.odr.Deliver(p, deliverMsg)
+ err := pm.odr.Deliver(p, deliverMsg)
+ if err != nil {
+ p.responseErrors++
+ if p.responseErrors > maxResponseErrors {
+ return err
+ }
+ }
}
return nil
}
diff --git a/les/helper_test.go b/les/helper_test.go
index f6293ad1a..3e8ce57b6 100644
--- a/les/helper_test.go
+++ b/les/helper_test.go
@@ -352,11 +352,15 @@ func (p *testServerPool) setPeer(peer *peer) {
p.peer = peer
}
-func (p *testServerPool) selectPeerWait(uint64, func(*peer) (bool, time.Duration), <-chan struct{}) *peer {
+func (p *testServerPool) getAllPeers() map[distPeer]struct{} {
p.lock.RLock()
defer p.lock.RUnlock()
- return p.peer
+ m := make(map[distPeer]struct{})
+ if p.peer != nil {
+ m[p.peer] = struct{}{}
+ }
+ return m
}
func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) {
diff --git a/les/odr.go b/les/odr.go
index afc894ab5..06b44d318 100644
--- a/les/odr.go
+++ b/les/odr.go
@@ -32,14 +32,12 @@ import (
var (
softRequestTimeout = time.Millisecond * 500
hardRequestTimeout = time.Second * 10
- retryPeers = time.Second * 1
)
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
type odrPeerSelector interface {
- selectPeerWait(uint64, func(*peer) (bool, time.Duration), <-chan struct{}) *peer
adjustResponseTime(*poolEntry, time.Duration, bool)
}
@@ -51,6 +49,7 @@ type LesOdr struct {
mlock, clock sync.Mutex
sentReqs map[uint64]*sentReq
serverPool odrPeerSelector
+ reqDist *requestDistributor
}
func NewLesOdr(db ethdb.Database) *LesOdr {
@@ -165,18 +164,48 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha
func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) error {
answered := make(chan struct{})
req := &sentReq{
- valFunc: lreq.Valid,
+ valFunc: lreq.Validate,
sentTo: make(map[*peer]chan struct{}),
answered: answered, // reply delivered by any peer
}
- reqID := getNextReqID()
- self.mlock.Lock()
- self.sentReqs[reqID] = req
- self.mlock.Unlock()
+
+ exclude := make(map[*peer]struct{})
reqWg := new(sync.WaitGroup)
reqWg.Add(1)
defer reqWg.Done()
+
+ var timeout chan struct{}
+ reqID := getNextReqID()
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ return lreq.GetCost(dp.(*peer))
+ },
+ canSend: func(dp distPeer) bool {
+ p := dp.(*peer)
+ _, ok := exclude[p]
+ return !ok && lreq.CanSend(p)
+ },
+ request: func(dp distPeer) func() {
+ p := dp.(*peer)
+ exclude[p] = struct{}{}
+ delivered := make(chan struct{})
+ timeout = make(chan struct{})
+ req.lock.Lock()
+ req.sentTo[p] = delivered
+ req.lock.Unlock()
+ reqWg.Add(1)
+ cost := lreq.GetCost(p)
+ p.fcServer.QueueRequest(reqID, cost)
+ go self.requestPeer(req, p, delivered, timeout, reqWg)
+ return func() { lreq.Request(reqID, p) }
+ },
+ }
+
+ self.mlock.Lock()
+ self.sentReqs[reqID] = req
+ self.mlock.Unlock()
+
go func() {
reqWg.Wait()
self.mlock.Lock()
@@ -184,50 +213,32 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
self.mlock.Unlock()
}()
- exclude := make(map[*peer]struct{})
for {
- var p *peer
- if self.serverPool != nil {
- p = self.serverPool.selectPeerWait(reqID, func(p *peer) (bool, time.Duration) {
- if _, ok := exclude[p]; ok || !lreq.CanSend(p) {
- return false, 0
- }
- return true, p.fcServer.CanSend(lreq.GetCost(p))
- }, ctx.Done())
+ peerChn := self.reqDist.queue(rq)
+ select {
+ case <-ctx.Done():
+ self.reqDist.cancel(rq)
+ return ctx.Err()
+ case <-answered:
+ self.reqDist.cancel(rq)
+ return nil
+ case _, ok := <-peerChn:
+ if !ok {
+ return ErrNoPeers
+ }
}
- if p == nil {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-req.answered:
- return nil
- case <-time.After(retryPeers):
- }
- } else {
- exclude[p] = struct{}{}
- delivered := make(chan struct{})
- timeout := make(chan struct{})
- req.lock.Lock()
- req.sentTo[p] = delivered
- req.lock.Unlock()
- reqWg.Add(1)
- cost := lreq.GetCost(p)
- p.fcServer.SendRequest(reqID, cost)
- go self.requestPeer(req, p, delivered, timeout, reqWg)
- lreq.Request(reqID, p)
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-answered:
- return nil
- case <-timeout:
- }
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-answered:
+ return nil
+ case <-timeout:
}
}
}
-// Retrieve tries to fetch an object from the local db, then from the LES network.
+// Retrieve tries to fetch an object from the LES network.
// If the network retrieval was successful, it stores the object in local db.
func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) {
lreq := LesRequest(req)
diff --git a/les/odr_requests.go b/les/odr_requests.go
index 53aced93c..1f853b341 100644
--- a/les/odr_requests.go
+++ b/les/odr_requests.go
@@ -49,7 +49,7 @@ type LesOdrRequest interface {
GetCost(*peer) uint64
CanSend(*peer) bool
Request(uint64, *peer) error
- Valid(ethdb.Database, *Msg) error // if true, keeps the retrieved object
+ Validate(ethdb.Database, *Msg) error
}
func LesRequest(req light.OdrRequest) LesOdrRequest {
@@ -92,7 +92,7 @@ func (r *BlockRequest) Request(reqID uint64, peer *peer) error {
// Valid processes an ODR request reply message from the LES network
// returns true and stores results in memory if the message was a valid reply
// to the request (implementation of LesOdrRequest)
-func (r *BlockRequest) Valid(db ethdb.Database, msg *Msg) error {
+func (r *BlockRequest) Validate(db ethdb.Database, msg *Msg) error {
log.Debug("Validating block body", "hash", r.Hash)
// Ensure we have a correct message with a single block body
@@ -148,7 +148,7 @@ func (r *ReceiptsRequest) Request(reqID uint64, peer *peer) error {
// Valid processes an ODR request reply message from the LES network
// returns true and stores results in memory if the message was a valid reply
// to the request (implementation of LesOdrRequest)
-func (r *ReceiptsRequest) Valid(db ethdb.Database, msg *Msg) error {
+func (r *ReceiptsRequest) Validate(db ethdb.Database, msg *Msg) error {
log.Debug("Validating block receipts", "hash", r.Hash)
// Ensure we have a correct message with a single block receipt
@@ -208,7 +208,7 @@ func (r *TrieRequest) Request(reqID uint64, peer *peer) error {
// Valid processes an ODR request reply message from the LES network
// returns true and stores results in memory if the message was a valid reply
// to the request (implementation of LesOdrRequest)
-func (r *TrieRequest) Valid(db ethdb.Database, msg *Msg) error {
+func (r *TrieRequest) Validate(db ethdb.Database, msg *Msg) error {
log.Debug("Validating trie proof", "root", r.Id.Root, "key", r.Key)
// Ensure we have a correct message with a single proof
@@ -259,7 +259,7 @@ func (r *CodeRequest) Request(reqID uint64, peer *peer) error {
// Valid processes an ODR request reply message from the LES network
// returns true and stores results in memory if the message was a valid reply
// to the request (implementation of LesOdrRequest)
-func (r *CodeRequest) Valid(db ethdb.Database, msg *Msg) error {
+func (r *CodeRequest) Validate(db ethdb.Database, msg *Msg) error {
log.Debug("Validating code data", "hash", r.Hash)
// Ensure we have a correct message with a single code element
@@ -319,7 +319,7 @@ func (r *ChtRequest) Request(reqID uint64, peer *peer) error {
// Valid processes an ODR request reply message from the LES network
// returns true and stores results in memory if the message was a valid reply
// to the request (implementation of LesOdrRequest)
-func (r *ChtRequest) Valid(db ethdb.Database, msg *Msg) error {
+func (r *ChtRequest) Validate(db ethdb.Database, msg *Msg) error {
log.Debug("Validating CHT", "cht", r.ChtNum, "block", r.BlockNum)
// Ensure we have a correct message with a single proof element
diff --git a/les/odr_test.go b/les/odr_test.go
index 4f1fccb24..1b436b8e6 100644
--- a/les/odr_test.go
+++ b/les/odr_test.go
@@ -162,8 +162,11 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
pool := &testServerPool{}
+ lpm.reqDist = newRequestDistributor(pool.getAllPeers, lpm.quitSync)
+ odr.reqDist = lpm.reqDist
pool.setPeer(lpeer)
odr.serverPool = pool
+ lpeer.hasBlock = func(common.Hash, uint64) bool { return true }
select {
case <-time.After(time.Millisecond * 100):
case err := <-err1:
diff --git a/les/peer.go b/les/peer.go
index ef5f8a6ce..4793da296 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -22,6 +22,7 @@ import (
"fmt"
"math/big"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@@ -37,7 +38,10 @@ var (
errNotRegistered = errors.New("peer is not registered")
)
-const maxHeadInfoLen = 20
+const (
+ maxHeadInfoLen = 20
+ maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam)
+)
type peer struct {
*p2p.Peer
@@ -53,9 +57,11 @@ type peer struct {
lock sync.RWMutex
announceChn chan announceData
+ sendQueue *execQueue
- poolEntry *poolEntry
- hasBlock func(common.Hash, uint64) bool
+ poolEntry *poolEntry
+ hasBlock func(common.Hash, uint64) bool
+ responseErrors int
fcClient *flowcontrol.ClientNode // nil if the peer is server only
fcServer *flowcontrol.ServerNode // nil if the peer is client only
@@ -76,6 +82,14 @@ func newPeer(version, network int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
}
}
+func (p *peer) canQueue() bool {
+ return p.sendQueue.canQueue()
+}
+
+func (p *peer) queueSend(f func()) {
+ p.sendQueue.queue(f)
+}
+
// Info gathers and returns a collection of metadata known about a peer.
func (p *peer) Info() *eth.PeerInfo {
return ð.PeerInfo{
@@ -117,6 +131,11 @@ func (p *peer) Td() *big.Int {
return new(big.Int).Set(p.headInfo.Td)
}
+// waitBefore implements distPeer interface
+func (p *peer) waitBefore(maxCost uint64) (time.Duration, float64) {
+ return p.fcServer.CanSend(maxCost)
+}
+
func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) error {
type req struct {
ReqID uint64
@@ -237,11 +256,8 @@ func (p *peer) RequestHeaderProofs(reqID, cost uint64, reqs []*ChtReq) error {
return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqs)
}
-func (p *peer) SendTxs(cost uint64, txs types.Transactions) error {
+func (p *peer) SendTxs(reqID, cost uint64, txs types.Transactions) error {
p.Log().Debug("Fetching batch of transactions", "count", len(txs))
- reqID := getNextReqID()
- p.fcServer.MustAssignRequest(reqID)
- p.fcServer.SendRequest(reqID, cost)
return p2p.Send(p.rw, SendTxMsg, txs)
}
@@ -444,6 +460,7 @@ func (ps *peerSet) Register(p *peer) error {
return errAlreadyRegistered
}
ps.peers[p.id] = p
+ p.sendQueue = newExecQueue(100)
return nil
}
@@ -453,8 +470,10 @@ func (ps *peerSet) Unregister(id string) error {
ps.lock.Lock()
defer ps.lock.Unlock()
- if _, ok := ps.peers[id]; !ok {
+ if p, ok := ps.peers[id]; !ok {
return errNotRegistered
+ } else {
+ p.sendQueue.quit()
}
delete(ps.peers, id)
return nil
diff --git a/les/request_test.go b/les/request_test.go
index 10e9edf8b..bec6bf1bc 100644
--- a/les/request_test.go
+++ b/les/request_test.go
@@ -72,8 +72,11 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
pool := &testServerPool{}
+ lpm.reqDist = newRequestDistributor(pool.getAllPeers, lpm.quitSync)
+ odr.reqDist = lpm.reqDist
pool.setPeer(lpeer)
odr.serverPool = pool
+ lpeer.hasBlock = func(common.Hash, uint64) bool { return true }
select {
case <-time.After(time.Millisecond * 100):
case err := <-err1:
diff --git a/les/serverpool.go b/les/serverpool.go
index 55d481dbf..64fe991c6 100644
--- a/les/serverpool.go
+++ b/les/serverpool.go
@@ -268,82 +268,6 @@ func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration,
}
}
-type selectPeerItem struct {
- peer *peer
- weight int64
- wait time.Duration
-}
-
-func (sp selectPeerItem) Weight() int64 {
- return sp.weight
-}
-
-// selectPeer selects a suitable peer for a request, also returning a necessary waiting time to perform the request
-// and a "locked" flag meaning that the request has been assigned to the given peer and its execution is guaranteed
-// after the given waiting time. If locked flag is false, selectPeer should be called again after the waiting time.
-func (pool *serverPool) selectPeer(reqID uint64, canSend func(*peer) (bool, time.Duration)) (*peer, time.Duration, bool) {
- pool.lock.Lock()
- type selectPeer struct {
- peer *peer
- rstat, tstat float64
- }
- var list []selectPeer
- sel := newWeightedRandomSelect()
- for _, entry := range pool.entries {
- if entry.state == psRegistered {
- if !entry.peer.fcServer.IsAssigned() {
- list = append(list, selectPeer{entry.peer, entry.responseStats.recentAvg(), entry.timeoutStats.recentAvg()})
- }
- }
- }
- pool.lock.Unlock()
-
- for _, sp := range list {
- ok, wait := canSend(sp.peer)
- if ok {
- w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(sp.rstat+float64(wait))/float64(responseScoreTC))*math.Pow((1-sp.tstat), timeoutPow)))
- sel.update(selectPeerItem{peer: sp.peer, weight: w, wait: wait})
- }
- }
- choice := sel.choose()
- if choice == nil {
- return nil, 0, false
- }
- peer, wait := choice.(selectPeerItem).peer, choice.(selectPeerItem).wait
- locked := false
- if wait < time.Millisecond*100 {
- if peer.fcServer.AssignRequest(reqID) {
- ok, w := canSend(peer)
- wait = time.Duration(w)
- if ok && wait < time.Millisecond*100 {
- locked = true
- } else {
- peer.fcServer.DeassignRequest(reqID)
- wait = time.Millisecond * 100
- }
- }
- } else {
- wait = time.Millisecond * 100
- }
- return peer, wait, locked
-}
-
-// selectPeer selects a suitable peer for a request, waiting until an assignment to
-// the request is guaranteed or the process is aborted.
-func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool, time.Duration), abort <-chan struct{}) *peer {
- for {
- peer, wait, locked := pool.selectPeer(reqID, canSend)
- if locked {
- return peer
- }
- select {
- case <-abort:
- return nil
- case <-time.After(wait):
- }
- }
-}
-
// eventLoop handles pool events and mutex locking for all internal functions
func (pool *serverPool) eventLoop() {
lookupCnt := 0
diff --git a/les/txrelay.go b/les/txrelay.go
index 76d416c57..1ca3467e4 100644
--- a/les/txrelay.go
+++ b/les/txrelay.go
@@ -35,13 +35,14 @@ type LesTxRelay struct {
peerList []*peer
peerStartPos int
lock sync.RWMutex
+
+ reqDist *requestDistributor
}
func NewLesTxRelay() *LesTxRelay {
return &LesTxRelay{
txSent: make(map[common.Hash]*ltrInfo),
txPending: make(map[common.Hash]struct{}),
- ps: newPeerSet(),
}
}
@@ -108,10 +109,26 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) {
}
for p, list := range sendTo {
- cost := p.GetRequestCost(SendTxMsg, len(list))
- go func(p *peer, list types.Transactions, cost uint64) {
- p.SendTxs(cost, list)
- }(p, list, cost)
+ pp := p
+ ll := list
+
+ reqID := getNextReqID()
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ peer := dp.(*peer)
+ return peer.GetRequestCost(SendTxMsg, len(ll))
+ },
+ canSend: func(dp distPeer) bool {
+ return dp.(*peer) == pp
+ },
+ request: func(dp distPeer) func() {
+ peer := dp.(*peer)
+ cost := peer.GetRequestCost(SendTxMsg, len(ll))
+ peer.fcServer.QueueRequest(reqID, cost)
+ return func() { peer.SendTxs(reqID, cost, ll) }
+ },
+ }
+ self.reqDist.queue(rq)
}
}
diff --git a/light/lightchain.go b/light/lightchain.go
index 4370dc0fc..4715d47ab 100644
--- a/light/lightchain.go
+++ b/light/lightchain.go
@@ -20,6 +20,7 @@ import (
"math/big"
"sync"
"sync/atomic"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
@@ -369,9 +370,17 @@ func (self *LightChain) postChainEvents(events []interface{}) {
// In the case of a light chain, InsertHeaderChain also creates and posts light
// chain events when necessary.
func (self *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
+ start := time.Now()
+ if i, err := self.hc.ValidateHeaderChain(chain, checkFreq); err != nil {
+ return i, err
+ }
+
// Make sure only one thread manipulates the chain at once
self.chainmu.Lock()
- defer self.chainmu.Unlock()
+ defer func() {
+ self.chainmu.Unlock()
+ time.Sleep(time.Millisecond * 10) // ugly hack; do not hog chain lock in case syncing is CPU-limited by validation
+ }()
self.wg.Add(1)
defer self.wg.Done()
@@ -397,7 +406,7 @@ func (self *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
}
return err
}
- i, err := self.hc.InsertHeaderChain(chain, checkFreq, whFunc)
+ i, err := self.hc.InsertHeaderChain(chain, whFunc, start)
go self.postChainEvents(events)
return i, err
}
diff --git a/light/txpool.go b/light/txpool.go
index 28c8d8ca5..5eb1ba801 100644
--- a/light/txpool.go
+++ b/light/txpool.go
@@ -276,15 +276,17 @@ func (pool *TxPool) setNewHead(ctx context.Context, newHeader *types.Header) (tx
// clear old mined tx entries of old blocks
if idx := newHeader.Number.Uint64(); idx > pool.clearIdx+txPermanent {
idx2 := idx - txPermanent
- for i := pool.clearIdx; i < idx2; i++ {
- hash := core.GetCanonicalHash(pool.chainDb, i)
- if list, ok := pool.mined[hash]; ok {
- hashes := make([]common.Hash, len(list))
- for i, tx := range list {
- hashes[i] = tx.Hash()
+ if len(pool.mined) > 0 {
+ for i := pool.clearIdx; i < idx2; i++ {
+ hash := core.GetCanonicalHash(pool.chainDb, i)
+ if list, ok := pool.mined[hash]; ok {
+ hashes := make([]common.Hash, len(list))
+ for i, tx := range list {
+ hashes[i] = tx.Hash()
+ }
+ pool.relay.Discard(hashes)
+ delete(pool.mined, hash)
}
- pool.relay.Discard(hashes)
- delete(pool.mined, hash)
}
}
pool.clearIdx = idx2
@@ -303,15 +305,16 @@ func (pool *TxPool) eventLoop() {
for ev := range pool.events.Chan() {
switch ev.Data.(type) {
case core.ChainHeadEvent:
+ head := pool.chain.CurrentHeader()
pool.mu.Lock()
ctx, _ := context.WithTimeout(context.Background(), blockCheckTimeout)
- head := pool.chain.CurrentHeader()
txc, _ := pool.setNewHead(ctx, head)
m, r := txc.getLists()
pool.relay.NewHead(pool.head, m, r)
pool.homestead = pool.config.IsHomestead(head.Number)
pool.signer = types.MakeSigner(pool.config, head.Number)
pool.mu.Unlock()
+ time.Sleep(time.Millisecond) // hack in order to avoid hogging the lock; this part will be replaced by a subsequent PR
}
}
}