diff --git a/libp2p/connection.nim b/libp2p/connection.nim index 7f8113ee5..be49a1dc4 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -44,11 +44,16 @@ proc init*[T: Connection](self: var T, stream: LPStream) = # bind stream's close event to connection's close # to ensure correct close propagation let this = self - if not isNil(self.stream.closeEvent): - self.stream.closeEvent.wait(). + if not isNil(stream.closeEvent): + stream.closeEvent.wait(). addCallback do (udata: pointer): - if not this.closed: - trace "wrapped stream closed, closing conn" + trace "wrapped stream closed, about to close conn", closed = this.isClosed, + peer = if not isNil(this.peerInfo): + this.peerInfo.id else: "" + if not this.isClosed: + trace "wrapped stream closed, closing conn", closed = this.isClosed, + peer = if not isNil(this.peerInfo): + this.peerInfo.id else: "" asyncCheck this.close() proc newConnection*(stream: LPStream): Connection = @@ -108,13 +113,21 @@ method closed*(s: Connection): bool = result = s.stream.closed method close*(s: Connection) {.async, gcsafe.} = - trace "closing connection" + trace "closing connection", closed = s.closed, + peer = if not isNil(s.peerInfo): + s.peerInfo.id else: "" if not s.closed: if not isNil(s.stream) and not s.stream.closed: + trace "closing connection", closed = s.closed, + peer = if not isNil(s.peerInfo): + s.peerInfo.id else: "" await s.stream.close() + s.closeEvent.fire() s.isClosed = true - trace "connection closed", closed = s.closed + trace "connection closed", closed = s.closed, + peer = if not isNil(s.peerInfo): + s.peerInfo.id else: "" proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} = ## read lenght prefixed msg diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 5c6815849..980e8d0c6 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -19,6 +19,8 @@ import ../protocol, type Secure* = ref object of LPProtocol # base type for secure managers + cleanupFut: Future[void] + SecureConn* = ref object of Connection method readMessage*(c: SecureConn): Future[seq[byte]] {.async, base.} = @@ -32,8 +34,9 @@ method handshake(s: Secure, initiator: bool = false): Future[SecureConn] {.async, base.} = doAssert(false, "Not implemented!") -proc readLoop(sconn: SecureConn, stream: BufferStream) {.async.} = +proc readLoop(sconn: SecureConn, conn: Connection) {.async.} = try: + let stream = BufferStream(conn.stream) while not sconn.closed: let msg = await sconn.readMessage() if msg.len == 0: @@ -44,6 +47,9 @@ proc readLoop(sconn: SecureConn, stream: BufferStream) {.async.} = except CatchableError as exc: trace "Exception occurred Secure.readLoop", exc = exc.msg finally: + if not conn.closed: + await conn.close() + if not sconn.closed: await sconn.close() trace "ending Secure readLoop", isclosed = sconn.closed() @@ -54,14 +60,8 @@ proc handleConn*(s: Secure, conn: Connection, initiator: bool = false): Future[C trace "sending encrypted bytes", bytes = data.shortLog await sconn.writeMessage(data) - var stream = newBufferStream(writeHandler) - asyncCheck readLoop(sconn, stream) - result = newConnection(stream) - result.closeEvent.wait() - .addCallback do (udata: pointer): - trace "wrapped connection closed, closing upstream" - if not isNil(sconn) and not sconn.closed: - asyncCheck sconn.close() + result = newConnection(newBufferStream(writeHandler)) + asyncCheck readLoop(sconn, result) if not isNil(sconn.peerInfo) and sconn.peerInfo.publicKey.isSome: result.peerInfo = PeerInfo.init(sconn.peerInfo.publicKey.get())