From a9a7e7eb15db7e1f4b1d028db2cfeea9ac5341a9 Mon Sep 17 00:00:00 2001 From: lchenut Date: Mon, 1 Aug 2022 14:52:42 +0200 Subject: [PATCH] Yamux metrics and limits (#740) * Add yamux channel gauge * Add limit to channel * Add recv/send queue length metrics * Add yamux stream tracking * Add timeout to YamuxChannel Co-authored-by: Tanguy --- libp2p/muxers/mplex/mplex.nim | 1 - libp2p/muxers/muxer.nim | 1 + libp2p/muxers/yamux/yamux.nim | 48 +++++++++++++++++++++++++++++++---- 3 files changed, 44 insertions(+), 6 deletions(-) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 977a64e..e2d54a1 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -34,7 +34,6 @@ when defined(libp2p_expensive_metrics): "mplex channels", labels = ["initiator", "peer"]) type - TooManyChannels* = object of MuxerError InvalidChannelIdError* = object of MuxerError Mplex* = ref object of Muxer diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index 9dc5203..eda79c0 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -22,6 +22,7 @@ const type MuxerError* = object of LPError + TooManyChannels* = object of MuxerError StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe, raises: [Defect].} MuxerHandler* = proc(muxer: Muxer): Future[void] {.gcsafe, raises: [Defect].} diff --git a/libp2p/muxers/yamux/yamux.nim b/libp2p/muxers/yamux/yamux.nim index 601ad33..02e96f7 100644 --- a/libp2p/muxers/yamux/yamux.nim +++ b/libp2p/muxers/yamux/yamux.nim @@ -10,7 +10,7 @@ {.push raises: [Defect].} import sequtils, std/[tables] -import chronos, chronicles, stew/[endians2, byteutils, objects] +import chronos, chronicles, metrics, stew/[endians2, byteutils, objects] import ../muxer, ../../stream/connection @@ -23,6 +23,14 @@ const YamuxCodec* = "/yamux/1.0.0" YamuxVersion = 0.uint8 DefaultWindowSize = 256000 + MaxChannelCount = 200 + +when defined(libp2p_yamux_metrics): + declareGauge(libp2p_yamux_channels, "yamux channels", labels = ["initiator", "peer"]) + declareHistogram libp2p_yamux_send_queue, "message send queue length (in byte)", + buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 1600.0, 6400.0, 25600.0, 256000.0] + declareHistogram libp2p_yamux_recv_queue, "message recv queue length (in byte)", + buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 1600.0, 6400.0, 25600.0, 256000.0] type YamuxError* = object of CatchableError @@ -195,6 +203,7 @@ proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} = if isLocal: try: await channel.conn.write(YamuxHeader.data(channel.id, 0, {Rst})) except LPStreamEOFError as exc: discard + except LPStreamClosedError as exc: discard await channel.close() if not channel.closedRemotely.done(): await channel.remoteClosed() @@ -246,6 +255,8 @@ proc gotDataFromRemote(channel: YamuxChannel, b: seq[byte]) {.async.} = channel.recvWindow -= b.len channel.recvQueue = channel.recvQueue.concat(b) channel.receivedData.fire() + when defined(libp2p_yamux_metrics): + libp2p_yamux_recv_queue.observe(channel.recvQueue.len.int64) await channel.updateRecvWindow() proc setMaxRecvWindow*(channel: YamuxChannel, maxRecvWindow: int) = @@ -261,7 +272,12 @@ proc trySend(channel: YamuxChannel) {.async.} = if channel.sendWindow == 0: trace "send window empty" if channel.sendQueueBytes(true) > channel.maxRecvWindow: - await channel.reset(true) + debug "channel send queue too big, resetting", maxSendWindow=channel.maxRecvWindow, + currentQueueSize = channel.sendQueueBytes(true) + try: + await channel.reset(true) + except CatchableError as exc: + debug "failed to reset", msg=exc.msg break let @@ -293,7 +309,7 @@ proc trySend(channel: YamuxChannel) {.async.} = trace "build send buffer", h = $header, msg=string.fromBytes(sendBuffer[12..^1]) channel.sendWindow.dec(toSend) try: await channel.conn.write(sendBuffer) - except LPStreamEOFError as exc: + except CatchableError as exc: for fut in futures.items(): fut.fail(exc) await channel.reset() @@ -311,6 +327,8 @@ method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] = result.complete() return result channel.sendQueue.add((msg, 0, result)) + when defined(libp2p_yamux_metrics): + libp2p_yamux_recv_queue.observe(channel.sendQueueBytes().int64) asyncSpawn channel.trySend() proc open*(channel: YamuxChannel) {.async, gcsafe.} = @@ -326,10 +344,17 @@ type flushed: Table[uint32, int] currentId: uint32 isClosed: bool + maxChannCount: int + +proc lenBySrc(m: Yamux, isSrc: bool): int = + for v in m.channels.values(): + if v.isSrc == isSrc: result += 1 proc cleanupChann(m: Yamux, channel: YamuxChannel) {.async.} = await channel.join() m.channels.del(channel.id) + when defined(libp2p_yamux_metrics): + libp2p_yamux_channels.set(m.lenBySrc(channel.isSrc).int64, [$channel.isSrc, $channel.peerId]) if channel.isReset and channel.recvWindow > 0: m.flushed[channel.id] = channel.recvWindow @@ -344,6 +369,11 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool): YamuxChannel = receivedData: newAsyncEvent(), closedRemotely: newFuture[void]() ) + result.objName = "YamuxStream" + result.dir = if isSrc: Direction.Out else: Direction.In + result.timeoutHandler = proc(): Future[void] {.gcsafe.} = + trace "Idle timeout expired, resetting YamuxChannel" + result.reset() result.initStream() result.peerId = m.connection.peerId result.observedAddr = m.connection.observedAddr @@ -353,6 +383,8 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool): YamuxChannel = m.channels[id] = result asyncSpawn m.cleanupChann(result) trace "created channel", id, pid=m.connection.peerId + when defined(libp2p_yamux_metrics): + libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $result.peerId]) method close*(m: Yamux) {.async.} = if m.isClosed == true: @@ -405,6 +437,9 @@ method handle*(m: Yamux) {.async, gcsafe.} = if header.streamId mod 2 == m.currentId mod 2: raise newException(YamuxError, "Peer used our reserved stream id") let newStream = m.createStream(header.streamId, false) + if m.channels.len >= m.maxChannCount: + await newStream.reset() + continue await newStream.open() asyncSpawn m.handleStream(newStream) elif header.streamId notin m.channels: @@ -455,14 +490,17 @@ method newStream*( name: string = "", lazy: bool = false): Future[Connection] {.async, gcsafe.} = + if m.channels.len > m.maxChannCount - 1: + raise newException(TooManyChannels, "max allowed channel count exceeded") let stream = m.createStream(m.currentId, true) m.currentId += 2 if not lazy: await stream.open() return stream -proc new*(T: type[Yamux], conn: Connection): T = +proc new*(T: type[Yamux], conn: Connection, maxChannCount: int = MaxChannelCount): T = T( connection: conn, - currentId: if conn.dir == Out: 1 else: 2 + currentId: if conn.dir == Out: 1 else: 2, + maxChannCount: maxChannCount )