Yamux metrics and limits (#740)
* Add yamux channel gauge * Add limit to channel * Add recv/send queue length metrics * Add yamux stream tracking * Add timeout to YamuxChannel Co-authored-by: Tanguy <tanguy@status.im>
This commit is contained in:
parent
34c2fb8787
commit
a9a7e7eb15
|
@ -34,7 +34,6 @@ when defined(libp2p_expensive_metrics):
|
|||
"mplex channels", labels = ["initiator", "peer"])
|
||||
|
||||
type
|
||||
TooManyChannels* = object of MuxerError
|
||||
InvalidChannelIdError* = object of MuxerError
|
||||
|
||||
Mplex* = ref object of Muxer
|
||||
|
|
|
@ -22,6 +22,7 @@ const
|
|||
|
||||
type
|
||||
MuxerError* = object of LPError
|
||||
TooManyChannels* = object of MuxerError
|
||||
|
||||
StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe, raises: [Defect].}
|
||||
MuxerHandler* = proc(muxer: Muxer): Future[void] {.gcsafe, raises: [Defect].}
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
{.push raises: [Defect].}
|
||||
|
||||
import sequtils, std/[tables]
|
||||
import chronos, chronicles, stew/[endians2, byteutils, objects]
|
||||
import chronos, chronicles, metrics, stew/[endians2, byteutils, objects]
|
||||
import ../muxer,
|
||||
../../stream/connection
|
||||
|
||||
|
@ -23,6 +23,14 @@ const
|
|||
YamuxCodec* = "/yamux/1.0.0"
|
||||
YamuxVersion = 0.uint8
|
||||
DefaultWindowSize = 256000
|
||||
MaxChannelCount = 200
|
||||
|
||||
when defined(libp2p_yamux_metrics):
|
||||
declareGauge(libp2p_yamux_channels, "yamux channels", labels = ["initiator", "peer"])
|
||||
declareHistogram libp2p_yamux_send_queue, "message send queue length (in byte)",
|
||||
buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 1600.0, 6400.0, 25600.0, 256000.0]
|
||||
declareHistogram libp2p_yamux_recv_queue, "message recv queue length (in byte)",
|
||||
buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 1600.0, 6400.0, 25600.0, 256000.0]
|
||||
|
||||
type
|
||||
YamuxError* = object of CatchableError
|
||||
|
@ -195,6 +203,7 @@ proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} =
|
|||
if isLocal:
|
||||
try: await channel.conn.write(YamuxHeader.data(channel.id, 0, {Rst}))
|
||||
except LPStreamEOFError as exc: discard
|
||||
except LPStreamClosedError as exc: discard
|
||||
await channel.close()
|
||||
if not channel.closedRemotely.done():
|
||||
await channel.remoteClosed()
|
||||
|
@ -246,6 +255,8 @@ proc gotDataFromRemote(channel: YamuxChannel, b: seq[byte]) {.async.} =
|
|||
channel.recvWindow -= b.len
|
||||
channel.recvQueue = channel.recvQueue.concat(b)
|
||||
channel.receivedData.fire()
|
||||
when defined(libp2p_yamux_metrics):
|
||||
libp2p_yamux_recv_queue.observe(channel.recvQueue.len.int64)
|
||||
await channel.updateRecvWindow()
|
||||
|
||||
proc setMaxRecvWindow*(channel: YamuxChannel, maxRecvWindow: int) =
|
||||
|
@ -261,7 +272,12 @@ proc trySend(channel: YamuxChannel) {.async.} =
|
|||
if channel.sendWindow == 0:
|
||||
trace "send window empty"
|
||||
if channel.sendQueueBytes(true) > channel.maxRecvWindow:
|
||||
await channel.reset(true)
|
||||
debug "channel send queue too big, resetting", maxSendWindow=channel.maxRecvWindow,
|
||||
currentQueueSize = channel.sendQueueBytes(true)
|
||||
try:
|
||||
await channel.reset(true)
|
||||
except CatchableError as exc:
|
||||
debug "failed to reset", msg=exc.msg
|
||||
break
|
||||
|
||||
let
|
||||
|
@ -293,7 +309,7 @@ proc trySend(channel: YamuxChannel) {.async.} =
|
|||
trace "build send buffer", h = $header, msg=string.fromBytes(sendBuffer[12..^1])
|
||||
channel.sendWindow.dec(toSend)
|
||||
try: await channel.conn.write(sendBuffer)
|
||||
except LPStreamEOFError as exc:
|
||||
except CatchableError as exc:
|
||||
for fut in futures.items():
|
||||
fut.fail(exc)
|
||||
await channel.reset()
|
||||
|
@ -311,6 +327,8 @@ method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] =
|
|||
result.complete()
|
||||
return result
|
||||
channel.sendQueue.add((msg, 0, result))
|
||||
when defined(libp2p_yamux_metrics):
|
||||
libp2p_yamux_recv_queue.observe(channel.sendQueueBytes().int64)
|
||||
asyncSpawn channel.trySend()
|
||||
|
||||
proc open*(channel: YamuxChannel) {.async, gcsafe.} =
|
||||
|
@ -326,10 +344,17 @@ type
|
|||
flushed: Table[uint32, int]
|
||||
currentId: uint32
|
||||
isClosed: bool
|
||||
maxChannCount: int
|
||||
|
||||
proc lenBySrc(m: Yamux, isSrc: bool): int =
|
||||
for v in m.channels.values():
|
||||
if v.isSrc == isSrc: result += 1
|
||||
|
||||
proc cleanupChann(m: Yamux, channel: YamuxChannel) {.async.} =
|
||||
await channel.join()
|
||||
m.channels.del(channel.id)
|
||||
when defined(libp2p_yamux_metrics):
|
||||
libp2p_yamux_channels.set(m.lenBySrc(channel.isSrc).int64, [$channel.isSrc, $channel.peerId])
|
||||
if channel.isReset and channel.recvWindow > 0:
|
||||
m.flushed[channel.id] = channel.recvWindow
|
||||
|
||||
|
@ -344,6 +369,11 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool): YamuxChannel =
|
|||
receivedData: newAsyncEvent(),
|
||||
closedRemotely: newFuture[void]()
|
||||
)
|
||||
result.objName = "YamuxStream"
|
||||
result.dir = if isSrc: Direction.Out else: Direction.In
|
||||
result.timeoutHandler = proc(): Future[void] {.gcsafe.} =
|
||||
trace "Idle timeout expired, resetting YamuxChannel"
|
||||
result.reset()
|
||||
result.initStream()
|
||||
result.peerId = m.connection.peerId
|
||||
result.observedAddr = m.connection.observedAddr
|
||||
|
@ -353,6 +383,8 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool): YamuxChannel =
|
|||
m.channels[id] = result
|
||||
asyncSpawn m.cleanupChann(result)
|
||||
trace "created channel", id, pid=m.connection.peerId
|
||||
when defined(libp2p_yamux_metrics):
|
||||
libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $result.peerId])
|
||||
|
||||
method close*(m: Yamux) {.async.} =
|
||||
if m.isClosed == true:
|
||||
|
@ -405,6 +437,9 @@ method handle*(m: Yamux) {.async, gcsafe.} =
|
|||
if header.streamId mod 2 == m.currentId mod 2:
|
||||
raise newException(YamuxError, "Peer used our reserved stream id")
|
||||
let newStream = m.createStream(header.streamId, false)
|
||||
if m.channels.len >= m.maxChannCount:
|
||||
await newStream.reset()
|
||||
continue
|
||||
await newStream.open()
|
||||
asyncSpawn m.handleStream(newStream)
|
||||
elif header.streamId notin m.channels:
|
||||
|
@ -455,14 +490,17 @@ method newStream*(
|
|||
name: string = "",
|
||||
lazy: bool = false): Future[Connection] {.async, gcsafe.} =
|
||||
|
||||
if m.channels.len > m.maxChannCount - 1:
|
||||
raise newException(TooManyChannels, "max allowed channel count exceeded")
|
||||
let stream = m.createStream(m.currentId, true)
|
||||
m.currentId += 2
|
||||
if not lazy:
|
||||
await stream.open()
|
||||
return stream
|
||||
|
||||
proc new*(T: type[Yamux], conn: Connection): T =
|
||||
proc new*(T: type[Yamux], conn: Connection, maxChannCount: int = MaxChannelCount): T =
|
||||
T(
|
||||
connection: conn,
|
||||
currentId: if conn.dir == Out: 1 else: 2
|
||||
currentId: if conn.dir == Out: 1 else: 2,
|
||||
maxChannCount: maxChannCount
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue