diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 53ab13600..0d83849ab 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -42,7 +42,8 @@ proc init*(T: type SecureConn, peerInfo: peerInfo, observedAddr: observedAddr, closeEvent: conn.closeEvent, - timeout: timeout) + timeout: timeout, + dir: conn.dir) result.initStream() method initStream*(s: SecureConn) = @@ -52,6 +53,7 @@ method initStream*(s: SecureConn) = procCall Connection(s).initStream() method close*(s: SecureConn) {.async.} = + trace "Closing secure conn", s, dir = s.dir if not(isNil(s.stream)): await s.stream.close() @@ -76,7 +78,7 @@ proc handleConn*(s: Secure, await sconn.close() except CancelledError: # This is top-level procedure which will work as separate task, so it - # do not need to propogate CancelledError. + # do not need to propagate CancelledError. discard except CatchableError as exc: trace "error cleaning up secure connection", err = exc.msg, sconn @@ -127,18 +129,18 @@ method readOnce*(s: SecureConn, if not isNil(err): if not (err of LPStreamEOFError): - warn "error while reading message from secure connection, closing.", - error=err.name, - message=err.msg, + warn "Error while reading message from secure connection, closing.", + error=err.name, + message=err.msg, connection=s await s.close() raise err - + s.activity = true if buf.len == 0: raise newLPStreamIncompleteError() - + s.buf.add(buf) var p = cast[ptr UncheckedArray[byte]](pbytes) diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index 311ef81e6..eee95d6ec 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -7,29 +7,6 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -## This module implements an asynchronous buffer stream -## which emulates physical async IO. -## -## The stream is based on the standard library's `Deque`, -## which is itself based on a ring buffer. -## -## It works by exposing a regular LPStream interface and -## a method ``pushTo`` to push data to the internal read -## buffer; as well as a handler that can be registered -## that gets triggered on every write to the stream. This -## allows using the buffered stream as a sort of proxy, -## which can be consumed as a regular LPStream but allows -## injecting data for reads and intercepting writes. -## -## Another notable feature is that the stream is fully -## ordered and asynchronous. Reads are queued up in order -## and are suspended when not enough data available. This -## allows preserving backpressure while maintaining full -## asynchrony. Both writing to the internal buffer with -## ``pushTo`` as well as reading with ``read*` methods, -## will suspend until either the amount of elements in the -## buffer goes below ``maxSize`` or more data becomes available. - import std/strformat import stew/byteutils import chronos, chronicles, metrics diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index b8f442fa1..5f8f321d7 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -32,16 +32,18 @@ method initStream*(s: ChronosStream) = s.objName = "ChronosStream" s.timeoutHandler = proc() {.async, gcsafe.} = - trace "idle timeout expired, closing ChronosStream" + trace "Idle timeout expired, closing ChronosStream", s await s.close() procCall Connection(s).initStream() proc init*(C: type ChronosStream, client: StreamTransport, + dir: Direction, timeout = DefaultChronosStreamTimeout): ChronosStream = result = C(client: client, - timeout: timeout) + timeout: timeout, + dir: dir) result.initStream() template withExceptions(body: untyped) = @@ -94,13 +96,17 @@ method atEof*(s: ChronosStream): bool {.inline.} = method closeImpl*(s: ChronosStream) {.async.} = try: - trace "shutting down chronos stream", address = $s.client.remoteAddress(), + trace "Shutting down chronos stream", address = $s.client.remoteAddress(), s if not s.client.closed(): await s.client.closeWait() + + trace "Shutdown chronos stream", address = $s.client.remoteAddress(), + s + except CancelledError as exc: raise exc except CatchableError as exc: - trace "error closing chronosstream", s, msg = exc.msg + trace "Error closing chronosstream", s, msg = exc.msg await procCall Connection(s).closeImpl() diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index f55e07e60..504b1cc58 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import std/[hashes, oids, strformat] +import std/[hashes, oids, strformat, sugar] import chronicles, chronos, metrics import lpstream, ../multiaddress, @@ -25,9 +25,6 @@ const type TimeoutHandler* = proc(): Future[void] {.gcsafe.} - Direction* {.pure.} = enum - None, In, Out - Connection* = ref object of LPStream activity*: bool # reset every time data is sent or received timeout*: Duration # channel timeout if no activity @@ -35,7 +32,6 @@ type timeoutHandler*: TimeoutHandler # timeout handler peerInfo*: PeerInfo observedAddr*: Multiaddress - dir*: Direction ConnectionTracker* = ref object of TrackerBase opened*: uint64 @@ -85,7 +81,9 @@ method initStream*(s: Connection) = s.timerTaskFut = s.timeoutMonitor() if isNil(s.timeoutHandler): - s.timeoutHandler = proc(): Future[void] = s.close() + s.timeoutHandler = proc(): Future[void] = + trace "Idle timeout expired, closing connection", s + s.close() inc getConnectionTracker().opened @@ -96,7 +94,7 @@ method closeImpl*(s: Connection): Future[void] = s.timerTaskFut.cancel() inc getConnectionTracker().closed - trace "Closed connection" + trace "Closed connection", s procCall LPStream(s).closeImpl() @@ -104,7 +102,7 @@ func hash*(p: Connection): Hash = cast[pointer](p).hash proc timeoutMonitor(s: Connection) {.async, gcsafe.} = - ## monitor the channel for innactivity + ## monitor the channel for inactivity ## ## if the timeout was hit, it means that ## neither incoming nor outgoing activity @@ -125,9 +123,10 @@ proc timeoutMonitor(s: Connection) {.async, gcsafe.} = break - # reset channel on innactivity timeout + # reset channel on inactivity timeout trace "Connection timed out", s if not(isNil(s.timeoutHandler)): + trace "Calling timeout handler", s await s.timeoutHandler() except CancelledError as exc: diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index c19760c4a..fc7d33f53 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -15,7 +15,8 @@ import ../varint, ../peerinfo, ../multiaddress -declareGauge(libp2p_open_streams, "open stream instances", labels = ["type"]) +declareGauge(libp2p_open_streams, + "open stream instances", labels = ["type", "dir"]) export oids @@ -23,12 +24,16 @@ logScope: topics = "lpstream" type + Direction* {.pure.} = enum + In, Out + LPStream* = ref object of RootObj closeEvent*: AsyncEvent isClosed*: bool isEof*: bool objName*: string oid*: Oid + dir*: Direction LPStreamError* = object of CatchableError LPStreamIncompleteError* = object of LPStreamError @@ -86,8 +91,8 @@ method initStream*(s: LPStream) {.base.} = s.closeEvent = newAsyncEvent() s.oid = genOid() - libp2p_open_streams.inc(labelValues = [s.objName]) - trace "Stream created", s, objName = s.objName + libp2p_open_streams.inc(labelValues = [s.objName, $s.dir]) + trace "Stream created", s, objName = s.objName, dir = $s.dir proc join*(s: LPStream): Future[void] = s.closeEvent.wait() @@ -212,10 +217,10 @@ proc write*(s: LPStream, msg: string): Future[void] = method closeImpl*(s: LPStream): Future[void] {.async, base.} = ## Implementation of close - called only once - trace "Closing stream", s, objName = s.objName + trace "Closing stream", s, objName = s.objName, dir = $s.dir s.closeEvent.fire() - libp2p_open_streams.dec(labelValues = [s.objName]) - trace "Closed stream", s, objName = s.objName + libp2p_open_streams.dec(labelValues = [s.objName, $s.dir]) + trace "Closed stream", s, objName = s.objName, dir = $s.dir method close*(s: LPStream): Future[void] {.base, async.} = # {.raises [Defect].} ## close the stream - this may block, but will not raise exceptions @@ -223,6 +228,7 @@ method close*(s: LPStream): Future[void] {.base, async.} = # {.raises [Defect].} if s.isClosed: trace "Already closed", s return + s.isClosed = true # Set flag before performing virtual close # An separate implementation method is used so that even when derived types diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 3e62e5b2a..eda2e18aa 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -63,7 +63,15 @@ proc connHandler*(t: TcpTransport, client: StreamTransport, initiator: bool): Connection = trace "handling connection", address = $client.remoteAddress - let conn: Connection = Connection(ChronosStream.init(client)) + + let conn = Connection( + ChronosStream.init( + client, + dir = if initiator: + Direction.Out + else: + Direction.In)) + conn.observedAddr = MultiAddress.init(client.remoteAddress).tryGet() if not initiator: if not isNil(t.handler): diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index a570d79a4..e34df334a 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -6,6 +6,7 @@ ## at your option. ## This file may not be copied, modified, or distributed except according to ## those terms. +## import sequtils import chronos, chronicles