From 854f068ed60918c624857594a703a6724b8efb3a Mon Sep 17 00:00:00 2001 From: gary rong Date: Tue, 27 Apr 2021 15:44:59 +0800 Subject: [PATCH] les: polish code (#22625) * les: polish code * les/vflus/server: fixes * les: fix lint --- les/metrics.go | 1 - les/peer.go | 1 - les/server_handler.go | 21 +++--- les/vflux/server/prioritypool.go | 123 +++++++++++++++++++------------ 4 files changed, 83 insertions(+), 63 deletions(-) diff --git a/les/metrics.go b/les/metrics.go index d356326b7..07d3133c9 100644 --- a/les/metrics.go +++ b/les/metrics.go @@ -71,7 +71,6 @@ var ( connectionTimer = metrics.NewRegisteredTimer("les/connection/duration", nil) serverConnectionGauge = metrics.NewRegisteredGauge("les/connection/server", nil) - clientConnectionGauge = metrics.NewRegisteredGauge("les/connection/client", nil) totalCapacityGauge = metrics.NewRegisteredGauge("les/server/totalCapacity", nil) totalRechargeGauge = metrics.NewRegisteredGauge("les/server/totalRecharge", nil) diff --git a/les/peer.go b/les/peer.go index f6cc94dfa..c6c672942 100644 --- a/les/peer.go +++ b/les/peer.go @@ -1099,7 +1099,6 @@ func (p *clientPeer) Handshake(td *big.Int, head common.Hash, headNum uint64, ge // set default announceType on server side p.announceType = announceTypeSimple } - p.fcClient = flowcontrol.NewClientNode(server.fcManager, p.fcParams) } return nil }) diff --git a/les/server_handler.go b/les/server_handler.go index 0a683c1b4..80fcf1c44 100644 --- a/les/server_handler.go +++ b/les/server_handler.go @@ -30,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/les/flowcontrol" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" @@ -122,26 +123,27 @@ func (h *serverHandler) handle(p *clientPeer) error { p.Log().Debug("Light Ethereum handshake failed", "err", err) return err } - + // Connected to another server, no messages expected, just wait for disconnection if p.server { if err := h.server.serverset.register(p); err != nil { return err } - // connected to another server, no messages expected, just wait for disconnection _, err := p.rw.ReadMsg() h.server.serverset.unregister(p) return err } - defer p.fcClient.Disconnect() // set by handshake if it's not another server + // Setup flow control mechanism for the peer + p.fcClient = flowcontrol.NewClientNode(h.server.fcManager, p.fcParams) + defer p.fcClient.Disconnect() - // Reject light clients if server is not synced. - // - // Put this checking here, so that "non-synced" les-server peers are still allowed - // to keep the connection. + // Reject light clients if server is not synced. Put this checking here, so + // that "non-synced" les-server peers are still allowed to keep the connection. if !h.synced() { p.Log().Debug("Light server not synced, rejecting peer") return p2p.DiscRequested } + + // Register the peer into the peerset and clientpool if err := h.server.peers.register(p); err != nil { return err } @@ -150,19 +152,14 @@ func (h *serverHandler) handle(p *clientPeer) error { p.Log().Debug("Client pool already closed") return p2p.DiscRequested } - activeCount, _ := h.server.clientPool.Active() - clientConnectionGauge.Update(int64(activeCount)) p.connectedAt = mclock.Now() var wg sync.WaitGroup // Wait group used to track all in-flight task routines. - defer func() { wg.Wait() // Ensure all background task routines have exited. h.server.clientPool.Unregister(p) h.server.peers.unregister(p.ID()) p.balance = nil - activeCount, _ := h.server.clientPool.Active() - clientConnectionGauge.Update(int64(activeCount)) connectionTimer.Update(time.Duration(mclock.Now() - p.connectedAt)) }() diff --git a/les/vflux/server/prioritypool.go b/les/vflux/server/prioritypool.go index 573a3570a..480f77e6a 100644 --- a/les/vflux/server/prioritypool.go +++ b/les/vflux/server/prioritypool.go @@ -63,20 +63,22 @@ type priorityPool struct { ns *nodestate.NodeStateMachine clock mclock.Clock lock sync.Mutex - inactiveQueue *prque.Prque maxCount, maxCap uint64 minCap uint64 activeBias time.Duration capacityStepDiv, fineStepDiv uint64 + // The snapshot of priority pool for query. cachedCurve *capacityCurve ccUpdatedAt mclock.AbsTime ccUpdateForced bool - tempState []*ppNodeInfo // nodes currently in temporary state - // the following fields represent the temporary state if tempState is not empty + // Runtime status of prioritypool, represents the + // temporary state if tempState is not empty + tempState []*ppNodeInfo activeCount, activeCap uint64 activeQueue *prque.LazyQueue + inactiveQueue *prque.Prque } // ppNodeInfo is the internal node descriptor of priorityPool @@ -89,8 +91,9 @@ type ppNodeInfo struct { tempState bool // should only be true while the priorityPool lock is held tempCapacity uint64 // equals capacity when tempState is false + // the following fields only affect the temporary state and they are set to their - // default value when entering the temp state + // default value when leaving the temp state minTarget, stepDiv uint64 bias time.Duration } @@ -157,11 +160,6 @@ func newPriorityPool(ns *nodestate.NodeStateMachine, setup *serverSetup, clock m func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget uint64, bias time.Duration) uint64 { pp.lock.Lock() pp.activeQueue.Refresh() - var updates []capUpdate - defer func() { - pp.lock.Unlock() - pp.updateFlags(updates) - }() if minTarget < pp.minCap { minTarget = pp.minCap @@ -175,12 +173,13 @@ func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget u c, _ := pp.ns.GetField(node, pp.setup.queueField).(*ppNodeInfo) if c == nil { log.Error("requestCapacity called for unknown node", "id", node.ID()) + pp.lock.Unlock() return 0 } pp.setTempState(c) if maxTarget > c.capacity { - c.bias = bias - c.stepDiv = pp.fineStepDiv + pp.setTempStepDiv(c, pp.fineStepDiv) + pp.setTempBias(c, bias) } pp.setTempCapacity(c, maxTarget) c.minTarget = minTarget @@ -188,7 +187,9 @@ func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget u pp.inactiveQueue.Remove(c.inactiveIndex) pp.activeQueue.Push(c) pp.enforceLimits() - updates = pp.finalizeChanges(c.tempCapacity >= minTarget && c.tempCapacity <= maxTarget && c.tempCapacity != c.capacity) + updates := pp.finalizeChanges(c.tempCapacity >= minTarget && c.tempCapacity <= maxTarget && c.tempCapacity != c.capacity) + pp.lock.Unlock() + pp.updateFlags(updates) return c.capacity } @@ -196,15 +197,11 @@ func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget u func (pp *priorityPool) SetLimits(maxCount, maxCap uint64) { pp.lock.Lock() pp.activeQueue.Refresh() - var updates []capUpdate - defer func() { - pp.lock.Unlock() - pp.ns.Operation(func() { pp.updateFlags(updates) }) - }() - inc := (maxCount > pp.maxCount) || (maxCap > pp.maxCap) dec := (maxCount < pp.maxCount) || (maxCap < pp.maxCap) pp.maxCount, pp.maxCap = maxCount, maxCap + + var updates []capUpdate if dec { pp.enforceLimits() updates = pp.finalizeChanges(true) @@ -212,6 +209,8 @@ func (pp *priorityPool) SetLimits(maxCount, maxCap uint64) { if inc { updates = append(updates, pp.tryActivate(false)...) } + pp.lock.Unlock() + pp.ns.Operation(func() { pp.updateFlags(updates) }) } // setActiveBias sets the bias applied when trying to activate inactive nodes @@ -291,18 +290,15 @@ func (pp *priorityPool) inactivePriority(p *ppNodeInfo) int64 { func (pp *priorityPool) connectedNode(c *ppNodeInfo) { pp.lock.Lock() pp.activeQueue.Refresh() - var updates []capUpdate - defer func() { - pp.lock.Unlock() - pp.updateFlags(updates) - }() - if c.connected { + pp.lock.Unlock() return } c.connected = true pp.inactiveQueue.Push(c, pp.inactivePriority(c)) - updates = pp.tryActivate(false) + updates := pp.tryActivate(false) + pp.lock.Unlock() + pp.updateFlags(updates) } // disconnectedNode is called when a node has been removed from the pool (both inactiveFlag @@ -311,23 +307,22 @@ func (pp *priorityPool) connectedNode(c *ppNodeInfo) { func (pp *priorityPool) disconnectedNode(c *ppNodeInfo) { pp.lock.Lock() pp.activeQueue.Refresh() - var updates []capUpdate - defer func() { - pp.lock.Unlock() - pp.updateFlags(updates) - }() - if !c.connected { + pp.lock.Unlock() return } c.connected = false pp.activeQueue.Remove(c.activeIndex) pp.inactiveQueue.Remove(c.inactiveIndex) + + var updates []capUpdate if c.capacity != 0 { pp.setTempState(c) pp.setTempCapacity(c, 0) updates = pp.tryActivate(true) } + pp.lock.Unlock() + pp.updateFlags(updates) } // setTempState internally puts a node in a temporary state that can either be reverted @@ -342,27 +337,62 @@ func (pp *priorityPool) setTempState(c *ppNodeInfo) { if c.tempCapacity != c.capacity { // should never happen log.Error("tempCapacity != capacity when entering tempState") } + // Assign all the defaults to the temp state. c.minTarget = pp.minCap c.stepDiv = pp.capacityStepDiv + c.bias = 0 pp.tempState = append(pp.tempState, c) } +// unsetTempState revokes the temp status of the node and reset all internal +// fields to the default value. +func (pp *priorityPool) unsetTempState(c *ppNodeInfo) { + if !c.tempState { + return + } + c.tempState = false + if c.tempCapacity != c.capacity { // should never happen + log.Error("tempCapacity != capacity when leaving tempState") + } + c.minTarget = pp.minCap + c.stepDiv = pp.capacityStepDiv + c.bias = 0 +} + // setTempCapacity changes the capacity of a node in the temporary state and adjusts // activeCap and activeCount accordingly. Since this change is performed in the temporary // state it should be called after setTempState and before finalizeChanges. -func (pp *priorityPool) setTempCapacity(n *ppNodeInfo, cap uint64) { - if !n.tempState { // should never happen +func (pp *priorityPool) setTempCapacity(c *ppNodeInfo, cap uint64) { + if !c.tempState { // should never happen log.Error("Node is not in temporary state") return } - pp.activeCap += cap - n.tempCapacity - if n.tempCapacity == 0 { + pp.activeCap += cap - c.tempCapacity + if c.tempCapacity == 0 { pp.activeCount++ } if cap == 0 { pp.activeCount-- } - n.tempCapacity = cap + c.tempCapacity = cap +} + +// setTempBias changes the connection bias of a node in the temporary state. +func (pp *priorityPool) setTempBias(c *ppNodeInfo, bias time.Duration) { + if !c.tempState { // should never happen + log.Error("Node is not in temporary state") + return + } + c.bias = bias +} + +// setTempStepDiv changes the capacity divisor of a node in the temporary state. +func (pp *priorityPool) setTempStepDiv(c *ppNodeInfo, stepDiv uint64) { + if !c.tempState { // should never happen + log.Error("Node is not in temporary state") + return + } + c.stepDiv = stepDiv } // enforceLimits enforces active node count and total capacity limits. It returns the @@ -412,10 +442,8 @@ func (pp *priorityPool) finalizeChanges(commit bool) (updates []capUpdate) { } else { pp.setTempCapacity(c, c.capacity) // revert activeCount/activeCap } - c.tempState = false - c.bias = 0 - c.stepDiv = pp.capacityStepDiv - c.minTarget = pp.minCap + pp.unsetTempState(c) + if c.connected { if c.capacity != 0 { pp.activeQueue.Push(c) @@ -462,13 +490,13 @@ func (pp *priorityPool) tryActivate(commit bool) []capUpdate { for pp.inactiveQueue.Size() > 0 { c := pp.inactiveQueue.PopItem().(*ppNodeInfo) pp.setTempState(c) + pp.setTempBias(c, pp.activeBias) pp.setTempCapacity(c, pp.minCap) - c.bias = pp.activeBias pp.activeQueue.Push(c) pp.enforceLimits() if c.tempCapacity > 0 { commit = true - c.bias = 0 + pp.setTempBias(c, 0) } else { break } @@ -483,14 +511,9 @@ func (pp *priorityPool) tryActivate(commit bool) []capUpdate { func (pp *priorityPool) updatePriority(node *enode.Node) { pp.lock.Lock() pp.activeQueue.Refresh() - var updates []capUpdate - defer func() { - pp.lock.Unlock() - pp.updateFlags(updates) - }() - c, _ := pp.ns.GetField(node, pp.setup.queueField).(*ppNodeInfo) if c == nil || !c.connected { + pp.lock.Unlock() return } pp.activeQueue.Remove(c.activeIndex) @@ -500,7 +523,9 @@ func (pp *priorityPool) updatePriority(node *enode.Node) { } else { pp.inactiveQueue.Push(c, pp.inactivePriority(c)) } - updates = pp.tryActivate(false) + updates := pp.tryActivate(false) + pp.lock.Unlock() + pp.updateFlags(updates) } // capacityCurve is a snapshot of the priority pool contents in a format that can efficiently