Muxer bandwidth metrics (#607)

* add stream bytes metrics

* renamed bw metric

* renamed

* new global metric
This commit is contained in:
Tanguy Cizain 2021-07-26 16:12:36 +02:00 committed by GitHub
parent c1b2d45d1b
commit 7fc0dfbd55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 21 additions and 1 deletions

View File

@ -18,7 +18,7 @@ requires "nim >= 1.2.0",
proc runTest(filename: string, verify: bool = true, sign: bool = true, proc runTest(filename: string, verify: bool = true, sign: bool = true,
moreoptions: string = "") = moreoptions: string = "") =
var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics --verbosity:0 --hints:off" var excstr = "nim c --opt:speed -d:debug -d:libp2p_agents_metrics -d:libp2p_protobuf_metrics -d:lipp2p_network_protocols_metrics --verbosity:0 --hints:off"
excstr.add(" --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off") excstr.add(" --warning[CaseTransition]:off --warning[ObservableStores]:off --warning[LockLevel]:off")
excstr.add(" -d:libp2p_pubsub_sign=" & $sign) excstr.add(" -d:libp2p_pubsub_sign=" & $sign)
excstr.add(" -d:libp2p_pubsub_verify=" & $verify) excstr.add(" -d:libp2p_pubsub_verify=" & $verify)

View File

@ -80,6 +80,7 @@ proc select*(m: MultistreamSelect,
trace "reading first requested proto", conn trace "reading first requested proto", conn
if s == proto[0]: if s == proto[0]:
trace "successfully selected ", conn, proto = proto[0] trace "successfully selected ", conn, proto = proto[0]
conn.tag = proto[0]
return proto[0] return proto[0]
elif proto.len > 1: elif proto.len > 1:
# Try to negotiate alternatives # Try to negotiate alternatives
@ -92,6 +93,7 @@ proc select*(m: MultistreamSelect,
validateSuffix(s) validateSuffix(s)
if s == p: if s == p:
trace "selected protocol", conn, protocol = s trace "selected protocol", conn, protocol = s
conn.tag = s
return s return s
return "" return ""
else: else:
@ -169,6 +171,7 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy
if (not isNil(h.match) and h.match(ms)) or h.protos.contains(ms): if (not isNil(h.match) and h.match(ms)) or h.protos.contains(ms):
trace "found handler", conn, protocol = ms trace "found handler", conn, protocol = ms
await conn.writeLp(ms & "\n") await conn.writeLp(ms & "\n")
conn.tag = ms
await h.protocol.handler(conn, ms) await h.protocol.handler(conn, ms)
return return
debug "no handlers", conn, protocol = ms debug "no handlers", conn, protocol = ms

View File

@ -21,6 +21,9 @@ export connection
logScope: logScope:
topics = "libp2p mplexchannel" topics = "libp2p mplexchannel"
when defined(lipp2p_network_protocols_metrics):
declareCounter libp2p_protocols_bytes, "total sent or received bytes", ["protocol", "direction"]
## Channel half-closed states ## Channel half-closed states
## ##
## | State | Closed local | Closed remote ## | State | Closed local | Closed remote
@ -157,6 +160,10 @@ method readOnce*(s: LPChannel,
## or the reads will lock each other. ## or the reads will lock each other.
try: try:
let bytes = await procCall BufferStream(s).readOnce(pbytes, nbytes) let bytes = await procCall BufferStream(s).readOnce(pbytes, nbytes)
when defined(lipp2p_network_protocols_metrics):
if s.tag.len > 0:
libp2p_protocols_bytes.inc(bytes.int64, labelValues=[s.tag, "in"])
trace "readOnce", s, bytes trace "readOnce", s, bytes
if bytes == 0: if bytes == 0:
await s.closeUnderlying() await s.closeUnderlying()
@ -194,6 +201,11 @@ method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
trace "write msg", s, conn = s.conn, len = msg.len trace "write msg", s, conn = s.conn, len = msg.len
await s.conn.writeMsg(s.id, s.msgCode, msg) await s.conn.writeMsg(s.id, s.msgCode, msg)
when defined(lipp2p_network_protocols_metrics):
if s.tag.len > 0:
libp2p_protocols_bytes.inc(msg.len.int64, labelValues=[s.tag, "out"])
s.activity = true s.activity = true
except CatchableError as exc: except CatchableError as exc:
trace "exception in lpchannel write handler", s, msg = exc.msg trace "exception in lpchannel write handler", s, msg = exc.msg

View File

@ -33,6 +33,8 @@ when defined(libp2p_agents_metrics):
declareCounter(libp2p_peers_traffic_read, "incoming traffic", labels = ["agent"]) declareCounter(libp2p_peers_traffic_read, "incoming traffic", labels = ["agent"])
declareCounter(libp2p_peers_traffic_write, "outgoing traffic", labels = ["agent"]) declareCounter(libp2p_peers_traffic_write, "outgoing traffic", labels = ["agent"])
declareCounter(libp2p_network_bytes, "total traffic", labels = ["direction"])
func shortLog*(conn: ChronosStream): auto = func shortLog*(conn: ChronosStream): auto =
try: try:
if conn.isNil: "ChronosStream(nil)" if conn.isNil: "ChronosStream(nil)"
@ -105,6 +107,7 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.
withExceptions: withExceptions:
result = await s.client.readOnce(pbytes, nbytes) result = await s.client.readOnce(pbytes, nbytes)
s.activity = true # reset activity flag s.activity = true # reset activity flag
libp2p_network_bytes.inc(nbytes.int64, labelValues = ["in"])
when defined(libp2p_agents_metrics): when defined(libp2p_agents_metrics):
s.trackPeerIdentity() s.trackPeerIdentity()
if s.tracked: if s.tracked:
@ -127,6 +130,7 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing") raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing")
s.activity = true # reset activity flag s.activity = true # reset activity flag
libp2p_network_bytes.inc(msg.len.int64, labelValues = ["out"])
when defined(libp2p_agents_metrics): when defined(libp2p_agents_metrics):
s.trackPeerIdentity() s.trackPeerIdentity()
if s.tracked: if s.tracked:

View File

@ -36,6 +36,7 @@ type
peerInfo*: PeerInfo peerInfo*: PeerInfo
observedAddr*: Multiaddress observedAddr*: Multiaddress
upgraded*: Future[void] upgraded*: Future[void]
tag*: string # debug tag for metrics (generally ms protocol)
transportDir*: Direction # The bottom level transport (generally the socket) direction transportDir*: Direction # The bottom level transport (generally the socket) direction
proc timeoutMonitor(s: Connection) {.async, gcsafe.} proc timeoutMonitor(s: Connection) {.async, gcsafe.}