502 lines
15 KiB
Nim
502 lines
15 KiB
Nim
import
|
|
tables, sets,
|
|
chronicles, asyncdispatch2, rlp, eth_common/eth_types,
|
|
../../rlpx, ../../private/types, private/les_types
|
|
|
|
const
|
|
maxSamples = 100000
|
|
rechargingScale = 1000000
|
|
|
|
lesStatsKey = "les.flow_control.stats"
|
|
lesStatsVer = 0
|
|
|
|
logScope:
|
|
topics = "les flow_control"
|
|
|
|
# TODO: move this somewhere
|
|
proc pop[A, B](t: var Table[A, B], key: A): B =
|
|
result = t[key]
|
|
t.del(key)
|
|
|
|
when LesTime is SomeInteger:
|
|
template `/`(lhs, rhs: LesTime): LesTime =
|
|
lhs div rhs
|
|
|
|
when defined(testing):
|
|
var lesTime* = LesTime(0)
|
|
template now(): LesTime = lesTime
|
|
template advanceTime(t) = lesTime += LesTime(t)
|
|
|
|
else:
|
|
import times
|
|
let startTime = epochTime()
|
|
|
|
proc now(): LesTime =
|
|
return LesTime((times.epochTime() - startTime) * 1000.0)
|
|
|
|
proc addSample(ra: var StatsRunningAverage; x, y: float64) =
|
|
if ra.count >= maxSamples:
|
|
let decay = float64(ra.count + 1 - maxSamples) / maxSamples
|
|
template applyDecay(x) = x -= x * decay
|
|
|
|
applyDecay ra.sumX
|
|
applyDecay ra.sumY
|
|
applyDecay ra.sumXX
|
|
applyDecay ra.sumXY
|
|
ra.count = maxSamples - 1
|
|
|
|
inc ra.count
|
|
ra.sumX += x
|
|
ra.sumY += y
|
|
ra.sumXX += x * x
|
|
ra.sumXY += x * y
|
|
|
|
proc calc(ra: StatsRunningAverage): tuple[m, b: float] =
|
|
if ra.count == 0:
|
|
return
|
|
|
|
let count = float64(ra.count)
|
|
let d = count * ra.sumXX - ra.sumX * ra.sumX
|
|
if d < 0.001:
|
|
return (m: ra.sumY / count, b: 0.0)
|
|
|
|
result.m = (count * ra.sumXY - ra.sumX * ra.sumY) / d
|
|
result.b = (ra.sumY / count) - (result.m * ra.sumX / count)
|
|
|
|
proc currentRequestsCosts*(network: LesNetwork,
|
|
les: ProtocolInfo): seq[ReqCostInfo] =
|
|
# Make sure the message costs are already initialized
|
|
doAssert network.messageStats.len > les.messages[^1].id,
|
|
"Have you called `initFlowControl`"
|
|
|
|
for msg in les.messages:
|
|
var (m, b) = network.messageStats[msg.id].calc()
|
|
if m < 0:
|
|
b += m
|
|
m = 0
|
|
|
|
if b < 0:
|
|
b = 0
|
|
|
|
result.add ReqCostInfo.init(msgId = msg.id,
|
|
baseCost = ReqCostInt(b * 2),
|
|
reqCost = ReqCostInt(m * 2))
|
|
|
|
proc persistMessageStats*(db: AbstractChainDB,
|
|
network: LesNetwork) =
|
|
doAssert db != nil
|
|
# XXX: Because of the package_visible_types template magic, Nim complains
|
|
# when we pass the messageStats expression directly to `encodeList`
|
|
let stats = network.messageStats
|
|
db.setSetting(lesStatsKey, rlp.encodeList(lesStatsVer, stats))
|
|
|
|
proc loadMessageStats*(network: LesNetwork,
|
|
les: ProtocolInfo,
|
|
db: AbstractChainDb): bool =
|
|
block readFromDB:
|
|
if db == nil:
|
|
break readFromDB
|
|
|
|
var stats = db.getSetting(lesStatsKey)
|
|
if stats.len == 0:
|
|
notice "LES stats not present in the database"
|
|
break readFromDB
|
|
|
|
try:
|
|
var statsRlp = rlpFromBytes(stats.toRange)
|
|
statsRlp.enterList
|
|
|
|
let version = statsRlp.read(int)
|
|
if version != lesStatsVer:
|
|
notice "Found outdated LES stats record"
|
|
break readFromDB
|
|
|
|
statsRlp >> network.messageStats
|
|
if network.messageStats.len <= les.messages[^1].id:
|
|
notice "Found incomplete LES stats record"
|
|
break readFromDB
|
|
|
|
return true
|
|
|
|
except RlpError:
|
|
error "Error while loading LES message stats",
|
|
err = getCurrentExceptionMsg()
|
|
|
|
newSeq(network.messageStats, les.messages[^1].id + 1)
|
|
return false
|
|
|
|
proc update(s: var FlowControlState, t: LesTime) =
|
|
let dt = max(t - s.lastUpdate, LesTime(0))
|
|
|
|
s.bufValue = min(
|
|
s.bufValue + s.minRecharge * dt,
|
|
s.bufLimit)
|
|
|
|
s.lastUpdate = t
|
|
|
|
proc init(s: var FlowControlState,
|
|
bufLimit: BufValueInt, minRecharge: int, t: LesTime) =
|
|
s.bufValue = bufLimit
|
|
s.bufLimit = bufLimit
|
|
s.minRecharge = minRecharge
|
|
s.lastUpdate = t
|
|
|
|
func canMakeRequest(s: FlowControlState,
|
|
maxCost: ReqCostInt): (LesTime, float64) =
|
|
## Returns the required waiting time before sending a request and
|
|
## the estimated buffer level afterwards (as a fraction of the limit)
|
|
const safetyMargin = 50
|
|
|
|
var maxCost = min(
|
|
maxCost + safetyMargin * s.minRecharge,
|
|
s.bufLimit)
|
|
|
|
if s.bufValue >= maxCost:
|
|
result[1] = float64(s.bufValue - maxCost) / float64(s.bufLimit)
|
|
else:
|
|
result[0] = (maxCost - s.bufValue) / s.minRecharge
|
|
|
|
func canServeRequest(srv: LesNetwork): bool =
|
|
result = srv.reqCount < srv.maxReqCount and
|
|
srv.reqCostSum < srv.maxReqCostSum
|
|
|
|
proc rechargeReqCost(peer: LesPeer, t: LesTime) =
|
|
let dt = t - peer.lastRechargeTime
|
|
peer.reqCostVal += peer.reqCostGradient * dt / rechargingScale
|
|
peer.lastRechargeTime = t
|
|
if peer.isRecharging and t >= peer.rechargingEndsAt:
|
|
peer.isRecharging = false
|
|
peer.reqCostGradient = 0
|
|
peer.reqCostVal = 0
|
|
|
|
proc updateRechargingParams(peer: LesPeer, network: LesNetwork) =
|
|
peer.reqCostGradient = 0
|
|
if peer.reqCount > 0:
|
|
peer.reqCostGradient = rechargingScale / network.reqCount
|
|
|
|
if peer.isRecharging:
|
|
peer.reqCostGradient = (network.rechargingRate * peer.rechargingPower /
|
|
network.totalRechargingPower )
|
|
|
|
peer.rechargingEndsAt = peer.lastRechargeTime +
|
|
LesTime(peer.reqCostVal * rechargingScale /
|
|
-peer.reqCostGradient )
|
|
|
|
proc trackRequests(network: LesNetwork, peer: LesPeer, reqCountChange: int) =
|
|
peer.reqCount += reqCountChange
|
|
network.reqCount += reqCountChange
|
|
|
|
doAssert peer.reqCount >= 0 and network.reqCount >= 0
|
|
|
|
if peer.reqCount == 0:
|
|
# All requests have been finished. Start recharging.
|
|
peer.isRecharging = true
|
|
network.totalRechargingPower += peer.rechargingPower
|
|
elif peer.reqCount == reqCountChange and peer.isRecharging:
|
|
# `peer.reqCount` must have been 0 for the condition above to hold.
|
|
# This is a transition from recharging to serving state.
|
|
peer.isRecharging = false
|
|
network.totalRechargingPower -= peer.rechargingPower
|
|
peer.startReqCostVal = peer.reqCostVal
|
|
|
|
updateRechargingParams peer, network
|
|
|
|
proc updateFlowControl(network: LesNetwork, t: LesTime) =
|
|
while true:
|
|
var firstTime = t
|
|
for peer in network.peers:
|
|
# TODO: perhaps use a bin heap here
|
|
if peer.isRecharging and peer.rechargingEndsAt < firstTime:
|
|
firstTime = peer.rechargingEndsAt
|
|
|
|
let rechargingEndedForSomePeer = firstTime < t
|
|
|
|
network.reqCostSum = 0
|
|
for peer in network.peers:
|
|
peer.rechargeReqCost firstTime
|
|
network.reqCostSum += peer.reqCostVal
|
|
|
|
if rechargingEndedForSomePeer:
|
|
for peer in network.peers:
|
|
if peer.isRecharging:
|
|
updateRechargingParams peer, network
|
|
else:
|
|
network.lastUpdate = t
|
|
return
|
|
|
|
proc endPendingRequest*(network: LesNetwork, peer: LesPeer, t: LesTime) =
|
|
if peer.reqCount > 0:
|
|
network.updateFlowControl t
|
|
network.trackRequests peer, -1
|
|
network.updateFlowControl t
|
|
|
|
proc enlistInFlowControl*(network: LesNetwork,
|
|
peer: LesPeer,
|
|
peerRechargingPower = 100) =
|
|
let t = now()
|
|
|
|
assert peer.isServer or peer.isClient
|
|
# Each Peer must be potential communication partner for us.
|
|
# There will be useless peers on the network, but the logic
|
|
# should make sure to disconnect them earlier in `onPeerConnected`.
|
|
|
|
if peer.isServer:
|
|
peer.localFlowState.init network.bufferLimit, network.minRechargingRate, t
|
|
peer.pendingReqs = initTable[int, ReqCostInt]()
|
|
|
|
if peer.isClient:
|
|
peer.remoteFlowState.init network.bufferLimit, network.minRechargingRate, t
|
|
peer.lastRechargeTime = t
|
|
peer.rechargingEndsAt = t
|
|
peer.rechargingPower = peerRechargingPower
|
|
|
|
network.updateFlowControl t
|
|
|
|
proc delistFromFlowControl*(network: LesNetwork, peer: LesPeer) =
|
|
let t = now()
|
|
|
|
# XXX: perhaps this is not safe with our reqCount logic.
|
|
# The original code may depend on the binarity of the `serving` flag.
|
|
network.endPendingRequest peer, t
|
|
network.updateFlowControl t
|
|
|
|
proc initFlowControl*(network: LesNetwork, les: ProtocolInfo,
|
|
maxReqCount, maxReqCostSum, reqCostTarget: int,
|
|
db: AbstractChainDb = nil) =
|
|
network.rechargingRate = (rechargingScale * rechargingScale) /
|
|
(100 * rechargingScale / reqCostTarget - rechargingScale)
|
|
network.maxReqCount = maxReqCount
|
|
network.maxReqCostSum = maxReqCostSum
|
|
|
|
if not network.loadMessageStats(les, db):
|
|
warn "Failed to load persisted LES message stats. " &
|
|
"Flow control will be re-initilized."
|
|
|
|
proc canMakeRequest(peer: var LesPeer, maxCost: int): (LesTime, float64) =
|
|
peer.localFlowState.update now()
|
|
return peer.localFlowState.canMakeRequest(maxCost)
|
|
|
|
template getRequestCost(peer: LesPeer, localOrRemote: untyped,
|
|
msgId, costQuantity: int): ReqCostInt =
|
|
template msgCostInfo: untyped = peer.`localOrRemote ReqCosts`[msgId]
|
|
|
|
min(msgCostInfo.baseCost + msgCostInfo.reqCost * costQuantity,
|
|
peer.`localOrRemote FlowState`.bufLimit)
|
|
|
|
proc trackOutgoingRequest*(network: LesNetwork, peer: LesPeer,
|
|
msgId, reqId, costQuantity: int) =
|
|
let maxCost = peer.getRequestCost(local, msgId, costQuantity)
|
|
|
|
peer.localFlowState.bufValue -= maxCost
|
|
peer.pendingReqsCost += maxCost
|
|
peer.pendingReqs[reqId] = peer.pendingReqsCost
|
|
|
|
proc trackIncomingResponse*(peer: LesPeer, reqId: int, bv: BufValueInt) =
|
|
let bv = min(bv, peer.localFlowState.bufLimit)
|
|
if not peer.pendingReqs.hasKey(reqId):
|
|
return
|
|
|
|
let costsSumAtSending = peer.pendingReqs.pop(reqId)
|
|
let costsSumChange = peer.pendingReqsCost - costsSumAtSending
|
|
|
|
peer.localFlowState.bufValue = if bv > costsSumChange: bv - costsSumChange
|
|
else: 0
|
|
peer.localFlowState.lastUpdate = now()
|
|
|
|
proc acceptRequest*(network: LesNetwork, peer: LesPeer,
|
|
msgId, costQuantity: int): Future[bool] {.async.} =
|
|
let t = now()
|
|
let reqCost = peer.getRequestCost(remote, msgId, costQuantity)
|
|
|
|
peer.remoteFlowState.update t
|
|
network.updateFlowControl t
|
|
|
|
while not network.canServeRequest:
|
|
await sleepAsync(10)
|
|
|
|
if peer notin network.peers:
|
|
# The peer was disconnected or the network
|
|
# was shut down while we waited
|
|
return false
|
|
|
|
network.trackRequests peer, +1
|
|
network.updateFlowControl network.lastUpdate
|
|
|
|
if reqCost > peer.remoteFlowState.bufValue:
|
|
error "LES peer sent request too early",
|
|
recharge = (reqCost - peer.remoteFlowState.bufValue) * rechargingScale /
|
|
peer.remoteFlowState.minRecharge
|
|
return false
|
|
|
|
return true
|
|
|
|
proc bufValueAfterRequest*(network: LesNetwork, peer: LesPeer,
|
|
msgId: int, quantity: int): BufValueInt =
|
|
let t = now()
|
|
let costs = peer.remoteReqCosts[msgId]
|
|
var reqCost = costs.baseCost + quantity * costs.reqCost
|
|
|
|
peer.remoteFlowState.update t
|
|
peer.remoteFlowState.bufValue -= reqCost
|
|
|
|
network.endPendingRequest peer, t
|
|
|
|
let curReqCost = peer.reqCostVal
|
|
if curReqCost < peer.remoteFlowState.bufLimit:
|
|
let bv = peer.remoteFlowState.bufLimit - curReqCost
|
|
if bv > peer.remoteFlowState.bufValue:
|
|
peer.remoteFlowState.bufValue = bv
|
|
|
|
network.messageStats[msgId].addSample(float64(quantity),
|
|
float64(curReqCost - peer.startReqCostVal))
|
|
|
|
return peer.remoteFlowState.bufValue
|
|
|
|
when defined(testing):
|
|
import unittest, random, ../../rlpx
|
|
|
|
proc isMax(s: FlowControlState): bool =
|
|
s.bufValue == s.bufLimit
|
|
|
|
rlpxProtocol dummyLes(version = 1, shortName = "abc"):
|
|
proc a(p: Peer)
|
|
proc b(p: Peer)
|
|
proc c(p: Peer)
|
|
proc d(p: Peer)
|
|
proc e(p: Peer)
|
|
|
|
template fequals(lhs, rhs: float64, epsilon = 0.0001): bool =
|
|
abs(lhs-rhs) < epsilon
|
|
|
|
proc tests* =
|
|
randomize(3913631)
|
|
|
|
suite "les flow control":
|
|
suite "running averages":
|
|
test "consistent costs":
|
|
var s: StatsRunningAverage
|
|
for i in 0..100:
|
|
s.addSample(5.0, 100.0)
|
|
|
|
let (cost, base) = s.calc
|
|
|
|
check:
|
|
fequals(cost, 100.0)
|
|
fequals(base, 0.0)
|
|
|
|
test "randomized averages":
|
|
proc performTest(qBase, qRandom: int, cBase, cRandom: float64) =
|
|
var
|
|
s: StatsRunningAverage
|
|
expectedFinalCost = cBase + cRandom / 2
|
|
error = expectedFinalCost
|
|
|
|
for samples in [100, 1000, 10000]:
|
|
for i in 0..samples:
|
|
let q = float64(qBase + rand(10))
|
|
s.addSample(q, q * (cBase + rand(cRandom)))
|
|
|
|
let (newCost, newBase) = s.calc
|
|
# With more samples, our error should decrease, getting
|
|
# closer and closer to the average (unless we are already close enough)
|
|
let newError = abs(newCost - expectedFinalCost)
|
|
check newError < error
|
|
error = newError
|
|
|
|
# After enough samples we should be very close the the final result
|
|
check error < (expectedFinalCost * 0.02)
|
|
|
|
performTest(1, 10, 5.0, 100.0)
|
|
performTest(1, 4, 200.0, 1000.0)
|
|
|
|
suite "buffer value calculations":
|
|
type TestReq = object
|
|
peer: LesPeer
|
|
msgId, quantity: int
|
|
accepted: bool
|
|
|
|
setup:
|
|
var lesNetwork = new LesNetwork
|
|
lesNetwork.peers = initSet[LesPeer]()
|
|
lesNetwork.initFlowControl(dummyLes.protocolInfo,
|
|
reqCostTarget = 300,
|
|
maxReqCount = 5,
|
|
maxReqCostSum = 1000)
|
|
|
|
for i in 0 ..< lesNetwork.messageStats.len:
|
|
lesNetwork.messageStats[i].addSample(1.0, float(i) * 100.0)
|
|
|
|
var client = new LesPeer
|
|
client.isClient = true
|
|
|
|
var server = new LesPeer
|
|
server.isServer = true
|
|
|
|
var clientServer = new LesPeer
|
|
clientServer.isClient = true
|
|
clientServer.isServer = true
|
|
|
|
var client2 = new LesPeer
|
|
client2.isClient = true
|
|
|
|
var client3 = new LesPeer
|
|
client3.isClient = true
|
|
|
|
var bv: BufValueInt
|
|
|
|
template enlist(peer: LesPeer) {.dirty.} =
|
|
let reqCosts = currentRequestsCosts(lesNetwork, dummyLes.protocolInfo)
|
|
peer.remoteReqCosts = reqCosts
|
|
peer.localReqCosts = reqCosts
|
|
lesNetwork.peers.incl peer
|
|
lesNetwork.enlistInFlowControl peer
|
|
|
|
template startReq(p: LesPeer, msg, q: int): TestReq =
|
|
var req: TestReq
|
|
req.peer = p
|
|
req.msgId = msg
|
|
req.quantity = q
|
|
req.accepted = waitFor lesNetwork.acceptRequest(p, msg, q)
|
|
req
|
|
|
|
template endReq(req: TestReq): BufValueInt =
|
|
bufValueAfterRequest(lesNetwork, req.peer, req.msgId, req.quantity)
|
|
|
|
test "single peer recharging":
|
|
lesNetwork.bufferLimit = 1000
|
|
lesNetwork.minRechargingRate = 100
|
|
|
|
enlist client
|
|
|
|
check:
|
|
client.remoteFlowState.isMax
|
|
client.rechargingPower > 0
|
|
|
|
advanceTime 100
|
|
|
|
let r1 = client.startReq(0, 100)
|
|
check r1.accepted
|
|
check client.isRecharging == false
|
|
|
|
advanceTime 50
|
|
|
|
let r2 = client.startReq(1, 1)
|
|
check r2.accepted
|
|
check client.isRecharging == false
|
|
|
|
advanceTime 25
|
|
bv = endReq r2
|
|
check client.isRecharging == false
|
|
|
|
advanceTime 130
|
|
bv = endReq r1
|
|
check client.isRecharging == true
|
|
|
|
advanceTime 300
|
|
lesNetwork.updateFlowControl now()
|
|
|
|
check:
|
|
client.isRecharging == false
|
|
client.remoteFlowState.isMax
|
|
|