New version of the bandwidth estimator
This commit is contained in:
@ -31,6 +31,90 @@ when defined(libp2p_expensive_metrics):
declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
Bandwidth = object
bps: int
BandwidthEstimator = object
minRtts: seq[(Moment, Duration)]
sentData: seq[(Moment, int)]
bandwidth: Bandwidth
proc `*`(bw: Bandwidth, dur: Duration): int = int(bw.bps * dur.milliseconds div 1000)
proc `*`(dur: Duration, bw: Bandwidth): int = bw * dur
proc `*`(bw: Bandwidth, factor: int): Bandwidth = Bandwidth(bps: bw.bps * factor)
proc `div`(bw: Bandwidth, factor: int): Bandwidth = Bandwidth(bps: bw.bps div factor)
proc `+`(a, b: Bandwidth): Bandwidth = Bandwidth(bps: a.bps + b.bps)
proc `/`(bytes: int, bw: Bandwidth): Duration =
milliseconds(bytes * 1000 div bw.bps)
proc bytesIn(bytes: int, dur: Duration): Bandwidth =
Bandwidth(bps: int(bytes * 1000 div dur.milliseconds))
proc bufferSizeAtMoment(bwe: BandwidthEstimator, m: Moment): int =
# Using current bandwidth estimate, guess how much bytes
# were in flight at `m`
var lastSent: Moment
for (sent, size) in bwe.sentData:
if sent > m:
let timeGap = sent - lastSent
result -= timeGap * bwe.bandwidth
if result < 0: result = 0
result += size
lastSent = sent
let timeGap = m - lastSent
result -= timeGap * bwe.bandwidth
return max(0, result)
proc minRtt(bwe: BandwidthEstimator): Duration =
if bwe.minRtts.len == 0: 150.milliseconds
bwe.minRtts.foldl(min(a, b[1]), 1.hours)
proc addRtt(bwe: var BandwidthEstimator, now: Moment, rtt: Duration) =
# Save RTT for future minrtt computation
it[1] < rtt and
(now - it[0]) < 30.seconds
bwe.minRtts.add((now, rtt))
let minRtt = bwe.minRtt
if minRtt == rtt:
# just got a new minRtt, can't make an estimate here
let bufferSizeBeforeRtt = bwe.bufferSizeAtMoment(now - rtt)
if bufferSizeBeforeRtt / bwe.bandwidth < minRtt div 10:
# only estimate bandwidth when we have enough data queued to
# overcome jitter
let estimatedBandwith = bufferSizeBeforeRtt.bytesIn(rtt - bwe.minRtt)
bwe.bandwidth = (bwe.bandwidth * 9 + estimatedBandwith) div 10
proc sending(bwe: var BandwidthEstimator, now: Moment, bytes: int): bool =
# Returns wheter a ping is appropriate
bwe.sentData.add((now, bytes))
let minRtt = bwe.minRtt
bwe.minRtts.len == 0 or
(now - bwe.minRtts[^1][0]) > minRtt * 2:
# Our data is getting old, getting more
return true
queuedCurrently = bwe.bufferSizeAtMoment(now)
queuedBeforeThis = queuedCurrently - bytes
if queuedCurrently / bwe.bandwidth > minRtt div 10:
# We have enough data in the pipe to estimate bandwidth
return true
return false
PubSubObserver* = ref object
onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [].}
@ -56,6 +140,7 @@ type
address*: Option[MultiAddress]
peerId*: PeerId
handler*: RPCHandler
banwidthEstimator: BandwidthEstimator
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
score*: float64
@ -256,6 +341,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [], async.} =
trace "sending encoded msgs to peer", conn, encoded = shortLog(msg)
let shouldPing = p.banwidthEstimator.sending(, msg.len)
await conn.writeLp(msg)
trace "sent pubsub message to remote", conn
except CatchableError as exc: # never cancelled
Reference in New Issue