Add queued send bytes

This commit is contained in:
Tanguy 2023-02-13 17:32:07 +01:00
parent 9b4b68b9f9
commit 2f46751c68
No known key found for this signature in database
GPG Key ID: 7DD8EC6B6CE6C45E
5 changed files with 18 additions and 3 deletions

View File

@ -64,6 +64,7 @@ type
closeCode*: MessageType # cached in/out close code
resetCode*: MessageType # cached in/out reset code
writes*: int # In-flight writes
writesBytes*: int # In-flight writes bytes
func shortLog*(s: LPChannel): auto =
try:
@ -228,6 +229,7 @@ proc completeWrite(
s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} =
try:
s.writes += 1
s.writesBytes += msgLen
when defined(libp2p_mplex_metrics):
libp2p_mplex_qlen.observe(s.writes.int64 - 1)
@ -257,6 +259,10 @@ proc completeWrite(
raise newLPStreamConnDownError(exc)
finally:
s.writes -= 1
s.writesBytes -= msgLen
method queuedSendBytes*(channel: LPChannel): int = channel.writesBytes
method write*(s: LPChannel, msg: seq[byte]): Future[void] =
## Write to mplex channel - there may be up to MaxWrite concurrent writes

View File

@ -176,6 +176,8 @@ proc sendQueueBytes(channel: YamuxChannel, limit: bool = false): int =
for (elem, sent, _) in channel.sendQueue:
result.inc(min(elem.len - sent, if limit: channel.maxRecvWindow div 3 else: elem.len - sent))
method queuedSendBytes*(channel: YamuxChannel): int = channel.sendQueueBytes()
proc actuallyClose(channel: YamuxChannel) {.async.} =
if channel.closedLocally and channel.sendQueue.len == 0 and
channel.closedRemotely.done():

View File

@ -294,7 +294,7 @@ proc handleIWant*(g: GossipSub,
break
elif g.hasSeen(mid):
libp2p_gossipsub_mcache_hit.inc(1, labelValues = ["late"])
info "LATE IWANT", diff=(Moment.now() - g.firstSeen(mid)), peerType=peer.shortAgent, mid
info "LATE IWANT", diff=(Moment.now() - g.firstSeen(mid)), peerType=peer.shortAgent, id=mid.toHex(), peerId=peer.peerId
else:
libp2p_gossipsub_mcache_hit.inc(1, labelValues = ["unknown"])
return messages
@ -589,8 +589,6 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
cacheWindowSize += midsSeq.len
info "got messages to emit", size=midsSeq.len, topic, msgs=midsSeq
# not in spec
# similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101
# and go https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L582
@ -619,6 +617,9 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
g.rng.shuffle(allPeers)
allPeers.setLen(target)
info "got messages to emit", size=midsSeq.len, topic, msgs=midsSeq.mapIt(it.toHex()), peers=allPeers.len, peerValues=allPeers.mapIt((it.shortAgent, it.queuedSendBytes, it.peerId))
for peer in allPeers:
control.mgetOrPut(peer, ControlMessage()).ihave.add(ihave)

View File

@ -79,6 +79,10 @@ when defined(libp2p_agents_metrics):
#so we have to read the parents short agent..
p.sendConn.getWrapped().shortAgent
proc queuedSendBytes*(p: PubSubPeer): int =
if p.sendConn.isNil: -2
else: p.sendConn.queuedSendBytes()
func hash*(p: PubSubPeer): Hash =
p.peerId.hash

View File

@ -173,6 +173,8 @@ method closed*(s: LPStream): bool {.base, public.} =
method atEof*(s: LPStream): bool {.base, public.} =
s.isEof
method queuedSendBytes*(s: LPStream): int {.base.} = -1
method readOnce*(
s: LPStream,
pbytes: pointer,