From 1de1d49223505a44535fa8835d1c0c65fc4c00fb Mon Sep 17 00:00:00 2001
From: Dmitriy Ryajov
Date: Tue, 27 Oct 2020 11:21:03 -0600
Subject: [PATCH] Channel leaks (#413)

* break stream tracking by type

* use closeWithEOF to await wrapped stream

* fix cancellation leaks

* fix channel leaks

* logging

* use close monitor and always call closeUnderlying

* don't use closeWithEOF

* removing close monitor

* logging
---
 libp2p/muxers/mplex/lpchannel.nim  | 54 ++++++++++++----------
 libp2p/muxers/mplex/mplex.nim      | 12 ++---
 libp2p/protocols/pubsub/pubsub.nim | 13 +++---
 libp2p/protocols/secure/secure.nim | 18 ++++----
 libp2p/stream/bufferstream.nim     | 23 ----------
 libp2p/stream/chronosstream.nim    |  8 ++--
 libp2p/stream/connection.nim       | 15 +++----
 libp2p/stream/lpstream.nim         | 16 ++++---
 libp2p/switch.nim                  |  4 +-
 libp2p/transports/tcptransport.nim | 72 ++++++++++++++++++++++--------
 10 files changed, 131 insertions(+), 104 deletions(-)

diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim
index 52df181c3..aea771806 100644
--- a/libp2p/muxers/mplex/lpchannel.nim
+++ b/libp2p/muxers/mplex/lpchannel.nim
@@ -66,12 +66,6 @@ proc open*(s: LPChannel) {.async, gcsafe.} =
 method closed*(s: LPChannel): bool =
   s.closedLocal

-proc closeUnderlying(s: LPChannel): Future[void] {.async.} =
-  ## Channels may be closed for reading and writing in any order - we'll close
-  ## the underlying bufferstream when both directions are closed
-  if s.closedLocal and s.isEof:
-    await procCall BufferStream(s).close()
-
 proc reset*(s: LPChannel) {.async, gcsafe.} =
   if s.isClosed:
     trace "Already closed", s
@@ -115,30 +109,44 @@ proc reset*(s: LPChannel) {.async, gcsafe.} =

   trace "Channel reset", s

+proc closeUnderlying*(s: LPChannel): Future[void] {.async.} =
+  ## Channels may be closed for reading and writing in any order - we'll close
+  ## the underlying bufferstream when both directions are closed
+  if s.closedLocal and s.isEof:
+    trace "Closing underlying", s, conn = s.conn
+    await procCall BufferStream(s).close()
+
 method close*(s: LPChannel) {.async, gcsafe.} =
   ## Close channel for writing - a message will be sent to the other peer
   ## informing them that the channel is closed and that we're waiting for
   ## their acknowledgement.
-  if s.closedLocal:
-    trace "Already closed", s
-    return
-  s.closedLocal = true
+  ##

-  trace "Closing channel", s, conn = s.conn, len = s.len
+  try:
+    if s.closedLocal:
+      trace "Already closed", s
+      return
+    s.closedLocal = true

-  if s.isOpen:
-    try:
-      await s.conn.writeMsg(s.id, s.closeCode) # write close
-    except CancelledError as exc:
-      raise exc
-    except CatchableError as exc:
-      # It's harmless that close message cannot be sent - the connection is
-      # likely down already
-      trace "Cannot send close message", s, id = s.id
+    trace "Closing channel", s, conn = s.conn, len = s.len

-  await s.closeUnderlying() # maybe already eofed
-
-  trace "Closed channel", s, len = s.len
+    if s.isOpen:
+      trace "Sending close msg", s, conn = s.conn
+      await s.conn.writeMsg(s.id, s.closeCode).wait(10.seconds) # write close
+    trace "Closed channel", s, len = s.len
+  except CancelledError as exc:
+    trace "Cancelling close", s, conn = s.conn
+    # need to reset here otherwise the close sequence doesn't complete and
+    # the channel leaks since none of its `onClose` events trigger
+    await s.reset()
+    raise exc
+  except CatchableError as exc:
+    # need to reset here otherwise the close sequence doesn't complete and
+    # the channel leaks since none of its `onClose` events trigger
+    trace "Cannot send close message", exc = exc.msg, s, conn = s.conn
+    await s.reset()
+  finally:
+    await s.closeUnderlying() # maybe already eofed

 method initStream*(s: LPChannel) =
   if s.objName.len == 0:
diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim
index a85642067..db37d73ae 100644
--- a/libp2p/muxers/mplex/mplex.nim
+++ b/libp2p/muxers/mplex/mplex.nim
@@ -60,7 +60,7 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
   try:
     await chann.join()
     m.channels[chann.initiator].del(chann.id)
-    trace "cleaned up channel", m, chann
+    trace "cleaned up channel", m, s = chann

     when defined(libp2p_expensive_metrics):
       libp2p_mplex_channels.set(
@@ -68,7 +68,7 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
         labelValues = [$chann.initiator, $m.connection.peerInfo.peerId])
   except CatchableError as exc:
     # This is top-level procedure which will work as separate task, so it
-    # do not need to propogate CancelledError, and no other exceptions should
+    # does not need to propagate CancelledError, and no other exceptions should
     # happen here
     warn "Error cleaning up mplex channel", m, chann, msg = exc.msg

@@ -118,7 +118,7 @@ proc handleStream(m: Mplex, chann: LPChannel) {.async.} =
     doAssert(chann.closed, "connection not closed by handler!")
   except CatchableError as exc:
     # This is top-level procedure which will work as separate task, so it
-    # do not need to propogate CancelledError.
+    # does not need to propagate CancelledError.
     trace "Exception in mplex stream handler", m, chann, msg = exc.msg
     await chann.reset()

@@ -184,11 +184,13 @@ method handle*(m: Mplex) {.async, gcsafe.} =
   except CancelledError:
     # This procedure is spawned as task and it is not part of public API, so
     # there no way for this procedure to be cancelled implicitly.
-    debug "Unexpected cancellation in mplex handler", m
+    debug "Cancellation in mplex handler", m
+  except LPStreamClosedError as exc:
+    trace "Stream Closed", m, msg = exc.msg
   except LPStreamEOFError as exc:
     trace "Stream EOF", m, msg = exc.msg
   except CatchableError as exc:
-    warn "Unexpected exception in mplex read loop", m, msg = exc.msg
+    warn "Exception in mplex read loop", m, msg = exc.msg
   finally:
     await m.close()
     trace "Stopped mplex handler", m
diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim
index 75d432741..228bd59a6 100644
--- a/libp2p/protocols/pubsub/pubsub.nim
+++ b/libp2p/protocols/pubsub/pubsub.nim
@@ -73,12 +73,12 @@ type
     msgSeqno*: uint64
     anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send

-method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} =
+method unsubscribePeer*(p: PubSub, peer: PeerID) {.base.} =
   ## handle peer disconnects
   ##

-  trace "unsubscribing pubsub peer", peerId
-  p.peers.del(peerId)
+  trace "Unsubscribing pubsub peer", peer
+  p.peers.del(peer)

   libp2p_pubsub_peers.set(p.peers.len.int64)

@@ -217,6 +217,7 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
   ## messages
   ##

+  trace "Subscribing peer", peer
   let peer = p.getOrCreatePeer(peer, p.codecs)
   peer.outbound = true # flag as outbound

@@ -334,9 +335,9 @@ method validate*(p: PubSub, message: Message): Future[ValidationResult] {.async,
       if res != ValidationResult.Accept:
         result = res
         break
-  
+
   case result
-  of ValidationResult.Accept: 
+  of ValidationResult.Accept:
     libp2p_pubsub_validation_success.inc()
   of ValidationResult.Reject:
     libp2p_pubsub_validation_failure.inc()
@@ -385,7 +386,7 @@ proc init*[PubParams: object | bool](
       switch.addPeerEventHandler(peerEventHandler, PeerEvent.Left)

   pubsub.initPubSub()
-  
+
   return pubsub

diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim
index 53ab13600..810290648 100644
--- a/libp2p/protocols/secure/secure.nim
+++ b/libp2p/protocols/secure/secure.nim
@@ -42,7 +42,8 @@ proc init*(T: type SecureConn,
                peerInfo: peerInfo,
                observedAddr: observedAddr,
                closeEvent: conn.closeEvent,
-               timeout: timeout)
+               timeout: timeout,
+               dir: conn.dir)
   result.initStream()

 method initStream*(s: SecureConn) =
@@ -52,6 +53,7 @@ method initStream*(s: SecureConn) =
   procCall Connection(s).initStream()

 method close*(s: SecureConn) {.async.} =
+  trace "closing secure conn", s, dir = s.dir
   if not(isNil(s.stream)):
     await s.stream.close()

@@ -74,10 +76,6 @@ proc handleConn*(s: Secure,
     try:
       await conn.join()
       await sconn.close()
-    except CancelledError:
-      # This is top-level procedure which will work as separate task, so it
-      # do not need to propogate CancelledError.
-      discard
     except CatchableError as exc:
       trace "error cleaning up secure connection", err = exc.msg, sconn

@@ -127,18 +125,18 @@ method readOnce*(s: SecureConn,

   if not isNil(err):
     if not (err of LPStreamEOFError):
-      warn "error while reading message from secure connection, closing.", 
-        error=err.name, 
-        message=err.msg, 
+      warn "error while reading message from secure connection, closing.",
+        error=err.name,
+        message=err.msg,
         connection=s
     await s.close()
     raise err
-  
+
   s.activity = true

   if buf.len == 0:
     raise newLPStreamIncompleteError()
-  
+
   s.buf.add(buf)

   var p = cast[ptr UncheckedArray[byte]](pbytes)
diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim
index 311ef81e6..eee95d6ec 100644
--- a/libp2p/stream/bufferstream.nim
+++ b/libp2p/stream/bufferstream.nim
@@ -7,29 +7,6 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.

-## This module implements an asynchronous buffer stream
-## which emulates physical async IO.
-##
-## The stream is based on the standard library's `Deque`,
-## which is itself based on a ring buffer.
-##
-## It works by exposing a regular LPStream interface and
-## a method ``pushTo`` to push data to the internal read
-## buffer; as well as a handler that can be registered
-## that gets triggered on every write to the stream. This
-## allows using the buffered stream as a sort of proxy,
-## which can be consumed as a regular LPStream but allows
-## injecting data for reads and intercepting writes.
-##
-## Another notable feature is that the stream is fully
-## ordered and asynchronous. Reads are queued up in order
-## and are suspended when not enough data available. This
-## allows preserving backpressure while maintaining full
-## asynchrony. Both writing to the internal buffer with
-## ``pushTo`` as well as reading with ``read*` methods,
-## will suspend until either the amount of elements in the
-## buffer goes below ``maxSize`` or more data becomes available.
-
 import std/strformat
 import stew/byteutils
 import chronos, chronicles, metrics
diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim
index b8f442fa1..c6817896f 100644
--- a/libp2p/stream/chronosstream.nim
+++ b/libp2p/stream/chronosstream.nim
@@ -32,23 +32,23 @@ method initStream*(s: ChronosStream) =
     s.objName = "ChronosStream"

   s.timeoutHandler = proc() {.async, gcsafe.} =
-    trace "idle timeout expired, closing ChronosStream"
+    trace "Idle timeout expired, closing ChronosStream", s
     await s.close()

   procCall Connection(s).initStream()

 proc init*(C: type ChronosStream,
            client: StreamTransport,
+           dir: Direction,
            timeout = DefaultChronosStreamTimeout): ChronosStream =
   result = C(client: client,
-             timeout: timeout)
+             timeout: timeout,
+             dir: dir)
   result.initStream()

 template withExceptions(body: untyped) =
   try:
     body
-  except CancelledError as exc:
-    raise exc
   except TransportIncompleteError:
     # for all intents and purposes this is an EOF
     raise newLPStreamIncompleteError()
diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim
index f55e07e60..2ec306242 100644
--- a/libp2p/stream/connection.nim
+++ b/libp2p/stream/connection.nim
@@ -7,7 +7,7 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.

-import std/[hashes, oids, strformat]
+import std/[hashes, oids, strformat, sugar]
 import chronicles, chronos, metrics
 import lpstream,
        ../multiaddress,
@@ -25,9 +25,6 @@ const
 type
   TimeoutHandler* = proc(): Future[void] {.gcsafe.}

-  Direction* {.pure.} = enum
-    None, In, Out
-
   Connection* = ref object of LPStream
     activity*: bool # reset every time data is sent or received
     timeout*: Duration # channel timeout if no activity
@@ -35,7 +32,6 @@
     timeoutHandler*: TimeoutHandler # timeout handler
     peerInfo*: PeerInfo
     observedAddr*: Multiaddress
-    dir*: Direction

   ConnectionTracker* = ref object of TrackerBase
     opened*: uint64
@@ -85,7 +81,9 @@ method initStream*(s: Connection) =
     s.timerTaskFut = s.timeoutMonitor()

   if isNil(s.timeoutHandler):
-    s.timeoutHandler = proc(): Future[void] = s.close()
+    s.timeoutHandler = proc(): Future[void] =
+      trace "Idle timeout expired, closing connection", s
+      s.close()

   inc getConnectionTracker().opened

@@ -104,7 +102,7 @@ func hash*(p: Connection): Hash =
   cast[pointer](p).hash

 proc timeoutMonitor(s: Connection) {.async, gcsafe.} =
-  ## monitor the channel for innactivity
+  ## monitor the channel for inactivity
   ##
   ## if the timeout was hit, it means that
   ## neither incoming nor outgoing activity
@@ -125,9 +123,10 @@ proc timeoutMonitor(s: Connection) {.async, gcsafe.} =

       break

-    # reset channel on innactivity timeout
+    # reset channel on inactivity timeout
     trace "Connection timed out", s
     if not(isNil(s.timeoutHandler)):
+      trace "Calling timeout handler", s
       await s.timeoutHandler()

   except CancelledError as exc:
diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim
index c19760c4a..b53b67d65 100644
--- a/libp2p/stream/lpstream.nim
+++ b/libp2p/stream/lpstream.nim
@@ -15,7 +15,8 @@ import
   ../varint,
   ../peerinfo,
   ../multiaddress

-declareGauge(libp2p_open_streams, "open stream instances", labels = ["type"])
+declareGauge(libp2p_open_streams,
+  "open stream instances", labels = ["type", "dir"])

 export oids

@@ -23,12 +24,16 @@ logScope:
   topics = "lpstream"

 type
+  Direction* {.pure.} = enum
+    In, Out
+
   LPStream* = ref object of RootObj
     closeEvent*: AsyncEvent
     isClosed*: bool
     isEof*: bool
     objName*: string
     oid*: Oid
+    dir*: Direction

   LPStreamError* = object of CatchableError
   LPStreamIncompleteError* = object of LPStreamError
@@ -86,8 +91,8 @@ method initStream*(s: LPStream) {.base.} =

   s.closeEvent = newAsyncEvent()
   s.oid = genOid()
-  libp2p_open_streams.inc(labelValues = [s.objName])
-  trace "Stream created", s, objName = s.objName
+  libp2p_open_streams.inc(labelValues = [s.objName, $s.dir])
+  trace "Stream created", s, objName = s.objName, dir = $s.dir

 proc join*(s: LPStream): Future[void] =
   s.closeEvent.wait()
@@ -214,8 +219,8 @@ method closeImpl*(s: LPStream): Future[void] {.async, base.} =
   ## Implementation of close - called only once
   trace "Closing stream", s, objName = s.objName
   s.closeEvent.fire()
-  libp2p_open_streams.dec(labelValues = [s.objName])
-  trace "Closed stream", s, objName = s.objName
+  libp2p_open_streams.dec(labelValues = [s.objName, $s.dir])
+  trace "Closed stream", s, objName = s.objName, dir = $s.dir

 method close*(s: LPStream): Future[void] {.base, async.} = # {.raises [Defect].}
   ## close the stream - this may block, but will not raise exceptions
   ##
   if s.isClosed:
     trace "Already closed", s
     return
+
   s.isClosed = true # Set flag before performing virtual close

   # An separate implementation method is used so that even when derived types
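
For review purposes, the close bookkeeping that the lpstream.nim hunks above rely on reduces to the following minimal sketch. The DemoStream type and its procs are hypothetical stand-ins, not this module's API, and the sketch assumes chronos' AsyncEvent semantics: close may be called any number of times from several tasks, closeImpl runs at most once because isClosed is set before the virtual call, and firing closeEvent wakes every join() waiter.

import chronos

type DemoStream = ref object
  isClosed: bool         # set before closeImpl so re-entrant calls bail out
  closeEvent: AsyncEvent # fired exactly once, wakes all join() waiters

method closeImpl(s: DemoStream) {.async, base.} =
  # one-shot cleanup; derived types would override this
  s.closeEvent.fire()

proc close(s: DemoStream) {.async.} =
  if s.isClosed:
    return            # second and later calls are no-ops
  s.isClosed = true   # flag first, then the virtual close
  await s.closeImpl()

proc join(s: DemoStream): Future[void] =
  s.closeEvent.wait()

when isMainModule:
  let s = DemoStream(closeEvent: newAsyncEvent())
  waitFor s.close()
  waitFor s.close() # safe: already closed, returns immediately
  waitFor s.join()  # returns at once, the event already fired
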
diff --git a/libp2p/switch.nim b/libp2p/switch.nim
index 70e998f9a..8783a8782 100644
--- a/libp2p/switch.nim
+++ b/libp2p/switch.nim
@@ -292,10 +292,10 @@ proc internalConnect(s: Switch,
       let dialed = try:
           await t.dial(a)
         except CancelledError as exc:
-          trace "Dialing canceled", msg = exc.msg, peerId
+          trace "Dialing canceled", msg = exc.msg, peerId, address = $a
           raise exc
         except CatchableError as exc:
-          trace "Dialing failed", msg = exc.msg, peerId
+          trace "Dialing failed", msg = exc.msg, peerId, address = $a
           libp2p_failed_dials.inc()
           continue # Try the next address

diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim
index 3e62e5b2a..8f989aa02 100644
--- a/libp2p/transports/tcptransport.nim
+++ b/libp2p/transports/tcptransport.nim
@@ -62,31 +62,57 @@ proc setupTcpTransportTracker(): TcpTransportTracker =

 proc connHandler*(t: TcpTransport,
                   client: StreamTransport,
                   initiator: bool): Connection =
-  trace "handling connection", address = $client.remoteAddress
-  let conn: Connection = Connection(ChronosStream.init(client))
-  conn.observedAddr = MultiAddress.init(client.remoteAddress).tryGet()
+  debug "Handling tcp connection", address = $client.remoteAddress,
+                                   initiator = initiator,
+                                   clients = t.clients.len
+
+  let conn = Connection(
+    ChronosStream.init(
+      client,
+      dir = if initiator:
+        Direction.Out
+      else:
+        Direction.In))
+
   if not initiator:
     if not isNil(t.handler):
       t.handlers &= t.handler(conn)

   proc cleanup() {.async.} =
     try:
-      await client.join()
-      trace "cleaning up client", addrs = $client.remoteAddress, connoid = $conn.oid
-      if not(isNil(conn)):
+      await client.join() or conn.join()
+      trace "Cleaning up client", addrs = $client.remoteAddress,
+                                  conn = $conn.oid
+
+      t.clients.keepItIf(it != client)
+      if not(isNil(conn)) and not conn.closed():
         await conn.close()
-      t.clients.keepItIf(it != client)
-    except CancelledError:
-      # This is top-level procedure which will work as separate task, so it
-      # do not need to propogate CancelledError.
-      trace "Unexpected cancellation in transport's cleanup"
+
+      if not(isNil(client)) and not client.closed():
+        await client.closeWait()
+
+      trace "Cleaned up client", addrs = $client.remoteAddress,
+                                 conn = $conn.oid
+
     except CatchableError as exc:
-      trace "error cleaning up client", exc = exc.msg
+      let useExc {.used.} = exc
+      debug "Error cleaning up client", errMsg = exc.msg, s = conn

   t.clients.add(client)
   # All the errors are handled inside `cleanup()` procedure.
   asyncSpawn cleanup()

-  result = conn
+
+  try:
+    conn.observedAddr = MultiAddress.init(client.remoteAddress).tryGet()
+  except CatchableError as exc:
+    trace "Unable to get remote address", exc = exc.msg
+
+    if not isNil(client):
+      client.close()
+
+    raise exc
+
+  return conn

 proc connCb(server: StreamServer,
             client: StreamTransport) {.async, gcsafe.} =
@@ -97,10 +123,12 @@ proc connCb(server: StreamServer,
     # as it's added inside connHandler
     discard t.connHandler(client, false)
   except CancelledError as exc:
+    debug "Connection setup cancelled", exc = exc.msg
+    await client.closeWait()
     raise exc
-  except CatchableError as err:
-    debug "Connection setup failed", err = err.msg
-    client.close()
+  except CatchableError as exc:
+    debug "Connection setup failed", exc = exc.msg
+    await client.closeWait()

 proc init*(T: type TcpTransport,
            flags: set[ServerFlags] = {}): T =
   result = T(flags: flags)
@@ -169,8 +197,16 @@ method dial*(t: TcpTransport,
             Future[Connection] {.async, gcsafe.} =
   trace "dialing remote peer", address = $address
   ## dial a peer
-  let client: StreamTransport = await connect(address)
-  result = t.connHandler(client, true)
+  var client: StreamTransport
+  try:
+    client = await connect(address)
+  except CatchableError as exc:
+    trace "Exception dialing peer", exc = exc.msg
+    if not(isNil(client)):
+      await client.closeWait()
+    raise exc
+
+  return t.connHandler(client, true)

 method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
   if procCall Transport(t).handles(address):
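
The reworked LPChannel.close in the first hunks follows a "send, but never get stuck" shape: try to send the close message with a deadline, fall back to reset() when the send is cancelled or fails so the channel's close sequence still completes, and always give closeUnderlying a chance to release the bufferstream once both directions are done. Below is a reduced sketch of that control flow; DemoChannel and its helpers are hypothetical stand-ins, not this patch's code, and it assumes chronos' wait(), which raises AsyncTimeoutError on expiry.

import chronos

type DemoChannel = ref object
  closedLocal: bool # we closed our write side
  isEof: bool       # remote closed their write side

proc sendCloseMsg(c: DemoChannel) {.async.} =
  discard # stand-in for conn.writeMsg(id, closeCode)

proc reset(c: DemoChannel) {.async.} =
  discard # stand-in: abort both directions so onClose/join fire

proc closeUnderlying(c: DemoChannel) {.async.} =
  if c.closedLocal and c.isEof:
    discard # stand-in: release the wrapped bufferstream

proc close(c: DemoChannel) {.async.} =
  try:
    if c.closedLocal:
      return
    c.closedLocal = true
    await c.sendCloseMsg().wait(10.seconds) # bounded: never hang on a dead peer
  except CancelledError as exc:
    await c.reset() # complete the close sequence anyway or the channel leaks
    raise exc
  except CatchableError:
    await c.reset() # same on send failure or timeout
  finally:
    await c.closeUnderlying() # no-op until both directions are closed
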
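Likewise, the connHandler cleanup task hinges on one chronos property: `f1 or f2` completes as soon as either future does. Waiting on `client.join() or conn.join()` therefore wakes the cleanup task when either the raw transport or the wrapped connection goes down, and it then tears down whichever side is still open, so neither can leak. A minimal sketch with a hypothetical DemoEnd type (not this module's API):

import chronos

type DemoEnd = ref object
  name: string
  closed: bool
  done: AsyncEvent

proc join(e: DemoEnd): Future[void] = e.done.wait()

proc close(e: DemoEnd) {.async.} =
  if not e.closed:
    e.closed = true
    e.done.fire()

proc cleanup(client, conn: DemoEnd) {.async.} =
  await client.join() or conn.join() # wakes when *either* side is done
  if not conn.closed:
    await conn.close()   # wrapped connection first
  if not client.closed:
    await client.close() # then the raw transport

when isMainModule:
  let client = DemoEnd(name: "client", done: newAsyncEvent())
  let conn = DemoEnd(name: "conn", done: newAsyncEvent())
  let fut = cleanup(client, conn)
  waitFor conn.close() # one side goes down...
  waitFor fut          # ...and cleanup closes the other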