bandwidth estimator

This commit is contained in:
Tanguy 2023-04-27 17:26:46 +02:00
parent 9b11fa7332
commit 1629eccde0
No known key found for this signature in database
GPG Key ID: 7DD8EC6B6CE6C45E
5 changed files with 77 additions and 4 deletions

View File

@ -261,6 +261,10 @@ proc handleSubscribe*(g: GossipSub,
trace "gossip peers", peers = g.gossipsub.peers(topic), topic
proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
if control.ping.len in 1..<64:
g.send(peer, RPCMsg(control: some(ControlMessage(pong: control.ping))))
if control.pong.len in 1..<64:
peer.handlePong(control.pong)
g.handlePrune(peer, control.prune)
var respControl: ControlMessage

View File

@ -297,7 +297,7 @@ proc getOrCreatePeer*(
p.onPubSubPeerEvent(peer, event)
# create new pubsub peer
let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize)
let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize, p.rng)
debug "created new pubsub peer", peerId
p.peers[peerId] = pubSubPeer

View File

@ -61,6 +61,10 @@ type
handler*: RPCHandler
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
gotMsgs*: Table[MessageId, Future[void]]
sentData*: seq[(Moment, int)]
pings*: Table[seq[byte], Moment]
rtts*: seq[int]
bandwidth*: int # in bytes per ms
score*: float64
iWantBudget*: int
@ -68,6 +72,7 @@ type
maxMessageSize: int
appScore*: float64 # application specific score
behaviourPenalty*: float64 # the eventual penalty score
rng*: ref HmacDrbgContext
when defined(libp2p_agents_metrics):
shortAgent*: string
@ -103,6 +108,50 @@ func outbound*(p: PubSubPeer): bool =
else:
false
proc bufferSizeAtMoment*(p: PubSubPeer, m: Moment): int =
var lastSent: Moment
for (sent, size) in p.sentData:
if sent > m:
break
let timeGap = (sent - lastSent).milliseconds
result -= timeGap * p.bandwidth
if result < 0: result = 0
result += size
lastSent = sent
let timeGap = (m - lastSent).milliseconds
result -= timeGap * p.bandwidth
return max(0, result)
proc minRtt*(p: PubSubPeer): int =
if p.rtts.len > 0:
p.rtts.foldl(min(a, b), 10000)
else: 100
proc handlePong*(p: PubSubPeer, pong: seq[byte]) =
if pong notin p.pings: return
let
pingMoment = p.pings.getOrDefault(pong)
delay = (Moment.now() - pingMoment).milliseconds
minRtt = p.minRtt
p.rtts.add(delay)
if delay < minRtt:
# can't make bandwidth estimate in this situation
return
let
bufferSizeWhenSendingPing = p.bufferSizeAtMoment(pingMoment)
estimatedBandwidth =
bufferSizeWhenSendingPing / delay
if bufferSizeWhenSendingPing / p.bandwidth < minRtt / 5:
# can't make bandwidth estimate in this situation
return
p.bandwidth = (p.bandwidth * 9 + int(estimatedBandwidth)) div 10
proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
# trigger hooks
if not(isNil(p.observers)) and p.observers[].len > 0:
@ -253,8 +302,16 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect], async.} =
trace "sending encoded msgs to peer", conn, encoded = shortLog(msg)
let startOfSend = Moment.now()
p.sentData.add((startOfSend, msg.len))
try:
await conn.writeLp(msg)
if p.bufferSizeAtMoment(startOfSend) / p.bandwidth < p.minRtt / 5:
# ping it
let pingKey = p.rng[].generateBytes(32)
p.pings[pingKey] = startOfSend
await conn.writeLp(msg & encodeRpcMsg(RPCMsg(control: some(ControlMessage(ping: pingKey))), true))
else:
await conn.writeLp(msg)
trace "sent pubsub message to remote", conn
except CatchableError as exc: # never cancelled
# Because we detach the send call from the currently executing task using
@ -293,13 +350,17 @@ proc new*(
getConn: GetConn,
onEvent: OnEvent,
codec: string,
maxMessageSize: int): T =
maxMessageSize: int,
rng: ref HmacDrbgContext = newRng(),
): T =
T(
getConn: getConn,
onEvent: onEvent,
codec: codec,
peerId: peerId,
bandwidth: 6250,
connectedFut: newFuture[void](),
maxMessageSize: maxMessageSize
maxMessageSize: maxMessageSize,
rng: rng
)

View File

@ -46,6 +46,8 @@ type
iwant*: seq[ControlIWant]
graft*: seq[ControlGraft]
prune*: seq[ControlPrune]
ping*: seq[byte]
pong*: seq[byte]
ControlIHave* = object
topicId*: string

View File

@ -92,6 +92,10 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
ipb.write(4, prune)
for ihave in control.dontSend:
ipb.write(5, ihave)
if control.ping.len > 0:
ipb.write(10, control.ping)
if control.pong.len > 0:
ipb.write(11, control.pong)
if len(ipb.buffer) > 0:
ipb.finish()
pb.write(field, ipb)
@ -231,6 +235,8 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
if ? cpb.getRepeatedField(5, dontsendpbs):
for item in dontsendpbs:
control.dontSend.add(? decodeIHave(initProtoBuffer(item)))
discard ? cpb.getField(10, control.ping)
discard ? cpb.getField(11, control.pong)
trace "decodeControl: message statistics", graft_count = len(control.graft),
prune_count = len(control.prune),
ihave_count = len(control.ihave),