From ae67a9b3f6a77b1c8aaf2245a44597ecebf7fd7f Mon Sep 17 00:00:00 2001 From: Tanguy Date: Mon, 13 Feb 2023 17:32:07 +0100 Subject: [PATCH] Add queued send bytes --- libp2p/muxers/mplex/lpchannel.nim | 6 ++++++ libp2p/muxers/yamux/yamux.nim | 2 ++ libp2p/protocols/pubsub/pubsubpeer.nim | 4 ++++ libp2p/stream/lpstream.nim | 2 ++ 4 files changed, 14 insertions(+) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index e85999ea5..64a90616c 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -64,6 +64,7 @@ type closeCode*: MessageType # cached in/out close code resetCode*: MessageType # cached in/out reset code writes*: int # In-flight writes + writesBytes*: int # In-flight writes bytes func shortLog*(s: LPChannel): auto = try: @@ -228,6 +229,7 @@ proc completeWrite( s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} = try: s.writes += 1 + s.writesBytes += msgLen when defined(libp2p_mplex_metrics): libp2p_mplex_qlen.observe(s.writes.int64 - 1) @@ -257,6 +259,10 @@ proc completeWrite( raise newLPStreamConnDownError(exc) finally: s.writes -= 1 + s.writesBytes -= msgLen + + +method queuedSendBytes*(channel: LPChannel): int = channel.writesBytes method write*(s: LPChannel, msg: seq[byte]): Future[void] = ## Write to mplex channel - there may be up to MaxWrite concurrent writes diff --git a/libp2p/muxers/yamux/yamux.nim b/libp2p/muxers/yamux/yamux.nim index f60cf72f9..8f8ae8d64 100644 --- a/libp2p/muxers/yamux/yamux.nim +++ b/libp2p/muxers/yamux/yamux.nim @@ -176,6 +176,8 @@ proc sendQueueBytes(channel: YamuxChannel, limit: bool = false): int = for (elem, sent, _) in channel.sendQueue: result.inc(min(elem.len - sent, if limit: channel.maxRecvWindow div 3 else: elem.len - sent)) +method queuedSendBytes*(channel: YamuxChannel): int = channel.sendQueueBytes() + proc actuallyClose(channel: YamuxChannel) {.async.} = if channel.closedLocally and channel.sendQueue.len == 0 and channel.closedRemotely.done(): diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ebdbd4d20..cb1f8fe6d 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -74,6 +74,10 @@ type RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe, raises: [Defect].} +proc queuedSendBytes*(p: PubSubPeer): int = + if p.sendConn.isNil: 0 + else: p.sendConn.queuedSendBytes() + func hash*(p: PubSubPeer): Hash = p.peerId.hash diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index 6dfe501c4..98a9fd45a 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -173,6 +173,8 @@ method closed*(s: LPStream): bool {.base, public.} = method atEof*(s: LPStream): bool {.base, public.} = s.isEof +method queuedSendBytes*(s: LPStream): int {.base.} = 0 + method readOnce*( s: LPStream, pbytes: pointer,