diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 52df181..0d64f05 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -35,6 +35,8 @@ const MaxWrites = 1024 ##\ ## Maximum number of in-flight writes - after this, we disconnect the peer + LPChannelTrackerName* = "LPChannel" + type LPChannel* = ref object of BufferStream id*: uint64 # channel id diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 0d83849..61ef8c9 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -20,6 +20,9 @@ export protocol logScope: topics = "secure" +const + SecureConnTrackerName* = "SecureConn" + type Secure* = ref object of LPProtocol # base type for secure managers diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index eee95d6..e2d6859 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -22,36 +22,7 @@ logScope: topics = "bufferstream" const - BufferStreamTrackerName* = "libp2p.bufferstream" - -type - BufferStreamTracker* = ref object of TrackerBase - opened*: uint64 - closed*: uint64 - -proc setupBufferStreamTracker(): BufferStreamTracker {.gcsafe.} - -proc getBufferStreamTracker(): BufferStreamTracker {.gcsafe.} = - result = cast[BufferStreamTracker](getTracker(BufferStreamTrackerName)) - if isNil(result): - result = setupBufferStreamTracker() - -proc dumpTracking(): string {.gcsafe.} = - var tracker = getBufferStreamTracker() - result = "Opened buffers: " & $tracker.opened & "\n" & - "Closed buffers: " & $tracker.closed - -proc leakTransport(): bool {.gcsafe.} = - var tracker = getBufferStreamTracker() - result = (tracker.opened != tracker.closed) - -proc setupBufferStreamTracker(): BufferStreamTracker = - result = new BufferStreamTracker - result.opened = 0 - result.closed = 0 - result.dump = dumpTracking - result.isLeaked = leakTransport - addTracker(BufferStreamTrackerName, result) + BufferStreamTrackerName* = "BufferStream" type BufferStream* = ref object of Connection @@ -79,7 +50,6 @@ method initStream*(s: BufferStream) = s.readQueue = newAsyncQueue[seq[byte]](1) trace "BufferStream created", s - inc getBufferStreamTracker().opened proc newBufferStream*(timeout: Duration = DefaultConnectionTimeout): BufferStream = new result @@ -169,7 +139,6 @@ method closeImpl*(s: BufferStream): Future[void] = if not s.pushedEof: # Potentially wake up reader asyncSpawn s.pushEof() - inc getBufferStreamTracker().closed trace "Closed BufferStream", s procCall Connection(s).closeImpl() # noraises, nocancels diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index 5f8f321..259de3b 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -16,6 +16,7 @@ logScope: const DefaultChronosStreamTimeout = 10.minutes + ChronosStreamTrackerName* = "ChronosStream" type ChronosStream* = ref object of Connection diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index 504b1cc..b1966b9 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -19,7 +19,7 @@ logScope: topics = "connection" const - ConnectionTrackerName* = "libp2p.connection" + ConnectionTrackerName* = "Connection" DefaultConnectionTimeout* = 5.minutes type @@ -33,35 +33,8 @@ type peerInfo*: PeerInfo observedAddr*: Multiaddress - ConnectionTracker* = ref object of TrackerBase - opened*: uint64 - closed*: uint64 - -proc setupConnectionTracker(): ConnectionTracker {.gcsafe.} proc timeoutMonitor(s: Connection) {.async, gcsafe.} -proc getConnectionTracker*(): ConnectionTracker {.gcsafe.} = - result = cast[ConnectionTracker](getTracker(ConnectionTrackerName)) - if isNil(result): - result = setupConnectionTracker() - -proc dumpTracking(): string {.gcsafe.} = - var tracker = getConnectionTracker() - result = "Opened conns: " & $tracker.opened & "\n" & - "Closed conns: " & $tracker.closed - -proc leakTransport(): bool {.gcsafe.} = - var tracker = getConnectionTracker() - result = (tracker.opened != tracker.closed) - -proc setupConnectionTracker(): ConnectionTracker = - result = new ConnectionTracker - result.opened = 0 - result.closed = 0 - result.dump = dumpTracking - result.isLeaked = leakTransport - addTracker(ConnectionTrackerName, result) - func shortLog*(conn: Connection): string = if conn.isNil: "Connection(nil)" elif conn.peerInfo.isNil: $conn.oid @@ -85,15 +58,12 @@ method initStream*(s: Connection) = trace "Idle timeout expired, closing connection", s s.close() - inc getConnectionTracker().opened - method closeImpl*(s: Connection): Future[void] = # Cleanup timeout timer trace "Closing connection", s if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished: s.timerTaskFut.cancel() - inc getConnectionTracker().closed trace "Closed connection", s procCall LPStream(s).closeImpl() diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index fc7d33f..655a891 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -23,6 +23,9 @@ export oids logScope: topics = "lpstream" +const + LPStreamTrackerName* = "LPStream" + type Direction* {.pure.} = enum In, Out @@ -49,6 +52,34 @@ type InvalidVarintError* = object of LPStreamError MaxSizeError* = object of LPStreamError + StreamTracker* = ref object of TrackerBase + opened*: uint64 + closed*: uint64 + +proc setupStreamTracker(name: string): StreamTracker = + let tracker = new StreamTracker + + proc dumpTracking(): string {.gcsafe.} = + return "Opened " & tracker.id & " :" & $tracker.opened & "\n" & + "Closed " & tracker.id & " :" & $tracker.closed + + proc leakTransport(): bool {.gcsafe.} = + return (tracker.opened != tracker.closed) + + tracker.id = name + tracker.opened = 0 + tracker.closed = 0 + tracker.dump = dumpTracking + tracker.isLeaked = leakTransport + addTracker(name, tracker) + + return tracker + +proc getStreamTracker(name: string): StreamTracker {.gcsafe.} = + result = cast[StreamTracker](getTracker(name)) + if isNil(result): + result = setupStreamTracker(name) + proc newLPStreamReadError*(p: ref CatchableError): ref CatchableError = var w = newException(LPStreamReadError, "Read stream failed") w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg @@ -92,6 +123,7 @@ method initStream*(s: LPStream) {.base.} = s.oid = genOid() libp2p_open_streams.inc(labelValues = [s.objName, $s.dir]) + inc getStreamTracker(s.objName).opened trace "Stream created", s, objName = s.objName, dir = $s.dir proc join*(s: LPStream): Future[void] = @@ -220,6 +252,7 @@ method closeImpl*(s: LPStream): Future[void] {.async, base.} = trace "Closing stream", s, objName = s.objName, dir = $s.dir s.closeEvent.fire() libp2p_open_streams.dec(labelValues = [s.objName, $s.dir]) + inc getStreamTracker(s.objName).closed trace "Closed stream", s, objName = s.objName, dir = $s.dir method close*(s: LPStream): Future[void] {.base, async.} = # {.raises [Defect].} diff --git a/tests/helpers.nim b/tests/helpers.nim index cb8d6a7..561a943 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -6,13 +6,18 @@ import ../libp2p/transports/tcptransport import ../libp2p/stream/bufferstream import ../libp2p/crypto/crypto import ../libp2p/stream/lpstream +import ../libp2p/muxers/mplex/lpchannel +import ../libp2p/protocols/secure/secure const StreamTransportTrackerName = "stream.transport" StreamServerTrackerName = "stream.server" trackerNames = [ + LPStreamTrackerName, ConnectionTrackerName, + LPChannelTrackerName, + SecureConnTrackerName, BufferStreamTrackerName, TcpTransportTrackerName, StreamTransportTrackerName, diff --git a/tests/testbufferstream.nim b/tests/testbufferstream.nim index 9eb9a60..6af89ad 100644 --- a/tests/testbufferstream.nim +++ b/tests/testbufferstream.nim @@ -7,8 +7,8 @@ import ../libp2p/stream/bufferstream, suite "BufferStream": teardown: - # echo getTracker("libp2p.bufferstream").dump() - check getTracker("libp2p.bufferstream").isLeaked() == false + # echo getTracker(BufferStreamTrackerName).dump() + check getTracker(BufferStreamTrackerName).isLeaked() == false test "push data to buffer": proc testpushData(): Future[bool] {.async.} = diff --git a/tests/testswitch.nim b/tests/testswitch.nim index a201565..4d2a50c 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -14,7 +14,9 @@ import ../libp2p/[errors, peerinfo, crypto/crypto, protocols/protocol, + protocols/secure/secure, muxers/muxer, + muxers/mplex/lpchannel, stream/lpstream] import ./helpers @@ -246,11 +248,12 @@ suite "Switch": check not switch1.isConnected(switch2.peerInfo) check not switch2.isConnected(switch1.peerInfo) - var bufferTracker = getTracker(BufferStreamTrackerName) - # echo bufferTracker.dump() - check bufferTracker.isLeaked() == false + var channelTracker = getTracker(LPChannelTrackerName) + # echo channelTracker.dump() + check channelTracker.isLeaked() == false - var connTracker = getTracker(ConnectionTrackerName) + var connTracker = getTracker(SecureConnTrackerName) + doAssert(not isNil(connTracker)) # echo connTracker.dump() check connTracker.isLeaked() == false @@ -305,11 +308,11 @@ suite "Switch": check not switch1.isConnected(switch2.peerInfo) check not switch2.isConnected(switch1.peerInfo) - var bufferTracker = getTracker(BufferStreamTrackerName) + var bufferTracker = getTracker(LPChannelTrackerName) # echo bufferTracker.dump() check bufferTracker.isLeaked() == false - var connTracker = getTracker(ConnectionTrackerName) + var connTracker = getTracker(SecureConnTrackerName) # echo connTracker.dump() check connTracker.isLeaked() == false @@ -370,11 +373,11 @@ suite "Switch": check not switch1.isConnected(switch2.peerInfo) check not switch2.isConnected(switch1.peerInfo) - var bufferTracker = getTracker(BufferStreamTrackerName) + var bufferTracker = getTracker(LPChannelTrackerName) # echo bufferTracker.dump() check bufferTracker.isLeaked() == false - var connTracker = getTracker(ConnectionTrackerName) + var connTracker = getTracker(SecureConnTrackerName) # echo connTracker.dump() check connTracker.isLeaked() == false @@ -434,11 +437,11 @@ suite "Switch": check not switch1.isConnected(switch2.peerInfo) check not switch2.isConnected(switch1.peerInfo) - var bufferTracker = getTracker(BufferStreamTrackerName) + var bufferTracker = getTracker(LPChannelTrackerName) # echo bufferTracker.dump() check bufferTracker.isLeaked() == false - var connTracker = getTracker(ConnectionTrackerName) + var connTracker = getTracker(SecureConnTrackerName) # echo connTracker.dump() check connTracker.isLeaked() == false @@ -498,11 +501,11 @@ suite "Switch": check not switch1.isConnected(switch2.peerInfo) check not switch2.isConnected(switch1.peerInfo) - var bufferTracker = getTracker(BufferStreamTrackerName) + var bufferTracker = getTracker(LPChannelTrackerName) # echo bufferTracker.dump() check bufferTracker.isLeaked() == false - var connTracker = getTracker(ConnectionTrackerName) + var connTracker = getTracker(SecureConnTrackerName) # echo connTracker.dump() check connTracker.isLeaked() == false @@ -577,11 +580,11 @@ suite "Switch": check not switch2.isConnected(switch1.peerInfo) check not switch3.isConnected(switch1.peerInfo) - var bufferTracker = getTracker(BufferStreamTrackerName) + var bufferTracker = getTracker(LPChannelTrackerName) # echo bufferTracker.dump() check bufferTracker.isLeaked() == false - var connTracker = getTracker(ConnectionTrackerName) + var connTracker = getTracker(SecureConnTrackerName) # echo connTracker.dump() check connTracker.isLeaked() == false