From fd6874e6474518a475595260996530dda8a358ff Mon Sep 17 00:00:00 2001 From: Tanguy Date: Thu, 27 Apr 2023 17:26:46 +0200 Subject: [PATCH] bandwidth estimator --- libp2p/protocols/pubsub/gossipsub.nim | 4 ++ libp2p/protocols/pubsub/pubsub.nim | 2 +- libp2p/protocols/pubsub/pubsubpeer.nim | 71 +++++++++++++++++++++++- libp2p/protocols/pubsub/rpc/messages.nim | 2 + libp2p/protocols/pubsub/rpc/protobuf.nim | 6 ++ 5 files changed, 81 insertions(+), 4 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index d707bf285..2e1841476 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -259,6 +259,10 @@ proc handleSubscribe*(g: GossipSub, trace "gossip peers", peers = g.gossipsub.peers(topic), topic proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = + if control.ping.len in 1..<64: + g.send(peer, RPCMsg(control: some(ControlMessage(pong: control.ping)))) + if control.pong.len in 1..<64: + peer.handlePong(control.pong) g.handlePrune(peer, control.prune) var respControl: ControlMessage diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index fb7aea13b..b233761ea 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -297,7 +297,7 @@ proc getOrCreatePeer*( p.onPubSubPeerEvent(peer, event) # create new pubsub peer - let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize) + let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize, p.rng) debug "created new pubsub peer", peerId p.peers[peerId] = pubSubPeer diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 31fbbe7a6..2e207a5e3 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -60,6 +60,11 @@ type peerId*: PeerId handler*: RPCHandler observers*: ref seq[PubSubObserver] # ref as in smart_ptr + sentData*: seq[(Moment, int)] + pings*: Table[seq[byte], Moment] + lastPing*: Moment + rtts*: seq[int] + bandwidth*: int # in bytes per ms score*: float64 sentIHaves*: Deque[HashSet[MessageId]] @@ -67,6 +72,7 @@ type maxMessageSize: int appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score + rng*: ref HmacDrbgContext RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe, raises: [Defect].} @@ -108,6 +114,50 @@ func outbound*(p: PubSubPeer): bool = else: false +proc bufferSizeAtMoment*(p: PubSubPeer, m: Moment): int = + var lastSent: Moment + for (sent, size) in p.sentData: + if sent > m: + break + let timeGap = (sent - lastSent).milliseconds + result -= int(timeGap * p.bandwidth) + if result < 0: result = 0 + + result += size + lastSent = sent + + let timeGap = (m - lastSent).milliseconds + result -= int(timeGap * p.bandwidth) + return max(0, result) + +proc minRtt*(p: PubSubPeer): int = + if p.rtts.len > 0: + p.rtts.foldl(min(a, b), 10000) + else: 100 + +proc handlePong*(p: PubSubPeer, pong: seq[byte]) = + if pong notin p.pings: return + let + pingMoment = p.pings.getOrDefault(pong) + delay = int((Moment.now() - pingMoment).milliseconds) + minRtt = p.minRtt + + p.rtts.add(delay) + if delay <= minRtt: + # can't make bandwidth estimate in this situation + return + + let + bufferSizeWhenSendingPing = p.bufferSizeAtMoment(pingMoment) + estimatedBandwidth = + bufferSizeWhenSendingPing / (delay - minRtt) + + if bufferSizeWhenSendingPing / p.bandwidth < minRtt / 5: + # can't make bandwidth estimate in this situation + return + + p.bandwidth = (p.bandwidth * 9 + int(estimatedBandwidth)) div 10 + proc recvObservers(p: PubSubPeer, msg: var RPCMsg) = # trigger hooks if not(isNil(p.observers)) and p.observers[].len > 0: @@ -258,8 +308,19 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect], async.} = trace "sending encoded msgs to peer", conn, encoded = shortLog(msg) + let + startOfSend = Moment.now() + lastPing = startOfSend - p.lastPing + p.sentData.add((startOfSend, msg.len)) try: - await conn.writeLp(msg) + if p.bufferSizeAtMoment(startOfSend) / p.bandwidth > p.minRtt / 5 or lastPing.milliseconds > p.minRtt * 2: + # ping it + p.lastPing = startOfSend + let pingKey = p.rng[].generateBytes(32) + p.pings[pingKey] = startOfSend + await conn.writeLp(msg & encodeRpcMsg(RPCMsg(control: some(ControlMessage(ping: pingKey))), true)) + else: + await conn.writeLp(msg) trace "sent pubsub message to remote", conn except CatchableError as exc: # never cancelled # Because we detach the send call from the currently executing task using @@ -305,14 +366,18 @@ proc new*( getConn: GetConn, onEvent: OnEvent, codec: string, - maxMessageSize: int): T = + maxMessageSize: int, + rng: ref HmacDrbgContext = newRng(), + ): T = result = T( getConn: getConn, onEvent: onEvent, codec: codec, peerId: peerId, + bandwidth: 6250, connectedFut: newFuture[void](), - maxMessageSize: maxMessageSize + maxMessageSize: maxMessageSize, + rng: rng ) result.sentIHaves.addFirst(default(HashSet[MessageId])) diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 541782a8b..c90f3095a 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -45,6 +45,8 @@ type iwant*: seq[ControlIWant] graft*: seq[ControlGraft] prune*: seq[ControlPrune] + ping*: seq[byte] + pong*: seq[byte] ControlIHave* = object topicId*: string diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index d87a6b928..aabf9deb9 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -90,6 +90,10 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) = ipb.write(3, graft) for prune in control.prune: ipb.write(4, prune) + if control.ping.len > 0: + ipb.write(10, control.ping) + if control.pong.len > 0: + ipb.write(11, control.pong) if len(ipb.buffer) > 0: ipb.finish() pb.write(field, ipb) @@ -225,6 +229,8 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {. if ? cpb.getRepeatedField(4, prunepbs): for item in prunepbs: control.prune.add(? decodePrune(initProtoBuffer(item))) + discard ? cpb.getField(10, control.ping) + discard ? cpb.getField(11, control.pong) trace "decodeControl: message statistics", graft_count = len(control.graft), prune_count = len(control.prune), ihave_count = len(control.ihave),