Add queued send bytes

This commit is contained in:
Tanguy 2023-02-13 17:32:07 +01:00
parent f14ada3dcf
commit ae67a9b3f6
No known key found for this signature in database
GPG Key ID: 7DD8EC6B6CE6C45E
4 changed files with 14 additions and 0 deletions

View File

@ -64,6 +64,7 @@ type
closeCode*: MessageType # cached in/out close code closeCode*: MessageType # cached in/out close code
resetCode*: MessageType # cached in/out reset code resetCode*: MessageType # cached in/out reset code
writes*: int # In-flight writes writes*: int # In-flight writes
writesBytes*: int # In-flight writes bytes
func shortLog*(s: LPChannel): auto = func shortLog*(s: LPChannel): auto =
try: try:
@ -228,6 +229,7 @@ proc completeWrite(
s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} = s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} =
try: try:
s.writes += 1 s.writes += 1
s.writesBytes += msgLen
when defined(libp2p_mplex_metrics): when defined(libp2p_mplex_metrics):
libp2p_mplex_qlen.observe(s.writes.int64 - 1) libp2p_mplex_qlen.observe(s.writes.int64 - 1)
@ -257,6 +259,10 @@ proc completeWrite(
raise newLPStreamConnDownError(exc) raise newLPStreamConnDownError(exc)
finally: finally:
s.writes -= 1 s.writes -= 1
s.writesBytes -= msgLen
method queuedSendBytes*(channel: LPChannel): int = channel.writesBytes
method write*(s: LPChannel, msg: seq[byte]): Future[void] = method write*(s: LPChannel, msg: seq[byte]): Future[void] =
## Write to mplex channel - there may be up to MaxWrite concurrent writes ## Write to mplex channel - there may be up to MaxWrite concurrent writes

View File

@ -176,6 +176,8 @@ proc sendQueueBytes(channel: YamuxChannel, limit: bool = false): int =
for (elem, sent, _) in channel.sendQueue: for (elem, sent, _) in channel.sendQueue:
result.inc(min(elem.len - sent, if limit: channel.maxRecvWindow div 3 else: elem.len - sent)) result.inc(min(elem.len - sent, if limit: channel.maxRecvWindow div 3 else: elem.len - sent))
method queuedSendBytes*(channel: YamuxChannel): int = channel.sendQueueBytes()
proc actuallyClose(channel: YamuxChannel) {.async.} = proc actuallyClose(channel: YamuxChannel) {.async.} =
if channel.closedLocally and channel.sendQueue.len == 0 and if channel.closedLocally and channel.sendQueue.len == 0 and
channel.closedRemotely.done(): channel.closedRemotely.done():

View File

@ -74,6 +74,10 @@ type
RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void]
{.gcsafe, raises: [Defect].} {.gcsafe, raises: [Defect].}
proc queuedSendBytes*(p: PubSubPeer): int =
if p.sendConn.isNil: 0
else: p.sendConn.queuedSendBytes()
func hash*(p: PubSubPeer): Hash = func hash*(p: PubSubPeer): Hash =
p.peerId.hash p.peerId.hash

View File

@ -173,6 +173,8 @@ method closed*(s: LPStream): bool {.base, public.} =
method atEof*(s: LPStream): bool {.base, public.} = method atEof*(s: LPStream): bool {.base, public.} =
s.isEof s.isEof
method queuedSendBytes*(s: LPStream): int {.base.} = 0
method readOnce*( method readOnce*(
s: LPStream, s: LPStream,
pbytes: pointer, pbytes: pointer,