diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 1a4a26b32..88c46cbec 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -160,10 +160,12 @@ method readOnce*(s: LPChannel, raise exc method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = - if s.closedLocal: + if s.closedLocal or s.conn.closed: raise newLPStreamClosedError() - doAssert msg.len > 0 + if msg.len == 0: + return + try: if not s.isOpen: await s.open() diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 8a5aa273a..9af145eae 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -105,10 +105,6 @@ proc newIdentify*(peerInfo: PeerInfo): Identify = method init*(p: Identify) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = try: - defer: - trace "exiting identify handler", conn - await conn.close() - trace "handling identify request", conn var pb = encodeMsg(p.peerInfo, conn.observedAddr) await conn.writeLp(pb.buffer) @@ -116,6 +112,9 @@ method init*(p: Identify) = raise exc except CatchableError as exc: trace "exception in identify handler", exc = exc.msg, conn + finally: + trace "exiting identify handler", conn + await conn.closeWithEOF() p.handler = handle p.codec = IdentifyCodec diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index c840c85f9..48dc28066 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -205,7 +205,7 @@ method handleConn*(p: PubSub, except CatchableError as exc: trace "exception ocurred in pubsub handle", exc = exc.msg, conn finally: - await conn.close() + await conn.closeWithEOF() method subscribePeer*(p: PubSub, peer: PeerID) {.base.} = ## subscribe to remote peer to receive/send pubsub diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index 7250c13d7..b8f442fa1 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -76,14 +76,16 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} = return withExceptions: - var written = 0 - while not s.client.closed and written < msg.len: - written += await s.client.write(msg[written..