diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 261cdf06c..f85630a54 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -261,6 +261,10 @@ proc handleSubscribe*(g: GossipSub, trace "gossip peers", peers = g.gossipsub.peers(topic), topic proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = + if control.ping.len in 1..<64: + g.send(peer, RPCMsg(control: some(ControlMessage(pong: control.ping)))) + if control.pong.len in 1..<64: + peer.handlePong(control.pong) g.handlePrune(peer, control.prune) var respControl: ControlMessage diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index fb7aea13b..b233761ea 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -297,7 +297,7 @@ proc getOrCreatePeer*( p.onPubSubPeerEvent(peer, event) # create new pubsub peer - let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize) + let pubSubPeer = PubSubPeer.new(peerId, getConn, onEvent, protos[0], p.maxMessageSize, p.rng) debug "created new pubsub peer", peerId p.peers[peerId] = pubSubPeer diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ecc7e018f..a10a57a9e 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -61,6 +61,10 @@ type handler*: RPCHandler observers*: ref seq[PubSubObserver] # ref as in smart_ptr gotMsgs*: Table[MessageId, Future[void]] + sentData*: seq[(Moment, int)] + pings*: Table[seq[byte], Moment] + rtts*: seq[int] + bandwidth*: int # in bytes per ms score*: float64 iWantBudget*: int @@ -68,6 +72,7 @@ type maxMessageSize: int appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score + rng*: ref HmacDrbgContext when defined(libp2p_agents_metrics): shortAgent*: string @@ -103,6 +108,50 @@ func outbound*(p: PubSubPeer): bool = else: false +proc bufferSizeAtMoment*(p: PubSubPeer, m: Moment): int = + var lastSent: Moment + for (sent, size) in p.sentData: + if sent > m: + break + let timeGap = (sent - lastSent).milliseconds + result -= timeGap * p.bandwidth + if result < 0: result = 0 + + result += size + lastSent = sent + + let timeGap = (m - lastSent).milliseconds + result -= timeGap * p.bandwidth + return max(0, result) + +proc minRtt*(p: PubSubPeer): int = + if p.rtts.len > 0: + p.rtts.foldl(min(a, b), 10000) + else: 100 + +proc handlePong*(p: PubSubPeer, pong: seq[byte]) = + if pong notin p.pings: return + let + pingMoment = p.pings.getOrDefault(pong) + delay = (Moment.now() - pingMoment).milliseconds + minRtt = p.minRtt + + p.rtts.add(delay) + if delay < minRtt: + # can't make bandwidth estimate in this situation + return + + let + bufferSizeWhenSendingPing = p.bufferSizeAtMoment(pingMoment) + estimatedBandwidth = + bufferSizeWhenSendingPing / delay + + if bufferSizeWhenSendingPing / p.bandwidth < minRtt / 5: + # can't make bandwidth estimate in this situation + return + + p.bandwidth = (p.bandwidth * 9 + int(estimatedBandwidth)) div 10 + proc recvObservers(p: PubSubPeer, msg: var RPCMsg) = # trigger hooks if not(isNil(p.observers)) and p.observers[].len > 0: @@ -253,8 +302,16 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect], async.} = trace "sending encoded msgs to peer", conn, encoded = shortLog(msg) + let startOfSend = Moment.now() + p.sentData.add((startOfSend, msg.len)) try: - await conn.writeLp(msg) + if p.bufferSizeAtMoment(startOfSend) / p.bandwidth < p.minRtt / 5: + # ping it + let pingKey = p.rng[].generateBytes(32) + p.pings[pingKey] = startOfSend + await conn.writeLp(msg & encodeRpcMsg(RPCMsg(control: some(ControlMessage(ping: pingKey))), true)) + else: + await conn.writeLp(msg) trace "sent pubsub message to remote", conn except CatchableError as exc: # never cancelled # Because we detach the send call from the currently executing task using @@ -293,13 +350,17 @@ proc new*( getConn: GetConn, onEvent: OnEvent, codec: string, - maxMessageSize: int): T = + maxMessageSize: int, + rng: ref HmacDrbgContext = newRng(), + ): T = T( getConn: getConn, onEvent: onEvent, codec: codec, peerId: peerId, + bandwidth: 6250, connectedFut: newFuture[void](), - maxMessageSize: maxMessageSize + maxMessageSize: maxMessageSize, + rng: rng ) diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index 4b6fbebba..ce00ff58b 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -46,6 +46,8 @@ type iwant*: seq[ControlIWant] graft*: seq[ControlGraft] prune*: seq[ControlPrune] + ping*: seq[byte] + pong*: seq[byte] ControlIHave* = object topicId*: string diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index 5bc940220..1779d2867 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -92,6 +92,10 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) = ipb.write(4, prune) for ihave in control.dontSend: ipb.write(5, ihave) + if control.ping.len > 0: + ipb.write(10, control.ping) + if control.pong.len > 0: + ipb.write(11, control.pong) if len(ipb.buffer) > 0: ipb.finish() pb.write(field, ipb) @@ -231,6 +235,8 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {. if ? cpb.getRepeatedField(5, dontsendpbs): for item in dontsendpbs: control.dontSend.add(? decodeIHave(initProtoBuffer(item))) + discard ? cpb.getField(10, control.ping) + discard ? cpb.getField(11, control.pong) trace "decodeControl: message statistics", graft_count = len(control.graft), prune_count = len(control.prune), ihave_count = len(control.ihave),