diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 771a4e3a3..41be7cbad 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -189,14 +189,11 @@ proc closeRemote*(s: LPChannel) {.async.} = # stack = getStackTrace() trace "got EOF, closing channel" - - # wait for all data in the buffer to be consumed - while s.len > 0: - await s.dataReadEvent.wait() - s.dataReadEvent.clear() + await s.drainBuffer() s.isEof = true # set EOF immediately to prevent further reads await s.close() # close local end + # call to avoid leaks await procCall BufferStream(s).close() # close parent bufferstream trace "channel closed on EOF" @@ -227,7 +224,11 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} = # might be dead already - reset is always # optimistic asyncCheck s.resetMessage() + + # drain the buffer before closing + await s.drainBuffer() await procCall BufferStream(s).close() + s.isEof = true s.closedLocal = true @@ -254,11 +255,11 @@ method close*(s: LPChannel) {.async, gcsafe.} = if s.atEof: # already closed by remote close parent buffer immediately await procCall BufferStream(s).close() except CancelledError as exc: - await s.reset() # reset on timeout + await s.reset() raise exc except CatchableError as exc: trace "exception closing channel" - await s.reset() # reset on timeout + await s.reset() trace "lpchannel closed local" diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index c15fa7bf5..41c51f6ed 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -198,6 +198,15 @@ method pushTo*(s: BufferStream, data: seq[byte]) {.base, async.} = await s.dataReadEvent.wait() s.dataReadEvent.clear() +proc drainBuffer*(s: BufferStream) {.async.} = + ## wait for all data in the buffer to be consumed + ## + + trace "draining buffer", len = s.len + while s.len > 0: + await s.dataReadEvent.wait() + s.dataReadEvent.clear() + method readOnce*(s: BufferStream, pbytes: pointer, nbytes: int): diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index f7a4eda80..68bd47ef1 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -102,22 +102,32 @@ method readOnce*(s: LPStream, doAssert(false, "not implemented!") proc readExactly*(s: LPStream, - pbytes: pointer, - nbytes: int): - Future[void] {.async.} = + pbytes: pointer, + nbytes: int): + Future[void] {.async.} = if s.atEof: raise newLPStreamEOFError() + logScope: + nbytes = nbytes + obName = s.objName + stack = getStackTrace() + oid = $s.oid + var pbuffer = cast[ptr UncheckedArray[byte]](pbytes) var read = 0 while read < nbytes and not(s.atEof()): read += await s.readOnce(addr pbuffer[read], nbytes - read) if read < nbytes: + trace "incomplete data received", read raise newLPStreamIncompleteError() -proc readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string] {.async, deprecated: "todo".} = +proc readLine*(s: LPStream, + limit = 0, + sep = "\r\n"): Future[string] + {.async, deprecated: "todo".} = # TODO replace with something that exploits buffering better var lim = if limit <= 0: -1 else: limit var state = 0