From eaa73ae6e82f9b1c1f720f47617424469b6db2a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C8=98tefan=20Talpalaru?= Date: Tue, 14 Apr 2020 15:27:07 +0200 Subject: [PATCH] add stream metrics (#136) * add stream metrics - just BufferStream and Connection are tracked, for now - flag checking is enforced more strictly in close(), since it became clear that instances are closed multiple times * add "metrics" dependency and sort the list --- libp2p.nimble | 7 ++++--- libp2p/connection.nim | 21 +++++++++++++-------- libp2p/protocols/secure/secure.nim | 3 +-- libp2p/stream/bufferstream.nim | 25 ++++++++++++++++--------- 4 files changed, 34 insertions(+), 22 deletions(-) diff --git a/libp2p.nimble b/libp2p.nimble index 8841e8af1..a6c5ffd07 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -8,11 +8,12 @@ license = "MIT" skipDirs = @["tests", "examples", "Nim"] requires "nim > 0.19.4", - "secp256k1", - "nimcrypto >= 0.4.1", - "chronos >= 2.3.8", "bearssl >= 0.1.4", "chronicles >= 0.7.1", + "chronos >= 2.3.8", + "metrics", + "nimcrypto >= 0.4.1", + "secp256k1", "stew" proc runTest(filename: string) = diff --git a/libp2p/connection.nim b/libp2p/connection.nim index 6fc31821a..46fc5cc52 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -7,7 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import chronos, chronicles, oids +import oids +import chronos, chronicles, metrics import peerinfo, multiaddress, stream/lpstream, @@ -29,6 +30,8 @@ type InvalidVarintException = object of LPStreamError InvalidVarintSizeException = object of LPStreamError +declareGauge libp2p_open_connection, "open Connection instances" + proc newInvalidVarintException*(): ref InvalidVarintException = newException(InvalidVarintException, "Unable to parse varint") @@ -57,6 +60,7 @@ proc init*[T: Connection](self: var T, stream: LPStream): T = when chronicles.enabledLogLevel == LogLevel.TRACE: self.oid = genOid() asyncCheck self.bindStreamClose() + libp2p_open_connection.inc() return self @@ -117,11 +121,11 @@ method closed*(s: Connection): bool = result = s.stream.closed method close*(s: Connection) {.async, gcsafe.} = - trace "about to close connection", closed = s.closed, - peer = if not isNil(s.peerInfo): - s.peerInfo.id else: "" - if not s.closed: + trace "about to close connection", closed = s.closed, + peer = if not isNil(s.peerInfo): + s.peerInfo.id else: "" + if not isNil(s.stream) and not s.stream.closed: trace "closing child stream", closed = s.closed, peer = if not isNil(s.peerInfo): @@ -131,9 +135,10 @@ method close*(s: Connection) {.async, gcsafe.} = s.closeEvent.fire() s.isClosed = true - trace "connection closed", closed = s.closed, - peer = if not isNil(s.peerInfo): - s.peerInfo.id else: "" + trace "connection closed", closed = s.closed, + peer = if not isNil(s.peerInfo): + s.peerInfo.id else: "" + libp2p_open_connection.dec() proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} = ## read lenght prefixed msg diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 7860b39d3..00105d165 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -8,8 +8,7 @@ ## those terms. import options -import chronos -import chronicles +import chronos, chronicles import ../protocol, ../../stream/bufferstream, ../../crypto/crypto, diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index 2fc088a1e..f2c168abe 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -31,7 +31,7 @@ ## buffer goes below ``maxSize`` or more data becomes available. import deques, math, oids -import chronos, chronicles +import chronos, chronicles, metrics import ../stream/lpstream const DefaultBufferSize* = 1024 @@ -52,6 +52,8 @@ type AlreadyPipedError* = object of CatchableError NotWritableError* = object of CatchableError +declareGauge libp2p_open_bufferstream, "open BufferStream instances" + proc newAlreadyPipedError*(): ref Exception {.inline.} = result = newException(AlreadyPipedError, "stream already piped") @@ -77,6 +79,8 @@ proc initBufferStream*(s: BufferStream, s.closeEvent = newAsyncEvent() when chronicles.enabledLogLevel == LogLevel.TRACE: s.oid = genOid() + s.isClosed = false + libp2p_open_bufferstream.inc() proc newBufferStream*(handler: WriteHandler = nil, size: int = DefaultBufferSize): BufferStream = @@ -386,11 +390,14 @@ proc `|`*(s: BufferStream, target: BufferStream): BufferStream = method close*(s: BufferStream) {.async.} = ## close the stream and clear the buffer - trace "closing bufferstream" - for r in s.readReqs: - if not(isNil(r)) and not(r.finished()): - r.fail(newLPStreamEOFError()) - s.dataReadEvent.fire() - s.readBuf.clear() - s.closeEvent.fire() - s.isClosed = true + if not s.isClosed: + trace "closing bufferstream" + for r in s.readReqs: + if not(isNil(r)) and not(r.finished()): + r.fail(newLPStreamEOFError()) + s.dataReadEvent.fire() + s.readBuf.clear() + s.closeEvent.fire() + s.isClosed = true + libp2p_open_bufferstream.dec() +