diff --git a/les/api.go b/les/api.go index b53512196..95e1b009e 100644 --- a/les/api.go +++ b/les/api.go @@ -17,462 +17,16 @@ package les import ( - "context" "errors" - "fmt" - "sync" - "time" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/common/mclock" - "github.com/ethereum/go-ethereum/les/csvlogger" - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/rpc" ) var ( - ErrMinCap = errors.New("capacity too small") - ErrTotalCap = errors.New("total capacity exceeded") - ErrUnknownBenchmarkType = errors.New("unknown benchmark type") - ErrNoCheckpoint = errors.New("no local checkpoint provided") - ErrNotActivated = errors.New("checkpoint registrar is not activated") - - dropCapacityDelay = time.Second // delay applied to decreasing capacity changes + errNoCheckpoint = errors.New("no local checkpoint provided") + errNotActivated = errors.New("checkpoint registrar is not activated") ) -// PrivateLightServerAPI provides an API to access the LES light server. -// It offers only methods that operate on public data that is freely available to anyone. -type PrivateLightServerAPI struct { - server *LesServer -} - -// NewPrivateLightServerAPI creates a new LES light server API. -func NewPrivateLightServerAPI(server *LesServer) *PrivateLightServerAPI { - return &PrivateLightServerAPI{ - server: server, - } -} - -// TotalCapacity queries total available capacity for all clients -func (api *PrivateLightServerAPI) TotalCapacity() hexutil.Uint64 { - return hexutil.Uint64(api.server.priorityClientPool.totalCapacity()) -} - -// SubscribeTotalCapacity subscribes to changed total capacity events. -// If onlyUnderrun is true then notification is sent only if the total capacity -// drops under the total capacity of connected priority clients. -// -// Note: actually applying decreasing total capacity values is delayed while the -// notification is sent instantly. This allows lowering the capacity of a priority client -// or choosing which one to drop before the system drops some of them automatically. -func (api *PrivateLightServerAPI) SubscribeTotalCapacity(ctx context.Context, onlyUnderrun bool) (*rpc.Subscription, error) { - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported - } - rpcSub := notifier.CreateSubscription() - api.server.priorityClientPool.subscribeTotalCapacity(&tcSubscription{notifier, rpcSub, onlyUnderrun}) - return rpcSub, nil -} - -type ( - // tcSubscription represents a total capacity subscription - tcSubscription struct { - notifier *rpc.Notifier - rpcSub *rpc.Subscription - onlyUnderrun bool - } - tcSubs map[*tcSubscription]struct{} -) - -// send sends a changed total capacity event to the subscribers -func (s tcSubs) send(tc uint64, underrun bool) { - for sub := range s { - select { - case <-sub.rpcSub.Err(): - delete(s, sub) - case <-sub.notifier.Closed(): - delete(s, sub) - default: - if underrun || !sub.onlyUnderrun { - sub.notifier.Notify(sub.rpcSub.ID, tc) - } - } - } -} - -// MinimumCapacity queries minimum assignable capacity for a single client -func (api *PrivateLightServerAPI) MinimumCapacity() hexutil.Uint64 { - return hexutil.Uint64(api.server.minCapacity) -} - -// FreeClientCapacity queries the capacity provided for free clients -func (api *PrivateLightServerAPI) FreeClientCapacity() hexutil.Uint64 { - return hexutil.Uint64(api.server.freeClientCap) -} - -// SetClientCapacity sets the priority capacity assigned to a given client. -// If the assigned capacity is bigger than zero then connection is always -// guaranteed. The sum of capacity assigned to priority clients can not exceed -// the total available capacity. -// -// Note: assigned capacity can be changed while the client is connected with -// immediate effect. -func (api *PrivateLightServerAPI) SetClientCapacity(id enode.ID, cap uint64) error { - if cap != 0 && cap < api.server.minCapacity { - return ErrMinCap - } - return api.server.priorityClientPool.setClientCapacity(id, cap) -} - -// GetClientCapacity returns the capacity assigned to a given client -func (api *PrivateLightServerAPI) GetClientCapacity(id enode.ID) hexutil.Uint64 { - api.server.priorityClientPool.lock.Lock() - defer api.server.priorityClientPool.lock.Unlock() - - return hexutil.Uint64(api.server.priorityClientPool.clients[id].cap) -} - -// clientPool is implemented by both the free and priority client pools -type clientPool interface { - peerSetNotify - setLimits(count int, totalCap uint64) -} - -// priorityClientPool stores information about prioritized clients -type priorityClientPool struct { - lock sync.Mutex - child clientPool - ps *peerSet - clients map[enode.ID]priorityClientInfo - totalCap, totalCapAnnounced uint64 - totalConnectedCap, freeClientCap uint64 - maxPeers, priorityCount int - logger *csvlogger.Logger - logTotalPriConn *csvlogger.Channel - - subs tcSubs - updateSchedule []scheduledUpdate - scheduleCounter uint64 -} - -// scheduledUpdate represents a delayed total capacity update -type scheduledUpdate struct { - time mclock.AbsTime - totalCap, id uint64 -} - -// priorityClientInfo entries exist for all prioritized clients and currently connected non-priority clients -type priorityClientInfo struct { - cap uint64 // zero for non-priority clients - connected bool - peer *peer -} - -// newPriorityClientPool creates a new priority client pool -func newPriorityClientPool(freeClientCap uint64, ps *peerSet, child clientPool, metricsLogger, eventLogger *csvlogger.Logger) *priorityClientPool { - return &priorityClientPool{ - clients: make(map[enode.ID]priorityClientInfo), - freeClientCap: freeClientCap, - ps: ps, - child: child, - logger: eventLogger, - logTotalPriConn: metricsLogger.NewChannel("totalPriConn", 0), - } -} - -// registerPeer is called when a new client is connected. If the client has no -// priority assigned then it is passed to the child pool which may either keep it -// or disconnect it. -// -// Note: priorityClientPool also stores a record about free clients while they are -// connected in order to be able to assign priority to them later. -func (v *priorityClientPool) registerPeer(p *peer) { - v.lock.Lock() - defer v.lock.Unlock() - - id := p.ID() - c := v.clients[id] - v.logger.Event(fmt.Sprintf("priorityClientPool: registerPeer cap=%d connected=%v, %x", c.cap, c.connected, id.Bytes())) - if c.connected { - return - } - if c.cap == 0 && v.child != nil { - v.child.registerPeer(p) - } - if c.cap != 0 && v.totalConnectedCap+c.cap > v.totalCap { - v.logger.Event(fmt.Sprintf("priorityClientPool: rejected, %x", id.Bytes())) - go v.ps.Unregister(p.id) - return - } - - c.connected = true - c.peer = p - v.clients[id] = c - if c.cap != 0 { - v.priorityCount++ - v.totalConnectedCap += c.cap - v.logger.Event(fmt.Sprintf("priorityClientPool: accepted with %d capacity, %x", c.cap, id.Bytes())) - v.logTotalPriConn.Update(float64(v.totalConnectedCap)) - if v.child != nil { - v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap) - } - p.updateCapacity(c.cap) - } -} - -// unregisterPeer is called when a client is disconnected. If the client has no -// priority assigned then it is also removed from the child pool. -func (v *priorityClientPool) unregisterPeer(p *peer) { - v.lock.Lock() - defer v.lock.Unlock() - - id := p.ID() - c := v.clients[id] - v.logger.Event(fmt.Sprintf("priorityClientPool: unregisterPeer cap=%d connected=%v, %x", c.cap, c.connected, id.Bytes())) - if !c.connected { - return - } - if c.cap != 0 { - c.connected = false - v.clients[id] = c - v.priorityCount-- - v.totalConnectedCap -= c.cap - v.logTotalPriConn.Update(float64(v.totalConnectedCap)) - if v.child != nil { - v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap) - } - } else { - if v.child != nil { - v.child.unregisterPeer(p) - } - delete(v.clients, id) - } -} - -// setLimits updates the allowed peer count and total capacity of the priority -// client pool. Since the free client pool is a child of the priority pool the -// remaining peer count and capacity is assigned to the free pool by calling its -// own setLimits function. -// -// Note: a decreasing change of the total capacity is applied with a delay. -func (v *priorityClientPool) setLimits(count int, totalCap uint64) { - v.lock.Lock() - defer v.lock.Unlock() - - v.totalCapAnnounced = totalCap - if totalCap > v.totalCap { - v.setLimitsNow(count, totalCap) - v.subs.send(totalCap, false) - return - } - v.setLimitsNow(count, v.totalCap) - if totalCap < v.totalCap { - v.subs.send(totalCap, totalCap < v.totalConnectedCap) - for i, s := range v.updateSchedule { - if totalCap >= s.totalCap { - s.totalCap = totalCap - v.updateSchedule = v.updateSchedule[:i+1] - return - } - } - v.updateSchedule = append(v.updateSchedule, scheduledUpdate{time: mclock.Now() + mclock.AbsTime(dropCapacityDelay), totalCap: totalCap}) - if len(v.updateSchedule) == 1 { - v.scheduleCounter++ - id := v.scheduleCounter - v.updateSchedule[0].id = id - time.AfterFunc(dropCapacityDelay, func() { v.checkUpdate(id) }) - } - } else { - v.updateSchedule = nil - } -} - -// checkUpdate performs the next scheduled update if possible and schedules -// the one after that -func (v *priorityClientPool) checkUpdate(id uint64) { - v.lock.Lock() - defer v.lock.Unlock() - - if len(v.updateSchedule) == 0 || v.updateSchedule[0].id != id { - return - } - v.setLimitsNow(v.maxPeers, v.updateSchedule[0].totalCap) - v.updateSchedule = v.updateSchedule[1:] - if len(v.updateSchedule) != 0 { - v.scheduleCounter++ - id := v.scheduleCounter - v.updateSchedule[0].id = id - dt := time.Duration(v.updateSchedule[0].time - mclock.Now()) - time.AfterFunc(dt, func() { v.checkUpdate(id) }) - } -} - -// setLimits updates the allowed peer count and total capacity immediately -func (v *priorityClientPool) setLimitsNow(count int, totalCap uint64) { - if v.priorityCount > count || v.totalConnectedCap > totalCap { - for id, c := range v.clients { - if c.connected { - v.logger.Event(fmt.Sprintf("priorityClientPool: setLimitsNow kicked out, %x", id.Bytes())) - c.connected = false - v.totalConnectedCap -= c.cap - v.logTotalPriConn.Update(float64(v.totalConnectedCap)) - v.priorityCount-- - v.clients[id] = c - go v.ps.Unregister(c.peer.id) - if v.priorityCount <= count && v.totalConnectedCap <= totalCap { - break - } - } - } - } - v.maxPeers = count - v.totalCap = totalCap - if v.child != nil { - v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap) - } -} - -// totalCapacity queries total available capacity for all clients -func (v *priorityClientPool) totalCapacity() uint64 { - v.lock.Lock() - defer v.lock.Unlock() - - return v.totalCapAnnounced -} - -// subscribeTotalCapacity subscribes to changed total capacity events -func (v *priorityClientPool) subscribeTotalCapacity(sub *tcSubscription) { - v.lock.Lock() - defer v.lock.Unlock() - - v.subs[sub] = struct{}{} -} - -// setClientCapacity sets the priority capacity assigned to a given client -func (v *priorityClientPool) setClientCapacity(id enode.ID, cap uint64) error { - v.lock.Lock() - defer v.lock.Unlock() - - c := v.clients[id] - if c.cap == cap { - return nil - } - if c.connected { - if v.totalConnectedCap+cap > v.totalCap+c.cap { - return ErrTotalCap - } - if c.cap == 0 { - if v.child != nil { - v.child.unregisterPeer(c.peer) - } - v.priorityCount++ - } - if cap == 0 { - v.priorityCount-- - } - v.totalConnectedCap += cap - c.cap - v.logTotalPriConn.Update(float64(v.totalConnectedCap)) - if v.child != nil { - v.child.setLimits(v.maxPeers-v.priorityCount, v.totalCap-v.totalConnectedCap) - } - if cap == 0 { - if v.child != nil { - v.child.registerPeer(c.peer) - } - c.peer.updateCapacity(v.freeClientCap) - } else { - c.peer.updateCapacity(cap) - } - } - if cap != 0 || c.connected { - c.cap = cap - v.clients[id] = c - } else { - delete(v.clients, id) - } - if c.connected { - v.logger.Event(fmt.Sprintf("priorityClientPool: changed capacity to %d, %x", cap, id.Bytes())) - } - return nil -} - -// Benchmark runs a request performance benchmark with a given set of measurement setups -// in multiple passes specified by passCount. The measurement time for each setup in each -// pass is specified in milliseconds by length. -// -// Note: measurement time is adjusted for each pass depending on the previous ones. -// Therefore a controlled total measurement time is achievable in multiple passes. -func (api *PrivateLightServerAPI) Benchmark(setups []map[string]interface{}, passCount, length int) ([]map[string]interface{}, error) { - benchmarks := make([]requestBenchmark, len(setups)) - for i, setup := range setups { - if t, ok := setup["type"].(string); ok { - getInt := func(field string, def int) int { - if value, ok := setup[field].(float64); ok { - return int(value) - } - return def - } - getBool := func(field string, def bool) bool { - if value, ok := setup[field].(bool); ok { - return value - } - return def - } - switch t { - case "header": - benchmarks[i] = &benchmarkBlockHeaders{ - amount: getInt("amount", 1), - skip: getInt("skip", 1), - byHash: getBool("byHash", false), - reverse: getBool("reverse", false), - } - case "body": - benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: false} - case "receipts": - benchmarks[i] = &benchmarkBodiesOrReceipts{receipts: true} - case "proof": - benchmarks[i] = &benchmarkProofsOrCode{code: false} - case "code": - benchmarks[i] = &benchmarkProofsOrCode{code: true} - case "cht": - benchmarks[i] = &benchmarkHelperTrie{ - bloom: false, - reqCount: getInt("amount", 1), - } - case "bloom": - benchmarks[i] = &benchmarkHelperTrie{ - bloom: true, - reqCount: getInt("amount", 1), - } - case "txSend": - benchmarks[i] = &benchmarkTxSend{} - case "txStatus": - benchmarks[i] = &benchmarkTxStatus{} - default: - return nil, ErrUnknownBenchmarkType - } - } else { - return nil, ErrUnknownBenchmarkType - } - } - rs := api.server.protocolManager.runBenchmark(benchmarks, passCount, time.Millisecond*time.Duration(length)) - result := make([]map[string]interface{}, len(setups)) - for i, r := range rs { - res := make(map[string]interface{}) - if r.err == nil { - res["totalCount"] = r.totalCount - res["avgTime"] = r.avgTime - res["maxInSize"] = r.maxInSize - res["maxOutSize"] = r.maxOutSize - } else { - res["error"] = r.err.Error() - } - result[i] = res - } - return result, nil -} - // PrivateLightAPI provides an API to access the LES light server or light client. type PrivateLightAPI struct { backend *lesCommons @@ -498,7 +52,7 @@ func (api *PrivateLightAPI) LatestCheckpoint() ([4]string, error) { var res [4]string cp := api.backend.latestLocalCheckpoint() if cp.Empty() { - return res, ErrNoCheckpoint + return res, errNoCheckpoint } res[0] = hexutil.EncodeUint64(cp.SectionIndex) res[1], res[2], res[3] = cp.SectionHead.Hex(), cp.CHTRoot.Hex(), cp.BloomRoot.Hex() @@ -515,7 +69,7 @@ func (api *PrivateLightAPI) GetCheckpoint(index uint64) ([3]string, error) { var res [3]string cp := api.backend.getLocalCheckpoint(index) if cp.Empty() { - return res, ErrNoCheckpoint + return res, errNoCheckpoint } res[0], res[1], res[2] = cp.SectionHead.Hex(), cp.CHTRoot.Hex(), cp.BloomRoot.Hex() return res, nil @@ -524,7 +78,7 @@ func (api *PrivateLightAPI) GetCheckpoint(index uint64) ([3]string, error) { // GetCheckpointContractAddress returns the contract contract address in hex format. func (api *PrivateLightAPI) GetCheckpointContractAddress() (string, error) { if api.reg == nil { - return "", ErrNotActivated + return "", errNotActivated } return api.reg.config.Address.Hex(), nil } diff --git a/les/costtracker.go b/les/costtracker.go index e463c9f8b..2d9c95af7 100644 --- a/les/costtracker.go +++ b/les/costtracker.go @@ -18,7 +18,6 @@ package les import ( "encoding/binary" - "fmt" "math" "sync" "sync/atomic" @@ -27,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/les/csvlogger" "github.com/ethereum/go-ethereum/les/flowcontrol" "github.com/ethereum/go-ethereum/log" ) @@ -96,40 +94,50 @@ const ( // as the number of cost units per nanosecond of serving time in a single thread. // It is based on statistics collected during serving requests in high-load periods // and practically acts as a one-dimension request price scaling factor over the -// pre-defined cost estimate table. Instead of scaling the cost values, the real -// value of cost units is changed by applying the factor to the serving times. This -// is more convenient because the changes in the cost factor can be applied immediately -// without always notifying the clients about the changed cost tables. +// pre-defined cost estimate table. +// +// The reason for dynamically maintaining the global factor on the server side is: +// the estimated time cost of the request is fixed(hardcoded) but the configuration +// of the machine running the server is really different. Therefore, the request serving +// time in different machine will vary greatly. And also, the request serving time +// in same machine may vary greatly with different request pressure. +// +// In order to more effectively limit resources, we apply the global factor to serving +// time to make the result as close as possible to the estimated time cost no matter +// the server is slow or fast. And also we scale the totalRecharge with global factor +// so that fast server can serve more requests than estimation and slow server can +// reduce request pressure. +// +// Instead of scaling the cost values, the real value of cost units is changed by +// applying the factor to the serving times. This is more convenient because the +// changes in the cost factor can be applied immediately without always notifying +// the clients about the changed cost tables. type costTracker struct { db ethdb.Database stopCh chan chan struct{} - inSizeFactor, outSizeFactor float64 - gf, utilTarget float64 - minBufLimit uint64 + inSizeFactor float64 + outSizeFactor float64 + factor float64 + utilTarget float64 + minBufLimit uint64 - gfUpdateCh chan gfUpdate gfLock sync.RWMutex + reqInfoCh chan reqInfo totalRechargeCh chan uint64 - stats map[uint64][]uint64 - logger *csvlogger.Logger - logRecentTime, logRecentAvg, logTotalRecharge, logRelCost *csvlogger.Channel + stats map[uint64][]uint64 // Used for testing purpose. } // newCostTracker creates a cost tracker and loads the cost factor statistics from the database. // It also returns the minimum capacity that can be assigned to any peer. -func newCostTracker(db ethdb.Database, config *eth.Config, logger *csvlogger.Logger) (*costTracker, uint64) { +func newCostTracker(db ethdb.Database, config *eth.Config) (*costTracker, uint64) { utilTarget := float64(config.LightServ) * flowcontrol.FixedPointMultiplier / 100 ct := &costTracker{ - db: db, - stopCh: make(chan chan struct{}), - utilTarget: utilTarget, - logger: logger, - logRelCost: logger.NewMinMaxChannel("relativeCost", true), - logRecentTime: logger.NewMinMaxChannel("recentTime", true), - logRecentAvg: logger.NewMinMaxChannel("recentAvg", true), - logTotalRecharge: logger.NewChannel("totalRecharge", 0.01), + db: db, + stopCh: make(chan chan struct{}), + reqInfoCh: make(chan reqInfo, 100), + utilTarget: utilTarget, } if config.LightBandwidthIn > 0 { ct.inSizeFactor = utilTarget / float64(config.LightBandwidthIn) @@ -204,8 +212,15 @@ func (ct *costTracker) makeCostList(globalFactor float64) RequestCostList { return list } -type gfUpdate struct { - avgTimeCost, servingTime float64 +// reqInfo contains the estimated time cost and the actual request serving time +// which acts as a feed source to update factor maintained by costTracker. +type reqInfo struct { + // avgTimeCost is the estimated time cost corresponding to maxCostTable. + avgTimeCost float64 + + // servingTime is the CPU time corresponding to the actual processing of + // the request. + servingTime float64 } // gfLoop starts an event loop which updates the global cost factor which is @@ -218,43 +233,48 @@ type gfUpdate struct { // total allowed serving time per second but nominated in cost units, should // also be scaled with the cost factor and is also updated by this loop. func (ct *costTracker) gfLoop() { - var gfLog, recentTime, recentAvg float64 - lastUpdate := mclock.Now() - expUpdate := lastUpdate + var ( + factor, totalRecharge float64 + gfLog, recentTime, recentAvg float64 + lastUpdate, expUpdate = mclock.Now(), mclock.Now() + ) + + // Load historical cost factor statistics from the database. data, _ := ct.db.Get([]byte(gfDbKey)) if len(data) == 8 { gfLog = math.Float64frombits(binary.BigEndian.Uint64(data[:])) } - gf := math.Exp(gfLog) - ct.gf = gf - totalRecharge := ct.utilTarget * gf - ct.gfUpdateCh = make(chan gfUpdate, 100) - threshold := gfUsageThreshold * float64(gfUsageTC) * ct.utilTarget / 1000000 + ct.factor = math.Exp(gfLog) + factor, totalRecharge = ct.factor, ct.utilTarget*ct.factor + + // In order to perform factor data statistics under the high request pressure, + // we only adjust factor when recent factor usage beyond the threshold. + threshold := gfUsageThreshold * float64(gfUsageTC) * ct.utilTarget / flowcontrol.FixedPointMultiplier go func() { saveCostFactor := func() { var data [8]byte binary.BigEndian.PutUint64(data[:], math.Float64bits(gfLog)) ct.db.Put([]byte(gfDbKey), data[:]) - log.Debug("global cost factor saved", "value", gf) + log.Debug("global cost factor saved", "value", factor) } saveTicker := time.NewTicker(time.Minute * 10) for { select { - case r := <-ct.gfUpdateCh: + case r := <-ct.reqInfoCh: + requestServedMeter.Mark(int64(r.servingTime)) + requestEstimatedMeter.Mark(int64(r.avgTimeCost / factor)) + requestServedTimer.Update(time.Duration(r.servingTime)) + relativeCostHistogram.Update(int64(r.avgTimeCost / factor / r.servingTime)) + now := mclock.Now() - if ct.logRelCost != nil && r.avgTimeCost > 1e-20 { - ct.logRelCost.Update(r.servingTime * gf / r.avgTimeCost) - } - if r.servingTime > 1000000000 { - ct.logger.Event(fmt.Sprintf("Very long servingTime = %f avgTimeCost = %f costFactor = %f", r.servingTime, r.avgTimeCost, gf)) - } dt := float64(now - expUpdate) expUpdate = now exp := math.Exp(-dt / float64(gfUsageTC)) - // calculate gf correction until now, based on previous values + + // calculate factor correction until now, based on previous values var gfCorr float64 max := recentTime if recentAvg > max { @@ -268,27 +288,28 @@ func (ct *costTracker) gfLoop() { } else { gfCorr = math.Log(max/threshold) * float64(gfUsageTC) } - // calculate log(gf) correction with the right direction and time constant + // calculate log(factor) correction with the right direction and time constant if recentTime > recentAvg { - // drop gf if actual serving times are larger than average estimates + // drop factor if actual serving times are larger than average estimates gfCorr /= -float64(gfDropTC) } else { - // raise gf if actual serving times are smaller than average estimates + // raise factor if actual serving times are smaller than average estimates gfCorr /= float64(gfRaiseTC) } } // update recent cost values with current request recentTime = recentTime*exp + r.servingTime - recentAvg = recentAvg*exp + r.avgTimeCost/gf + recentAvg = recentAvg*exp + r.avgTimeCost/factor if gfCorr != 0 { + // Apply the correction to factor gfLog += gfCorr - gf = math.Exp(gfLog) + factor = math.Exp(gfLog) + // Notify outside modules the new factor and totalRecharge. if time.Duration(now-lastUpdate) > time.Second { - totalRecharge = ct.utilTarget * gf - lastUpdate = now + totalRecharge, lastUpdate = ct.utilTarget*factor, now ct.gfLock.Lock() - ct.gf = gf + ct.factor = factor ch := ct.totalRechargeCh ct.gfLock.Unlock() if ch != nil { @@ -297,12 +318,12 @@ func (ct *costTracker) gfLoop() { default: } } - log.Debug("global cost factor updated", "gf", gf) + log.Debug("global cost factor updated", "factor", factor) } } - ct.logRecentTime.Update(recentTime) - ct.logRecentAvg.Update(recentAvg) - ct.logTotalRecharge.Update(totalRecharge) + recentServedGauge.Update(int64(recentTime)) + recentEstimatedGauge.Update(int64(recentAvg)) + totalRechargeGauge.Update(int64(totalRecharge)) case <-saveTicker.C: saveCostFactor() @@ -321,7 +342,7 @@ func (ct *costTracker) globalFactor() float64 { ct.gfLock.RLock() defer ct.gfLock.RUnlock() - return ct.gf + return ct.factor } // totalRecharge returns the current total recharge parameter which is used by @@ -330,7 +351,7 @@ func (ct *costTracker) totalRecharge() uint64 { ct.gfLock.RLock() defer ct.gfLock.RUnlock() - return uint64(ct.gf * ct.utilTarget) + return uint64(ct.factor * ct.utilTarget) } // subscribeTotalRecharge returns all future updates to the total recharge value @@ -340,7 +361,7 @@ func (ct *costTracker) subscribeTotalRecharge(ch chan uint64) uint64 { defer ct.gfLock.Unlock() ct.totalRechargeCh = ch - return uint64(ct.gf * ct.utilTarget) + return uint64(ct.factor * ct.utilTarget) } // updateStats updates the global cost factor and (if enabled) the real cost vs. @@ -349,7 +370,7 @@ func (ct *costTracker) updateStats(code, amount, servingTime, realCost uint64) { avg := reqAvgTimeCost[code] avgTimeCost := avg.baseCost + amount*avg.reqCost select { - case ct.gfUpdateCh <- gfUpdate{float64(avgTimeCost), float64(servingTime)}: + case ct.reqInfoCh <- reqInfo{float64(avgTimeCost), float64(servingTime)}: default: } if makeCostStats { diff --git a/les/csvlogger/csvlogger.go b/les/csvlogger/csvlogger.go deleted file mode 100644 index 9a4093cb9..000000000 --- a/les/csvlogger/csvlogger.go +++ /dev/null @@ -1,227 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package csvlogger - -import ( - "fmt" - "os" - "sync" - "time" - - "github.com/ethereum/go-ethereum/common/mclock" - "github.com/ethereum/go-ethereum/log" -) - -// Logger is a metrics/events logger that writes logged values and events into a comma separated file -type Logger struct { - file *os.File - started mclock.AbsTime - channels []*Channel - period time.Duration - stopCh, stopped chan struct{} - storeCh chan string - eventHeader string -} - -// NewLogger creates a new Logger -func NewLogger(fileName string, updatePeriod time.Duration, eventHeader string) *Logger { - if fileName == "" { - return nil - } - f, err := os.Create(fileName) - if err != nil { - log.Error("Error creating log file", "name", fileName, "error", err) - return nil - } - return &Logger{ - file: f, - period: updatePeriod, - stopCh: make(chan struct{}), - storeCh: make(chan string, 1), - eventHeader: eventHeader, - } -} - -// NewChannel creates a new value logger channel that writes values in a single -// column. If the relative change of the value is bigger than the given threshold -// then a new line is added immediately (threshold can also be 0). -func (l *Logger) NewChannel(name string, threshold float64) *Channel { - if l == nil { - return nil - } - c := &Channel{ - logger: l, - name: name, - threshold: threshold, - } - l.channels = append(l.channels, c) - return c -} - -// NewMinMaxChannel creates a new value logger channel that writes the minimum and -// maximum of the tracked value in two columns. It never triggers adding a new line. -// If zeroDefault is true then 0 is written to both min and max columns if no update -// was given during the last period. If it is false then the last update will appear -// in both columns. -func (l *Logger) NewMinMaxChannel(name string, zeroDefault bool) *Channel { - if l == nil { - return nil - } - c := &Channel{ - logger: l, - name: name, - minmax: true, - mmZeroDefault: zeroDefault, - } - l.channels = append(l.channels, c) - return c -} - -func (l *Logger) store(event string) { - s := fmt.Sprintf("%g", float64(mclock.Now()-l.started)/1000000000) - for _, ch := range l.channels { - s += ", " + ch.store() - } - if event != "" { - s += ", " + event - } - l.file.WriteString(s + "\n") -} - -// Start writes the header line and starts the logger -func (l *Logger) Start() { - if l == nil { - return - } - l.started = mclock.Now() - s := "Time" - for _, ch := range l.channels { - s += ", " + ch.header() - } - if l.eventHeader != "" { - s += ", " + l.eventHeader - } - l.file.WriteString(s + "\n") - go func() { - timer := time.NewTimer(l.period) - for { - select { - case <-timer.C: - l.store("") - timer.Reset(l.period) - case event := <-l.storeCh: - l.store(event) - if !timer.Stop() { - <-timer.C - } - timer.Reset(l.period) - case <-l.stopCh: - close(l.stopped) - return - } - } - }() -} - -// Stop stops the logger and closes the file -func (l *Logger) Stop() { - if l == nil { - return - } - l.stopped = make(chan struct{}) - close(l.stopCh) - <-l.stopped - l.file.Close() -} - -// Event immediately adds a new line and adds the given event string in the last column -func (l *Logger) Event(event string) { - if l == nil { - return - } - select { - case l.storeCh <- event: - case <-l.stopCh: - } -} - -// Channel represents a logger channel tracking a single value -type Channel struct { - logger *Logger - lock sync.Mutex - name string - threshold, storeMin, storeMax, lastValue, min, max float64 - minmax, mmSet, mmZeroDefault bool -} - -// Update updates the tracked value -func (lc *Channel) Update(value float64) { - if lc == nil { - return - } - lc.lock.Lock() - defer lc.lock.Unlock() - - lc.lastValue = value - if lc.minmax { - if value > lc.max || !lc.mmSet { - lc.max = value - } - if value < lc.min || !lc.mmSet { - lc.min = value - } - lc.mmSet = true - } else { - if value < lc.storeMin || value > lc.storeMax { - select { - case lc.logger.storeCh <- "": - default: - } - } - } -} - -func (lc *Channel) store() (s string) { - lc.lock.Lock() - defer lc.lock.Unlock() - - if lc.minmax { - s = fmt.Sprintf("%g, %g", lc.min, lc.max) - lc.mmSet = false - if lc.mmZeroDefault { - lc.min = 0 - } else { - lc.min = lc.lastValue - } - lc.max = lc.min - } else { - s = fmt.Sprintf("%g", lc.lastValue) - lc.storeMin = lc.lastValue * (1 - lc.threshold) - lc.storeMax = lc.lastValue * (1 + lc.threshold) - if lc.lastValue < 0 { - lc.storeMin, lc.storeMax = lc.storeMax, lc.storeMin - } - } - return -} - -func (lc *Channel) header() string { - if lc.minmax { - return lc.name + " (min), " + lc.name + " (max)" - } - return lc.name -} diff --git a/les/freeclient.go b/les/freeclient.go index f434ea0b9..934b88153 100644 --- a/les/freeclient.go +++ b/les/freeclient.go @@ -26,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/les/csvlogger" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" ) @@ -53,8 +52,7 @@ type freeClientPool struct { connectedLimit, totalLimit int freeClientCap uint64 - logger *csvlogger.Logger - logTotalFreeConn *csvlogger.Channel + connectedCap uint64 addressMap map[string]*freeClientPoolEntry connPool, disconnPool *prque.Prque @@ -69,18 +67,16 @@ const ( ) // newFreeClientPool creates a new free client pool -func newFreeClientPool(db ethdb.Database, freeClientCap uint64, totalLimit int, clock mclock.Clock, removePeer func(string), metricsLogger, eventLogger *csvlogger.Logger) *freeClientPool { +func newFreeClientPool(db ethdb.Database, freeClientCap uint64, totalLimit int, clock mclock.Clock, removePeer func(string)) *freeClientPool { pool := &freeClientPool{ - db: db, - clock: clock, - addressMap: make(map[string]*freeClientPoolEntry), - connPool: prque.New(poolSetIndex), - disconnPool: prque.New(poolSetIndex), - freeClientCap: freeClientCap, - totalLimit: totalLimit, - logger: eventLogger, - logTotalFreeConn: metricsLogger.NewChannel("totalFreeConn", 0), - removePeer: removePeer, + db: db, + clock: clock, + addressMap: make(map[string]*freeClientPoolEntry), + connPool: prque.New(poolSetIndex), + disconnPool: prque.New(poolSetIndex), + freeClientCap: freeClientCap, + totalLimit: totalLimit, + removePeer: removePeer, } pool.loadFromDb() return pool @@ -126,10 +122,7 @@ func (f *freeClientPool) connect(address, id string) bool { if f.closed { return false } - - f.logger.Event("freeClientPool: connecting from " + address + ", " + id) if f.connectedLimit == 0 { - f.logger.Event("freeClientPool: rejected, " + id) log.Debug("Client rejected", "address", address) return false } @@ -141,7 +134,6 @@ func (f *freeClientPool) connect(address, id string) bool { f.addressMap[address] = e } else { if e.connected { - f.logger.Event("freeClientPool: already connected, " + id) log.Debug("Client already connected", "address", address) return false } @@ -154,12 +146,13 @@ func (f *freeClientPool) connect(address, id string) bool { if e.linUsage+int64(connectedBias)-i.linUsage < 0 { // kick it out and accept the new client f.dropClient(i, now) - f.logger.Event("freeClientPool: kicked out, " + i.id) + clientKickedMeter.Mark(1) + f.connectedCap -= f.freeClientCap } else { // keep the old client and reject the new one f.connPool.Push(i, i.linUsage) - f.logger.Event("freeClientPool: rejected, " + id) log.Debug("Client rejected", "address", address) + clientRejectedMeter.Mark(1) return false } } @@ -167,11 +160,12 @@ func (f *freeClientPool) connect(address, id string) bool { e.connected = true e.id = id f.connPool.Push(e, e.linUsage) - f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap)) if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit { f.disconnPool.Pop() } - f.logger.Event("freeClientPool: accepted, " + id) + f.connectedCap += f.freeClientCap + totalConnectedGauge.Update(int64(f.connectedCap)) + clientConnectedMeter.Mark(1) log.Debug("Client accepted", "address", address) return true } @@ -203,13 +197,12 @@ func (f *freeClientPool) disconnect(address string) { log.Debug("Client already disconnected", "address", address) return } - f.connPool.Remove(e.index) - f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap)) f.calcLogUsage(e, now) e.connected = false f.disconnPool.Push(e, -e.logUsage) - f.logger.Event("freeClientPool: disconnected, " + e.id) + f.connectedCap -= f.freeClientCap + totalConnectedGauge.Update(int64(f.connectedCap)) log.Debug("Client disconnected", "address", address) } @@ -227,15 +220,15 @@ func (f *freeClientPool) setLimits(count int, totalCap uint64) { for f.connPool.Size() > f.connectedLimit { i := f.connPool.PopItem().(*freeClientPoolEntry) f.dropClient(i, now) - f.logger.Event("freeClientPool: setLimits kicked out, " + i.id) + f.connectedCap -= f.freeClientCap } + totalConnectedGauge.Update(int64(f.connectedCap)) } // dropClient disconnects a client and also moves it from the connected to the // disconnected pool func (f *freeClientPool) dropClient(i *freeClientPoolEntry, now mclock.AbsTime) { f.connPool.Remove(i.index) - f.logTotalFreeConn.Update(float64(uint64(f.connPool.Size()) * f.freeClientCap)) f.calcLogUsage(i, now) i.connected = false f.disconnPool.Push(i, -i.logUsage) diff --git a/les/freeclient_test.go b/les/freeclient_test.go index 5a58a6c1c..191822264 100644 --- a/les/freeclient_test.go +++ b/les/freeclient_test.go @@ -61,7 +61,7 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) { } disconnCh <- i } - pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn, nil, nil) + pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn) ) pool.setLimits(connLimit, uint64(connLimit)) @@ -130,7 +130,7 @@ func testFreeClientPool(t *testing.T, connLimit, clientCount int) { // close and restart pool pool.stop() - pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn, nil, nil) + pool = newFreeClientPool(db, 1, 10000, &clock, disconnFn) pool.setLimits(connLimit, uint64(connLimit)) // try connecting all known peers (connLimit should be filled up) diff --git a/les/handler.go b/les/handler.go index c902db65a..d9d07f014 100644 --- a/les/handler.go +++ b/les/handler.go @@ -35,7 +35,6 @@ import ( "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/les/csvlogger" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" @@ -124,7 +123,6 @@ type ProtocolManager struct { wg *sync.WaitGroup eventMux *event.TypeMux - logger *csvlogger.Logger // Callbacks synced func() bool @@ -262,11 +260,12 @@ func (pm *ProtocolManager) handle(p *peer) error { // Ignore maxPeers if this is a trusted peer // In server mode we try to check into the client pool after handshake if pm.client && pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted { - pm.logger.Event("Rejected (too many peers), " + p.id) + clientRejectedMeter.Mark(1) return p2p.DiscTooManyPeers } // Reject light clients if server is not synced. if !pm.client && !pm.synced() { + clientRejectedMeter.Mark(1) return p2p.DiscRequested } p.Log().Debug("Light Ethereum peer connected", "name", p.Name()) @@ -281,7 +280,7 @@ func (pm *ProtocolManager) handle(p *peer) error { ) if err := p.Handshake(td, hash, number, genesis.Hash(), pm.server); err != nil { p.Log().Debug("Light Ethereum handshake failed", "err", err) - pm.logger.Event("Handshake error: " + err.Error() + ", " + p.id) + clientErrorMeter.Mark(1) return err } if p.fcClient != nil { @@ -294,14 +293,14 @@ func (pm *ProtocolManager) handle(p *peer) error { // Register the peer locally if err := pm.peers.Register(p); err != nil { + clientErrorMeter.Mark(1) p.Log().Error("Light Ethereum peer registration failed", "err", err) - pm.logger.Event("Peer registration error: " + err.Error() + ", " + p.id) return err } - pm.logger.Event("Connection established, " + p.id) + connectedAt := time.Now() defer func() { - pm.logger.Event("Closed connection, " + p.id) pm.removePeer(p.id) + connectionTimer.UpdateSince(connectedAt) }() // Register the peer in the downloader. If the downloader considers it banned, we disconnect @@ -317,11 +316,9 @@ func (pm *ProtocolManager) handle(p *peer) error { pm.serverPool.registered(p.poolEntry) } } - // main loop. handle incoming messages. for { if err := pm.handleMsg(p); err != nil { - pm.logger.Event("Message handling error: " + err.Error() + ", " + p.id) p.Log().Debug("Light Ethereum message handling failed", "err", err) if p.fcServer != nil { p.fcServer.DumpLogs() diff --git a/les/helper_test.go b/les/helper_test.go index 035865b08..fd5236a99 100644 --- a/les/helper_test.go +++ b/les/helper_test.go @@ -231,7 +231,7 @@ func newTestProtocolManager(lightSync bool, blocks int, odr *LesOdr, indexers [] if !lightSync { srv := &LesServer{lesCommons: lesCommons{protocolManager: pm, chainDb: db}} pm.server = srv - pm.servingQueue = newServingQueue(int64(time.Millisecond*10), 1, nil) + pm.servingQueue = newServingQueue(int64(time.Millisecond*10), 1) pm.servingQueue.setThreads(4) srv.defParams = flowcontrol.ServerParams{ diff --git a/les/metrics.go b/les/metrics.go index c282a62a1..4c6737a4e 100644 --- a/les/metrics.go +++ b/les/metrics.go @@ -22,46 +22,31 @@ import ( ) var ( - /* propTxnInPacketsMeter = metrics.NewMeter("eth/prop/txns/in/packets") - propTxnInTrafficMeter = metrics.NewMeter("eth/prop/txns/in/traffic") - propTxnOutPacketsMeter = metrics.NewMeter("eth/prop/txns/out/packets") - propTxnOutTrafficMeter = metrics.NewMeter("eth/prop/txns/out/traffic") - propHashInPacketsMeter = metrics.NewMeter("eth/prop/hashes/in/packets") - propHashInTrafficMeter = metrics.NewMeter("eth/prop/hashes/in/traffic") - propHashOutPacketsMeter = metrics.NewMeter("eth/prop/hashes/out/packets") - propHashOutTrafficMeter = metrics.NewMeter("eth/prop/hashes/out/traffic") - propBlockInPacketsMeter = metrics.NewMeter("eth/prop/blocks/in/packets") - propBlockInTrafficMeter = metrics.NewMeter("eth/prop/blocks/in/traffic") - propBlockOutPacketsMeter = metrics.NewMeter("eth/prop/blocks/out/packets") - propBlockOutTrafficMeter = metrics.NewMeter("eth/prop/blocks/out/traffic") - reqHashInPacketsMeter = metrics.NewMeter("eth/req/hashes/in/packets") - reqHashInTrafficMeter = metrics.NewMeter("eth/req/hashes/in/traffic") - reqHashOutPacketsMeter = metrics.NewMeter("eth/req/hashes/out/packets") - reqHashOutTrafficMeter = metrics.NewMeter("eth/req/hashes/out/traffic") - reqBlockInPacketsMeter = metrics.NewMeter("eth/req/blocks/in/packets") - reqBlockInTrafficMeter = metrics.NewMeter("eth/req/blocks/in/traffic") - reqBlockOutPacketsMeter = metrics.NewMeter("eth/req/blocks/out/packets") - reqBlockOutTrafficMeter = metrics.NewMeter("eth/req/blocks/out/traffic") - reqHeaderInPacketsMeter = metrics.NewMeter("eth/req/headers/in/packets") - reqHeaderInTrafficMeter = metrics.NewMeter("eth/req/headers/in/traffic") - reqHeaderOutPacketsMeter = metrics.NewMeter("eth/req/headers/out/packets") - reqHeaderOutTrafficMeter = metrics.NewMeter("eth/req/headers/out/traffic") - reqBodyInPacketsMeter = metrics.NewMeter("eth/req/bodies/in/packets") - reqBodyInTrafficMeter = metrics.NewMeter("eth/req/bodies/in/traffic") - reqBodyOutPacketsMeter = metrics.NewMeter("eth/req/bodies/out/packets") - reqBodyOutTrafficMeter = metrics.NewMeter("eth/req/bodies/out/traffic") - reqStateInPacketsMeter = metrics.NewMeter("eth/req/states/in/packets") - reqStateInTrafficMeter = metrics.NewMeter("eth/req/states/in/traffic") - reqStateOutPacketsMeter = metrics.NewMeter("eth/req/states/out/packets") - reqStateOutTrafficMeter = metrics.NewMeter("eth/req/states/out/traffic") - reqReceiptInPacketsMeter = metrics.NewMeter("eth/req/receipts/in/packets") - reqReceiptInTrafficMeter = metrics.NewMeter("eth/req/receipts/in/traffic") - reqReceiptOutPacketsMeter = metrics.NewMeter("eth/req/receipts/out/packets") - reqReceiptOutTrafficMeter = metrics.NewMeter("eth/req/receipts/out/traffic")*/ miscInPacketsMeter = metrics.NewRegisteredMeter("les/misc/in/packets", nil) miscInTrafficMeter = metrics.NewRegisteredMeter("les/misc/in/traffic", nil) miscOutPacketsMeter = metrics.NewRegisteredMeter("les/misc/out/packets", nil) miscOutTrafficMeter = metrics.NewRegisteredMeter("les/misc/out/traffic", nil) + + connectionTimer = metrics.NewRegisteredTimer("les/connectionTime", nil) + + totalConnectedGauge = metrics.NewRegisteredGauge("les/server/totalConnected", nil) + totalCapacityGauge = metrics.NewRegisteredGauge("les/server/totalCapacity", nil) + totalRechargeGauge = metrics.NewRegisteredGauge("les/server/totalRecharge", nil) + blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil) + requestServedTimer = metrics.NewRegisteredTimer("les/server/requestServed", nil) + requestServedMeter = metrics.NewRegisteredMeter("les/server/totalRequestServed", nil) + requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/totalRequestEstimated", nil) + relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/relativeCost", nil, metrics.NewExpDecaySample(1028, 0.015)) + recentServedGauge = metrics.NewRegisteredGauge("les/server/recentRequestServed", nil) + recentEstimatedGauge = metrics.NewRegisteredGauge("les/server/recentRequestEstimated", nil) + sqServedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/served", nil) + sqQueuedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/queued", nil) + clientConnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/connected", nil) + clientRejectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/rejected", nil) + clientKickedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/kicked", nil) + // clientDisconnectedMeter = metrics.NewRegisteredMeter("les/server/clientEvent/disconnected", nil) + clientFreezeMeter = metrics.NewRegisteredMeter("les/server/clientEvent/freeze", nil) + clientErrorMeter = metrics.NewRegisteredMeter("les/server/clientEvent/error", nil) ) // meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of diff --git a/les/server.go b/les/server.go index 08d973416..86570aa54 100644 --- a/les/server.go +++ b/les/server.go @@ -28,7 +28,6 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" - "github.com/ethereum/go-ethereum/les/csvlogger" "github.com/ethereum/go-ethereum/les/flowcontrol" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" @@ -40,15 +39,6 @@ import ( const bufLimitRatio = 6000 // fixed bufLimit/MRR ratio -const ( - logFileName = "" // csv log file name (disabled if empty) - logClientPoolMetrics = true // log client pool metrics - logClientPoolEvents = false // detailed client pool event logging - logRequestServing = true // log request serving metrics and events - logBlockProcEvents = true // log block processing events - logProtocolHandler = true // log protocol handler events -) - type LesServer struct { lesCommons @@ -62,26 +52,15 @@ type LesServer struct { privateKey *ecdsa.PrivateKey quitSync chan struct{} onlyAnnounce bool - csvLogger *csvlogger.Logger - logTotalCap *csvlogger.Channel thcNormal, thcBlockProcessing int // serving thread count for normal operation and block processing mode maxPeers int minCapacity, freeClientCap uint64 freeClientPool *freeClientPool - priorityClientPool *priorityClientPool } func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) { - var csvLogger *csvlogger.Logger - if logFileName != "" { - csvLogger = csvlogger.NewLogger(logFileName, time.Second*10, "event, peerId") - } - requestLogger := csvLogger - if !logRequestServing { - requestLogger = nil - } lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions)) for i, pv := range AdvertiseProtocolVersions { lesTopics[i] = lesTopic(e.BlockChain().Genesis().Hash(), pv) @@ -99,10 +78,8 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) { quitSync: quitSync, lesTopics: lesTopics, onlyAnnounce: config.OnlyAnnounce, - csvLogger: csvLogger, - logTotalCap: requestLogger.NewChannel("totalCapacity", 0.01), } - srv.costTracker, srv.minCapacity = newCostTracker(e.ChainDb(), config, requestLogger) + srv.costTracker, srv.minCapacity = newCostTracker(e.ChainDb(), config) logger := log.New() srv.thcNormal = config.LightServ * 4 / 100 @@ -131,10 +108,7 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) { return nil, err } srv.protocolManager = pm - if logProtocolHandler { - pm.logger = csvLogger - } - pm.servingQueue = newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100, requestLogger) + pm.servingQueue = newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100) pm.server = srv return srv, nil @@ -142,12 +116,6 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) { func (s *LesServer) APIs() []rpc.API { return []rpc.API{ - { - Namespace: "les", - Version: "1.0", - Service: NewPrivateLightServerAPI(s), - Public: false, - }, { Namespace: "les", Version: "1.0", @@ -163,11 +131,10 @@ func (s *LesServer) APIs() []rpc.API { func (s *LesServer) startEventLoop() { s.protocolManager.wg.Add(1) - blockProcLogger := s.csvLogger - if !logBlockProcEvents { - blockProcLogger = nil - } - var processing, procLast bool + var ( + processing, procLast bool + procStarted time.Time + ) blockProcFeed := make(chan bool, 100) s.protocolManager.blockchain.(*core.BlockChain).SubscribeBlockProcessingEvent(blockProcFeed) totalRechargeCh := make(chan uint64, 100) @@ -176,13 +143,13 @@ func (s *LesServer) startEventLoop() { updateRecharge := func() { if processing { if !procLast { - blockProcLogger.Event("block processing started") + procStarted = time.Now() } s.protocolManager.servingQueue.setThreads(s.thcBlockProcessing) s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge, totalRecharge}}) } else { if procLast { - blockProcLogger.Event("block processing finished") + blockProcessingTimer.UpdateSince(procStarted) } s.protocolManager.servingQueue.setThreads(s.thcNormal) s.fcManager.SetRechargeCurve(flowcontrol.PieceWiseLinear{{0, 0}, {totalRecharge / 16, totalRecharge / 2}, {totalRecharge / 2, totalRecharge / 2}, {totalRecharge, totalRecharge}}) @@ -191,7 +158,7 @@ func (s *LesServer) startEventLoop() { } updateRecharge() totalCapacity := s.fcManager.SubscribeTotalCapacity(totalCapacityCh) - s.priorityClientPool.setLimits(s.maxPeers, totalCapacity) + s.freeClientPool.setLimits(s.maxPeers, totalCapacity) var maxFreePeers uint64 go func() { @@ -202,13 +169,13 @@ func (s *LesServer) startEventLoop() { case totalRecharge = <-totalRechargeCh: updateRecharge() case totalCapacity = <-totalCapacityCh: - s.logTotalCap.Update(float64(totalCapacity)) + totalCapacityGauge.Update(int64(totalCapacity)) newFreePeers := totalCapacity / s.freeClientCap if newFreePeers < maxFreePeers && newFreePeers < uint64(s.maxPeers) { log.Warn("Reduced total capacity", "maxFreePeers", newFreePeers) } maxFreePeers = newFreePeers - s.priorityClientPool.setLimits(s.maxPeers, totalCapacity) + s.freeClientPool.setLimits(s.maxPeers, totalCapacity) case <-s.protocolManager.quitSync: s.protocolManager.wg.Done() return @@ -243,19 +210,9 @@ func (s *LesServer) Start(srvr *p2p.Server) { maxCapacity = totalRecharge } s.fcManager.SetCapacityLimits(s.freeClientCap, maxCapacity, s.freeClientCap*2) - poolMetricsLogger := s.csvLogger - if !logClientPoolMetrics { - poolMetricsLogger = nil - } - poolEventLogger := s.csvLogger - if !logClientPoolEvents { - poolEventLogger = nil - } - s.freeClientPool = newFreeClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id string) { go s.protocolManager.removePeer(id) }, poolMetricsLogger, poolEventLogger) - s.priorityClientPool = newPriorityClientPool(s.freeClientCap, s.protocolManager.peers, s.freeClientPool, poolMetricsLogger, poolEventLogger) + s.freeClientPool = newFreeClientPool(s.chainDb, s.freeClientCap, 10000, mclock.System{}, func(id string) { go s.protocolManager.removePeer(id) }) + s.protocolManager.peers.notify(s.freeClientPool) - s.protocolManager.peers.notify(s.priorityClientPool) - s.csvLogger.Start() s.startEventLoop() s.protocolManager.Start(s.config.LightPeers) if srvr.DiscV5 != nil { @@ -296,7 +253,6 @@ func (s *LesServer) Stop() { s.freeClientPool.stop() s.costTracker.stop() s.protocolManager.Stop() - s.csvLogger.Stop() } // todo(rjl493456442) separate client and server implementation. diff --git a/les/servingqueue.go b/les/servingqueue.go index 26656ec01..a9e8369fe 100644 --- a/les/servingqueue.go +++ b/les/servingqueue.go @@ -17,14 +17,12 @@ package les import ( - "fmt" "sort" "sync" "sync/atomic" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/prque" - "github.com/ethereum/go-ethereum/les/csvlogger" ) // servingQueue allows running tasks in a limited number of threads and puts the @@ -44,10 +42,6 @@ type servingQueue struct { queue *prque.Prque // priority queue for waiting or suspended tasks best *servingTask // the highest priority task (not included in the queue) suspendBias int64 // priority bias against suspending an already running task - - logger *csvlogger.Logger - logRecentTime *csvlogger.Channel - logQueuedTime *csvlogger.Channel } // servingTask represents a request serving task. Tasks can be implemented to @@ -127,7 +121,7 @@ func (t *servingTask) waitOrStop() bool { } // newServingQueue returns a new servingQueue -func newServingQueue(suspendBias int64, utilTarget float64, logger *csvlogger.Logger) *servingQueue { +func newServingQueue(suspendBias int64, utilTarget float64) *servingQueue { sq := &servingQueue{ queue: prque.New(nil), suspendBias: suspendBias, @@ -140,9 +134,6 @@ func newServingQueue(suspendBias int64, utilTarget float64, logger *csvlogger.Lo burstDropLimit: uint64(utilTarget * bufLimitRatio * 1000000), burstDecRate: utilTarget, lastUpdate: mclock.Now(), - logger: logger, - logRecentTime: logger.NewMinMaxChannel("recentTime", false), - logQueuedTime: logger.NewMinMaxChannel("queuedTime", false), } sq.wg.Add(2) go sq.queueLoop() @@ -246,16 +237,13 @@ func (sq *servingQueue) freezePeers() { } sort.Sort(peerList) drop := true - sq.logger.Event("freezing peers") for _, tasks := range peerList { if drop { tasks.peer.freezeClient() tasks.peer.fcClient.Freeze() sq.queuedTime -= tasks.sumTime - if sq.logQueuedTime != nil { - sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000) - } - sq.logger.Event(fmt.Sprintf("frozen peer sumTime=%d, %v", tasks.sumTime, tasks.peer.id)) + sqQueuedGauge.Update(int64(sq.queuedTime)) + clientFreezeMeter.Mark(1) drop = sq.recentTime+sq.queuedTime > sq.burstDropLimit for _, task := range tasks.list { task.tokenCh <- nil @@ -299,10 +287,8 @@ func (sq *servingQueue) addTask(task *servingTask) { } sq.updateRecentTime() sq.queuedTime += task.expTime - if sq.logQueuedTime != nil { - sq.logRecentTime.Update(float64(sq.recentTime) / 1000) - sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000) - } + sqServedGauge.Update(int64(sq.recentTime)) + sqQueuedGauge.Update(int64(sq.queuedTime)) if sq.recentTime+sq.queuedTime > sq.burstLimit { sq.freezePeers() } @@ -322,10 +308,8 @@ func (sq *servingQueue) queueLoop() { sq.updateRecentTime() sq.queuedTime -= expTime sq.recentTime += expTime - if sq.logQueuedTime != nil { - sq.logRecentTime.Update(float64(sq.recentTime) / 1000) - sq.logQueuedTime.Update(float64(sq.queuedTime) / 1000) - } + sqServedGauge.Update(int64(sq.recentTime)) + sqQueuedGauge.Update(int64(sq.queuedTime)) if sq.queue.Size() == 0 { sq.best = nil } else {