bandwidth estimator

This commit is contained in:
Tanguy 2023-04-27 17:26:46 +02:00
parent a1eb53b181
commit fd6874e647
No known key found for this signature in database
GPG Key ID: 7DD8EC6B6CE6C45E
5 changed files with 81 additions and 4 deletions

View File

@ -259,6 +259,10 @@ proc handleSubscribe*(g: GossipSub,
trace "gossip peers", peers = g.gossipsub.peers(topic), topic trace "gossip peers", peers = g.gossipsub.peers(topic), topic
proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
if control.ping.len in 1..<64:
g.send(peer, RPCMsg(control: some(ControlMessage(pong: control.ping))))
if control.pong.len in 1..<64:
peer.handlePong(control.pong)
g.handlePrune(peer, control.prune) g.handlePrune(peer, control.prune)
var respControl: ControlMessage var respControl: ControlMessage

View File

@ -297,7 +297,7 @@ proc getOrCreatePeer*(
p.onPubSubPeerEvent(peer, event) p.onPubSubPeerEvent(peer, event)
# create new pubsub peer # create new pubsub peer
let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize) let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize, p.rng)
debug "created new pubsub peer", peerId debug "created new pubsub peer", peerId
p.peers[peerId] = pubSubPeer p.peers[peerId] = pubSubPeer

View File

@ -60,6 +60,11 @@ type
peerId*: PeerId peerId*: PeerId
handler*: RPCHandler handler*: RPCHandler
observers*: ref seq[PubSubObserver] # ref as in smart_ptr observers*: ref seq[PubSubObserver] # ref as in smart_ptr
sentData*: seq[(Moment, int)]
pings*: Table[seq[byte], Moment]
lastPing*: Moment
rtts*: seq[int]
bandwidth*: int # in bytes per ms
score*: float64 score*: float64
sentIHaves*: Deque[HashSet[MessageId]] sentIHaves*: Deque[HashSet[MessageId]]
@ -67,6 +72,7 @@ type
maxMessageSize: int maxMessageSize: int
appScore*: float64 # application specific score appScore*: float64 # application specific score
behaviourPenalty*: float64 # the eventual penalty score behaviourPenalty*: float64 # the eventual penalty score
rng*: ref HmacDrbgContext
RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void]
{.gcsafe, raises: [Defect].} {.gcsafe, raises: [Defect].}
@ -108,6 +114,50 @@ func outbound*(p: PubSubPeer): bool =
else: else:
false false
proc bufferSizeAtMoment*(p: PubSubPeer, m: Moment): int =
var lastSent: Moment
for (sent, size) in p.sentData:
if sent > m:
break
let timeGap = (sent - lastSent).milliseconds
result -= int(timeGap * p.bandwidth)
if result < 0: result = 0
result += size
lastSent = sent
let timeGap = (m - lastSent).milliseconds
result -= int(timeGap * p.bandwidth)
return max(0, result)
proc minRtt*(p: PubSubPeer): int =
if p.rtts.len > 0:
p.rtts.foldl(min(a, b), 10000)
else: 100
proc handlePong*(p: PubSubPeer, pong: seq[byte]) =
if pong notin p.pings: return
let
pingMoment = p.pings.getOrDefault(pong)
delay = int((Moment.now() - pingMoment).milliseconds)
minRtt = p.minRtt
p.rtts.add(delay)
if delay <= minRtt:
# can't make bandwidth estimate in this situation
return
let
bufferSizeWhenSendingPing = p.bufferSizeAtMoment(pingMoment)
estimatedBandwidth =
bufferSizeWhenSendingPing / (delay - minRtt)
if bufferSizeWhenSendingPing / p.bandwidth < minRtt / 5:
# can't make bandwidth estimate in this situation
return
p.bandwidth = (p.bandwidth * 9 + int(estimatedBandwidth)) div 10
proc recvObservers(p: PubSubPeer, msg: var RPCMsg) = proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
# trigger hooks # trigger hooks
if not(isNil(p.observers)) and p.observers[].len > 0: if not(isNil(p.observers)) and p.observers[].len > 0:
@ -258,8 +308,19 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect], async.} =
trace "sending encoded msgs to peer", conn, encoded = shortLog(msg) trace "sending encoded msgs to peer", conn, encoded = shortLog(msg)
let
startOfSend = Moment.now()
lastPing = startOfSend - p.lastPing
p.sentData.add((startOfSend, msg.len))
try: try:
await conn.writeLp(msg) if p.bufferSizeAtMoment(startOfSend) / p.bandwidth > p.minRtt / 5 or lastPing.milliseconds > p.minRtt * 2:
# ping it
p.lastPing = startOfSend
let pingKey = p.rng[].generateBytes(32)
p.pings[pingKey] = startOfSend
await conn.writeLp(msg & encodeRpcMsg(RPCMsg(control: some(ControlMessage(ping: pingKey))), true))
else:
await conn.writeLp(msg)
trace "sent pubsub message to remote", conn trace "sent pubsub message to remote", conn
except CatchableError as exc: # never cancelled except CatchableError as exc: # never cancelled
# Because we detach the send call from the currently executing task using # Because we detach the send call from the currently executing task using
@ -305,14 +366,18 @@ proc new*(
getConn: GetConn, getConn: GetConn,
onEvent: OnEvent, onEvent: OnEvent,
codec: string, codec: string,
maxMessageSize: int): T = maxMessageSize: int,
rng: ref HmacDrbgContext = newRng(),
): T =
result = T( result = T(
getConn: getConn, getConn: getConn,
onEvent: onEvent, onEvent: onEvent,
codec: codec, codec: codec,
peerId: peerId, peerId: peerId,
bandwidth: 6250,
connectedFut: newFuture[void](), connectedFut: newFuture[void](),
maxMessageSize: maxMessageSize maxMessageSize: maxMessageSize,
rng: rng
) )
result.sentIHaves.addFirst(default(HashSet[MessageId])) result.sentIHaves.addFirst(default(HashSet[MessageId]))

View File

@ -45,6 +45,8 @@ type
iwant*: seq[ControlIWant] iwant*: seq[ControlIWant]
graft*: seq[ControlGraft] graft*: seq[ControlGraft]
prune*: seq[ControlPrune] prune*: seq[ControlPrune]
ping*: seq[byte]
pong*: seq[byte]
ControlIHave* = object ControlIHave* = object
topicId*: string topicId*: string

View File

@ -90,6 +90,10 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
ipb.write(3, graft) ipb.write(3, graft)
for prune in control.prune: for prune in control.prune:
ipb.write(4, prune) ipb.write(4, prune)
if control.ping.len > 0:
ipb.write(10, control.ping)
if control.pong.len > 0:
ipb.write(11, control.pong)
if len(ipb.buffer) > 0: if len(ipb.buffer) > 0:
ipb.finish() ipb.finish()
pb.write(field, ipb) pb.write(field, ipb)
@ -225,6 +229,8 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
if ? cpb.getRepeatedField(4, prunepbs): if ? cpb.getRepeatedField(4, prunepbs):
for item in prunepbs: for item in prunepbs:
control.prune.add(? decodePrune(initProtoBuffer(item))) control.prune.add(? decodePrune(initProtoBuffer(item)))
discard ? cpb.getField(10, control.ping)
discard ? cpb.getField(11, control.pong)
trace "decodeControl: message statistics", graft_count = len(control.graft), trace "decodeControl: message statistics", graft_count = len(control.graft),
prune_count = len(control.prune), prune_count = len(control.prune),
ihave_count = len(control.ihave), ihave_count = len(control.ihave),