diff --git a/libp2p/connection.nim b/libp2p/connection.nim index 8073a87b3..4ad20e0ae 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -35,26 +35,28 @@ proc newInvalidVarintException*(): ref InvalidVarintException = proc newInvalidVarintSizeException*(): ref InvalidVarintSizeException = newException(InvalidVarintSizeException, "Wrong varint size") -proc init*[T: Connection](self: var T, stream: LPStream) = +proc bindStreamClose(conn: Connection) {.async.} = + # bind stream's close event to connection's close + # to ensure correct close propagation + if not isNil(conn.stream.closeEvent): + await conn.stream.closeEvent.wait() + trace "wrapped stream closed, about to close conn", closed = this.isClosed, + peer = if not isNil(this.peerInfo): + this.peerInfo.id else: "" + if not conn.isClosed: + trace "wrapped stream closed, closing conn", closed = this.isClosed, + peer = if not isNil(this.peerInfo): + this.peerInfo.id else: "" + asyncCheck conn.close() + +proc init*[T: Connection](self: var T, stream: LPStream): T = ## create a new Connection for the specified async reader/writer new self self.stream = stream self.closeEvent = newAsyncEvent() + asyncCheck self.bindStreamClose() - # bind stream's close event to connection's close - # to ensure correct close propagation - let this = self - if not isNil(stream.closeEvent): - stream.closeEvent.wait(). - addCallback do (udata: pointer): - trace "wrapped stream closed, about to close conn", closed = this.isClosed, - peer = if not isNil(this.peerInfo): - this.peerInfo.id else: "" - if not this.isClosed: - trace "wrapped stream closed, closing conn", closed = this.isClosed, - peer = if not isNil(this.peerInfo): - this.peerInfo.id else: "" - asyncCheck this.close() + return self proc newConnection*(stream: LPStream): Connection = ## create a new Connection for the specified async reader/writer @@ -119,9 +121,9 @@ method close*(s: Connection) {.async, gcsafe.} = if not s.closed: if not isNil(s.stream) and not s.stream.closed: - trace "closing connection", closed = s.closed, - peer = if not isNil(s.peerInfo): - s.peerInfo.id else: "" + trace "closing child stream", closed = s.closed, + peer = if not isNil(s.peerInfo): + s.peerInfo.id else: "" await s.stream.close() s.closeEvent.fire() diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 50acef392..6b4745827 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -128,6 +128,11 @@ method handle*(m: Mplex) {.async, gcsafe.} = trace "stopping mplex main loop" await m.close() +proc internalCleanup(m: Mplex, conn: Connection) {.async.} = + await conn.closeEvent.wait() + trace "connection closed, cleaning up mplex" + await m.close() + proc newMplex*(conn: Connection, maxChanns: uint = MaxChannels): Mplex = new result @@ -136,11 +141,7 @@ proc newMplex*(conn: Connection, result.remote = initTable[uint64, LPChannel]() result.local = initTable[uint64, LPChannel]() - let m = result - conn.closeEvent.wait() - .addCallback do (udata: pointer): - trace "connection closed, cleaning up mplex" - asyncCheck m.close() + asyncCheck result.internalCleanup(conn) method newStream*(m: Mplex, name: string = "", diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 9285ae93b..636322497 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -138,6 +138,14 @@ method handleConn*(p: PubSub, trace "pubsub peer handler ended, cleaning up" await p.cleanUpHelper(peer) +proc internalClenaup(p: PubSub, conn: Connection) {.async.} = + # handle connection close + var peer = p.getPeer(conn.peerInfo, p.codec) + await conn.closeEvent.wait() + trace "connection closed, cleaning up peer", peer = conn.peerInfo.id + + await p.cleanUpHelper(peer) + method subscribeToPeer*(p: PubSub, conn: Connection) {.base, async.} = var peer = p.getPeer(conn.peerInfo, p.codec) @@ -145,13 +153,7 @@ method subscribeToPeer*(p: PubSub, if not peer.isConnected: peer.conn = conn - # handle connection close - conn.closeEvent.wait() - .addCallback do (udata: pointer = nil): - trace "connection closed, cleaning up peer", - peer = conn.peerInfo.id - - asyncCheck p.cleanUpHelper(peer) + asyncCheck p.internalClenaup(conn) method unsubscribe*(p: PubSub, topics: seq[TopicPair]) {.base, async.} =