From 7fc0dfbd555eaf371b9d62f729496a0f75afed3a Mon Sep 17 00:00:00 2001 From: Tanguy Cizain Date: Mon, 26 Jul 2021 16:12:36 +0200 Subject: [PATCH] Muxer bandwidth metrics (#607) * add stream bytes metrics * renamed bw metric * renamed * new global metric --- libp2p.nimble | 2 +- libp2p/multistream.nim | 3 +++ libp2p/muxers/mplex/lpchannel.nim | 12 ++++++++++++ libp2p/stream/chronosstream.nim | 4 ++++ libp2p/stream/connection.nim | 1 + 5 files changed, 21 insertions(+), 1 deletion(-) diff --git a/libp2p.nimble b/libp2p.nimble index a2ca432bf..9496a9009 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -18,7 +18,7 @@ requires "nim >= 1.2.0", proc runTest(filename: string, verify: bool = true, sign: bool = true, moreoptions: string = "") = - var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics --verbosity:0 --hints:off" + var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics -d:lipp2p_network_protocols_metrics --verbosity:0 --hints:off" excstr.add(" --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off") excstr.add(" -d:libp2p_pubsub_sign=" & $sign) excstr.add(" -d:libp2p_pubsub_verify=" & $verify) diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index 456a767d8..2ce2fc938 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -80,6 +80,7 @@ proc select*(m: MultistreamSelect, trace "reading first requested proto", conn if s == proto[0]: trace "successfully selected ", conn, proto = proto[0] + conn.tag = proto[0] return proto[0] elif proto.len > 1: # Try to negotiate alternatives @@ -92,6 +93,7 @@ proc select*(m: MultistreamSelect, validateSuffix(s) if s == p: trace "selected protocol", conn, protocol = s + conn.tag = s return s return "" else: @@ -169,6 +171,7 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy if (not isNil(h.match) and h.match(ms)) or h.protos.contains(ms): trace "found handler", conn, protocol = ms await conn.writeLp(ms & "\n") + conn.tag = ms await h.protocol.handler(conn, ms) return debug "no handlers", conn, protocol = ms diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 0cfd14e6b..b0418bb7b 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -21,6 +21,9 @@ export connection logScope: topics = "libp2p mplexchannel" +when defined(lipp2p_network_protocols_metrics): + declareCounter libp2p_protocols_bytes, "total sent or received bytes", ["protocol", "direction"] + ## Channel half-closed states ## ## | State | Closed local | Closed remote @@ -157,6 +160,10 @@ method readOnce*(s: LPChannel, ## or the reads will lock each other. try: let bytes = await procCall BufferStream(s).readOnce(pbytes, nbytes) + when defined(lipp2p_network_protocols_metrics): + if s.tag.len > 0: + libp2p_protocols_bytes.inc(bytes.int64, labelValues=[s.tag, "in"]) + trace "readOnce", s, bytes if bytes == 0: await s.closeUnderlying() @@ -194,6 +201,11 @@ method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = trace "write msg", s, conn = s.conn, len = msg.len await s.conn.writeMsg(s.id, s.msgCode, msg) + + when defined(lipp2p_network_protocols_metrics): + if s.tag.len > 0: + libp2p_protocols_bytes.inc(msg.len.int64, labelValues=[s.tag, "out"]) + s.activity = true except CatchableError as exc: trace "exception in lpchannel write handler", s, msg = exc.msg diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index c625d50bb..a4281bb08 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -33,6 +33,8 @@ when defined(libp2p_agents_metrics): declareCounter(libp2p_peers_traffic_read, "incoming traffic", labels = ["agent"]) declareCounter(libp2p_peers_traffic_write, "outgoing traffic", labels = ["agent"]) +declareCounter(libp2p_network_bytes, "total traffic", labels = ["direction"]) + func shortLog*(conn: ChronosStream): auto = try: if conn.isNil: "ChronosStream(nil)" @@ -105,6 +107,7 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {. withExceptions: result = await s.client.readOnce(pbytes, nbytes) s.activity = true # reset activity flag + libp2p_network_bytes.inc(nbytes.int64, labelValues = ["in"]) when defined(libp2p_agents_metrics): s.trackPeerIdentity() if s.tracked: @@ -127,6 +130,7 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} = raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing") s.activity = true # reset activity flag + libp2p_network_bytes.inc(msg.len.int64, labelValues = ["out"]) when defined(libp2p_agents_metrics): s.trackPeerIdentity() if s.tracked: diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index 6db668414..8e05a9947 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -36,6 +36,7 @@ type peerInfo*: PeerInfo observedAddr*: Multiaddress upgraded*: Future[void] + tag*: string # debug tag for metrics (generally ms protocol) transportDir*: Direction # The bottom level transport (generally the socket) direction proc timeoutMonitor(s: Connection) {.async, gcsafe.}