From f1cf6d36fc6c09357566c67a7093922ddb0626ec Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 24 Jul 2018 16:23:35 +0300 Subject: [PATCH] Fix double completion issue in reading procedures. --- asyncdispatch2/transports/stream.nim | 28 ++++++---------------------- 1 file changed, 6 insertions(+), 22 deletions(-) diff --git a/asyncdispatch2/transports/stream.nim b/asyncdispatch2/transports/stream.nim index 4ad1af7b..99dea6ec 100644 --- a/asyncdispatch2/transports/stream.nim +++ b/asyncdispatch2/transports/stream.nim @@ -132,10 +132,6 @@ template setReadError(t, e: untyped) = (t).state.incl(ReadError) (t).error = newException(TransportOsError, osErrorMsg((e))) -template finishReader(t: untyped) = - var reader = (t).reader - reader.complete() - template checkPending(t: untyped) = if not isNil((t).reader): raise newException(TransportError, "Read operation already pending!") @@ -319,7 +315,8 @@ when defined(windows): transp.setReadError(err) if not isNil(transp.reader): if not transp.reader.finished: - transp.finishReader() + transp.reader.complete() + transp.reader = nil if ReadPaused in transp.state: # Transport buffer is full, so we will not continue on reading. break @@ -346,13 +343,15 @@ when defined(windows): elif int32(err) in {WSAECONNRESET, WSAENETRESET}: if not isNil(transp.reader): transp.state = {ReadEof, ReadPaused} - transp.finishReader() + transp.reader.complete() + transp.reader = nil elif int32(err) != ERROR_IO_PENDING: transp.state.excl(ReadPending) transp.state.incl(ReadPaused) transp.setReadError(err) if not isNil(transp.reader): - transp.finishReader() + transp.reader.complete() + transp.reader = nil ## Finish Loop break @@ -967,9 +966,6 @@ proc readExactly*(transp: StreamTransport, pbytes: pointer, if ReadPaused in transp.state: transp.resumeRead() await transp.reader - # we need to clear transp.reader to avoid double completion of this - # Future[T], because readLoop continues working. - transp.reader = nil proc readOnce*(transp: StreamTransport, pbytes: pointer, nbytes: int): Future[int] {.async.} = @@ -1057,9 +1053,6 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int, if ReadPaused in transp.state: transp.resumeRead() await transp.reader - # we need to clear transp.reader to avoid double completion of this - # Future[T], because readLoop continues working. - transp.reader = nil proc readLine*(transp: StreamTransport, limit = 0, sep = "\r\n"): Future[string] {.async.} = @@ -1111,9 +1104,6 @@ proc readLine*(transp: StreamTransport, limit = 0, if ReadPaused in transp.state: transp.resumeRead() await transp.reader - # we need to clear transp.reader to avoid double completion of this - # Future[T], because readLoop continues working. - transp.reader = nil proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} = ## Read all bytes (n == -1) or exactly `n` bytes from transport ``transp``. @@ -1157,9 +1147,6 @@ proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} = if ReadPaused in transp.state: transp.resumeRead() await transp.reader - # we need to clear transp.reader to avoid double completion of this - # Future[T], because readLoop continues working. - transp.reader = nil proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} = ## Consume all bytes (n == -1) or ``n`` bytes from transport ``transp``. @@ -1195,9 +1182,6 @@ proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} = if ReadPaused in transp.state: transp.resumeRead() await transp.reader - # we need to clear transp.reader to avoid double completion of this - # Future[T], because readLoop continues working. - transp.reader = nil proc join*(transp: StreamTransport) {.async.} = ## Wait until ``transp`` will not be closed.