add stream metrics (#136)

* add stream metrics

- just BufferStream and Connection are tracked, for now
- flag checking is enforced more strictly in close(), since it became
  clear that instances are closed multiple times

* add "metrics" dependency

and sort the list
This commit is contained in:
Ștefan Talpalaru 2020-04-14 15:27:07 +02:00 committed by GitHub
parent 7723403b1f
commit eaa73ae6e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 34 additions and 22 deletions

View File

@ -8,11 +8,12 @@ license = "MIT"
skipDirs = @["tests", "examples", "Nim"] skipDirs = @["tests", "examples", "Nim"]
requires "nim > 0.19.4", requires "nim > 0.19.4",
"secp256k1",
"nimcrypto >= 0.4.1",
"chronos >= 2.3.8",
"bearssl >= 0.1.4", "bearssl >= 0.1.4",
"chronicles >= 0.7.1", "chronicles >= 0.7.1",
"chronos >= 2.3.8",
"metrics",
"nimcrypto >= 0.4.1",
"secp256k1",
"stew" "stew"
proc runTest(filename: string) = proc runTest(filename: string) =

View File

@ -7,7 +7,8 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import chronos, chronicles, oids import oids
import chronos, chronicles, metrics
import peerinfo, import peerinfo,
multiaddress, multiaddress,
stream/lpstream, stream/lpstream,
@ -29,6 +30,8 @@ type
InvalidVarintException = object of LPStreamError InvalidVarintException = object of LPStreamError
InvalidVarintSizeException = object of LPStreamError InvalidVarintSizeException = object of LPStreamError
declareGauge libp2p_open_connection, "open Connection instances"
proc newInvalidVarintException*(): ref InvalidVarintException = proc newInvalidVarintException*(): ref InvalidVarintException =
newException(InvalidVarintException, "Unable to parse varint") newException(InvalidVarintException, "Unable to parse varint")
@ -57,6 +60,7 @@ proc init*[T: Connection](self: var T, stream: LPStream): T =
when chronicles.enabledLogLevel == LogLevel.TRACE: when chronicles.enabledLogLevel == LogLevel.TRACE:
self.oid = genOid() self.oid = genOid()
asyncCheck self.bindStreamClose() asyncCheck self.bindStreamClose()
libp2p_open_connection.inc()
return self return self
@ -117,11 +121,11 @@ method closed*(s: Connection): bool =
result = s.stream.closed result = s.stream.closed
method close*(s: Connection) {.async, gcsafe.} = method close*(s: Connection) {.async, gcsafe.} =
if not s.closed:
trace "about to close connection", closed = s.closed, trace "about to close connection", closed = s.closed,
peer = if not isNil(s.peerInfo): peer = if not isNil(s.peerInfo):
s.peerInfo.id else: "" s.peerInfo.id else: ""
if not s.closed:
if not isNil(s.stream) and not s.stream.closed: if not isNil(s.stream) and not s.stream.closed:
trace "closing child stream", closed = s.closed, trace "closing child stream", closed = s.closed,
peer = if not isNil(s.peerInfo): peer = if not isNil(s.peerInfo):
@ -134,6 +138,7 @@ method close*(s: Connection) {.async, gcsafe.} =
trace "connection closed", closed = s.closed, trace "connection closed", closed = s.closed,
peer = if not isNil(s.peerInfo): peer = if not isNil(s.peerInfo):
s.peerInfo.id else: "" s.peerInfo.id else: ""
libp2p_open_connection.dec()
proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} = proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} =
## read lenght prefixed msg ## read lenght prefixed msg

View File

@ -8,8 +8,7 @@
## those terms. ## those terms.
import options import options
import chronos import chronos, chronicles
import chronicles
import ../protocol, import ../protocol,
../../stream/bufferstream, ../../stream/bufferstream,
../../crypto/crypto, ../../crypto/crypto,

View File

@ -31,7 +31,7 @@
## buffer goes below ``maxSize`` or more data becomes available. ## buffer goes below ``maxSize`` or more data becomes available.
import deques, math, oids import deques, math, oids
import chronos, chronicles import chronos, chronicles, metrics
import ../stream/lpstream import ../stream/lpstream
const DefaultBufferSize* = 1024 const DefaultBufferSize* = 1024
@ -52,6 +52,8 @@ type
AlreadyPipedError* = object of CatchableError AlreadyPipedError* = object of CatchableError
NotWritableError* = object of CatchableError NotWritableError* = object of CatchableError
declareGauge libp2p_open_bufferstream, "open BufferStream instances"
proc newAlreadyPipedError*(): ref Exception {.inline.} = proc newAlreadyPipedError*(): ref Exception {.inline.} =
result = newException(AlreadyPipedError, "stream already piped") result = newException(AlreadyPipedError, "stream already piped")
@ -77,6 +79,8 @@ proc initBufferStream*(s: BufferStream,
s.closeEvent = newAsyncEvent() s.closeEvent = newAsyncEvent()
when chronicles.enabledLogLevel == LogLevel.TRACE: when chronicles.enabledLogLevel == LogLevel.TRACE:
s.oid = genOid() s.oid = genOid()
s.isClosed = false
libp2p_open_bufferstream.inc()
proc newBufferStream*(handler: WriteHandler = nil, proc newBufferStream*(handler: WriteHandler = nil,
size: int = DefaultBufferSize): BufferStream = size: int = DefaultBufferSize): BufferStream =
@ -386,6 +390,7 @@ proc `|`*(s: BufferStream, target: BufferStream): BufferStream =
method close*(s: BufferStream) {.async.} = method close*(s: BufferStream) {.async.} =
## close the stream and clear the buffer ## close the stream and clear the buffer
if not s.isClosed:
trace "closing bufferstream" trace "closing bufferstream"
for r in s.readReqs: for r in s.readReqs:
if not(isNil(r)) and not(r.finished()): if not(isNil(r)) and not(r.finished()):
@ -394,3 +399,5 @@ method close*(s: BufferStream) {.async.} =
s.readBuf.clear() s.readBuf.clear()
s.closeEvent.fire() s.closeEvent.fire()
s.isClosed = true s.isClosed = true
libp2p_open_bufferstream.dec()