Drain buffer (#264)

* drain lpchannel on reset

* move drainBuffer to bufferstream
This commit is contained in:
Dmitriy Ryajov 2020-07-12 10:37:10 -06:00 committed by GitHub
parent 503a7ec1c5
commit 181cf73ca7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 11 deletions

View File

@ -189,14 +189,11 @@ proc closeRemote*(s: LPChannel) {.async.} =
# stack = getStackTrace() # stack = getStackTrace()
trace "got EOF, closing channel" trace "got EOF, closing channel"
await s.drainBuffer()
# wait for all data in the buffer to be consumed
while s.len > 0:
await s.dataReadEvent.wait()
s.dataReadEvent.clear()
s.isEof = true # set EOF immediately to prevent further reads s.isEof = true # set EOF immediately to prevent further reads
await s.close() # close local end await s.close() # close local end
# call to avoid leaks # call to avoid leaks
await procCall BufferStream(s).close() # close parent bufferstream await procCall BufferStream(s).close() # close parent bufferstream
trace "channel closed on EOF" trace "channel closed on EOF"
@ -227,7 +224,11 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} =
# might be dead already - reset is always # might be dead already - reset is always
# optimistic # optimistic
asyncCheck s.resetMessage() asyncCheck s.resetMessage()
# drain the buffer before closing
await s.drainBuffer()
await procCall BufferStream(s).close() await procCall BufferStream(s).close()
s.isEof = true s.isEof = true
s.closedLocal = true s.closedLocal = true
@ -254,11 +255,11 @@ method close*(s: LPChannel) {.async, gcsafe.} =
if s.atEof: # already closed by remote close parent buffer immediately if s.atEof: # already closed by remote close parent buffer immediately
await procCall BufferStream(s).close() await procCall BufferStream(s).close()
except CancelledError as exc: except CancelledError as exc:
await s.reset() # reset on timeout await s.reset()
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
trace "exception closing channel" trace "exception closing channel"
await s.reset() # reset on timeout await s.reset()
trace "lpchannel closed local" trace "lpchannel closed local"

View File

@ -198,6 +198,15 @@ method pushTo*(s: BufferStream, data: seq[byte]) {.base, async.} =
await s.dataReadEvent.wait() await s.dataReadEvent.wait()
s.dataReadEvent.clear() s.dataReadEvent.clear()
proc drainBuffer*(s: BufferStream) {.async.} =
## wait for all data in the buffer to be consumed
##
trace "draining buffer", len = s.len
while s.len > 0:
await s.dataReadEvent.wait()
s.dataReadEvent.clear()
method readOnce*(s: BufferStream, method readOnce*(s: BufferStream,
pbytes: pointer, pbytes: pointer,
nbytes: int): nbytes: int):

View File

@ -102,22 +102,32 @@ method readOnce*(s: LPStream,
doAssert(false, "not implemented!") doAssert(false, "not implemented!")
proc readExactly*(s: LPStream, proc readExactly*(s: LPStream,
pbytes: pointer, pbytes: pointer,
nbytes: int): nbytes: int):
Future[void] {.async.} = Future[void] {.async.} =
if s.atEof: if s.atEof:
raise newLPStreamEOFError() raise newLPStreamEOFError()
logScope:
nbytes = nbytes
obName = s.objName
stack = getStackTrace()
oid = $s.oid
var pbuffer = cast[ptr UncheckedArray[byte]](pbytes) var pbuffer = cast[ptr UncheckedArray[byte]](pbytes)
var read = 0 var read = 0
while read < nbytes and not(s.atEof()): while read < nbytes and not(s.atEof()):
read += await s.readOnce(addr pbuffer[read], nbytes - read) read += await s.readOnce(addr pbuffer[read], nbytes - read)
if read < nbytes: if read < nbytes:
trace "incomplete data received", read
raise newLPStreamIncompleteError() raise newLPStreamIncompleteError()
proc readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string] {.async, deprecated: "todo".} = proc readLine*(s: LPStream,
limit = 0,
sep = "\r\n"): Future[string]
{.async, deprecated: "todo".} =
# TODO replace with something that exploits buffering better # TODO replace with something that exploits buffering better
var lim = if limit <= 0: -1 else: limit var lim = if limit <= 0: -1 else: limit
var state = 0 var state = 0