Fix double completion issue in reading procedures.
This commit is contained in:
parent
2ef7469d71
commit
f1cf6d36fc
|
@ -132,10 +132,6 @@ template setReadError(t, e: untyped) =
|
||||||
(t).state.incl(ReadError)
|
(t).state.incl(ReadError)
|
||||||
(t).error = newException(TransportOsError, osErrorMsg((e)))
|
(t).error = newException(TransportOsError, osErrorMsg((e)))
|
||||||
|
|
||||||
template finishReader(t: untyped) =
|
|
||||||
var reader = (t).reader
|
|
||||||
reader.complete()
|
|
||||||
|
|
||||||
template checkPending(t: untyped) =
|
template checkPending(t: untyped) =
|
||||||
if not isNil((t).reader):
|
if not isNil((t).reader):
|
||||||
raise newException(TransportError, "Read operation already pending!")
|
raise newException(TransportError, "Read operation already pending!")
|
||||||
|
@ -319,7 +315,8 @@ when defined(windows):
|
||||||
transp.setReadError(err)
|
transp.setReadError(err)
|
||||||
if not isNil(transp.reader):
|
if not isNil(transp.reader):
|
||||||
if not transp.reader.finished:
|
if not transp.reader.finished:
|
||||||
transp.finishReader()
|
transp.reader.complete()
|
||||||
|
transp.reader = nil
|
||||||
if ReadPaused in transp.state:
|
if ReadPaused in transp.state:
|
||||||
# Transport buffer is full, so we will not continue on reading.
|
# Transport buffer is full, so we will not continue on reading.
|
||||||
break
|
break
|
||||||
|
@ -346,13 +343,15 @@ when defined(windows):
|
||||||
elif int32(err) in {WSAECONNRESET, WSAENETRESET}:
|
elif int32(err) in {WSAECONNRESET, WSAENETRESET}:
|
||||||
if not isNil(transp.reader):
|
if not isNil(transp.reader):
|
||||||
transp.state = {ReadEof, ReadPaused}
|
transp.state = {ReadEof, ReadPaused}
|
||||||
transp.finishReader()
|
transp.reader.complete()
|
||||||
|
transp.reader = nil
|
||||||
elif int32(err) != ERROR_IO_PENDING:
|
elif int32(err) != ERROR_IO_PENDING:
|
||||||
transp.state.excl(ReadPending)
|
transp.state.excl(ReadPending)
|
||||||
transp.state.incl(ReadPaused)
|
transp.state.incl(ReadPaused)
|
||||||
transp.setReadError(err)
|
transp.setReadError(err)
|
||||||
if not isNil(transp.reader):
|
if not isNil(transp.reader):
|
||||||
transp.finishReader()
|
transp.reader.complete()
|
||||||
|
transp.reader = nil
|
||||||
## Finish Loop
|
## Finish Loop
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@ -967,9 +966,6 @@ proc readExactly*(transp: StreamTransport, pbytes: pointer,
|
||||||
if ReadPaused in transp.state:
|
if ReadPaused in transp.state:
|
||||||
transp.resumeRead()
|
transp.resumeRead()
|
||||||
await transp.reader
|
await transp.reader
|
||||||
# we need to clear transp.reader to avoid double completion of this
|
|
||||||
# Future[T], because readLoop continues working.
|
|
||||||
transp.reader = nil
|
|
||||||
|
|
||||||
proc readOnce*(transp: StreamTransport, pbytes: pointer,
|
proc readOnce*(transp: StreamTransport, pbytes: pointer,
|
||||||
nbytes: int): Future[int] {.async.} =
|
nbytes: int): Future[int] {.async.} =
|
||||||
|
@ -1057,9 +1053,6 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int,
|
||||||
if ReadPaused in transp.state:
|
if ReadPaused in transp.state:
|
||||||
transp.resumeRead()
|
transp.resumeRead()
|
||||||
await transp.reader
|
await transp.reader
|
||||||
# we need to clear transp.reader to avoid double completion of this
|
|
||||||
# Future[T], because readLoop continues working.
|
|
||||||
transp.reader = nil
|
|
||||||
|
|
||||||
proc readLine*(transp: StreamTransport, limit = 0,
|
proc readLine*(transp: StreamTransport, limit = 0,
|
||||||
sep = "\r\n"): Future[string] {.async.} =
|
sep = "\r\n"): Future[string] {.async.} =
|
||||||
|
@ -1111,9 +1104,6 @@ proc readLine*(transp: StreamTransport, limit = 0,
|
||||||
if ReadPaused in transp.state:
|
if ReadPaused in transp.state:
|
||||||
transp.resumeRead()
|
transp.resumeRead()
|
||||||
await transp.reader
|
await transp.reader
|
||||||
# we need to clear transp.reader to avoid double completion of this
|
|
||||||
# Future[T], because readLoop continues working.
|
|
||||||
transp.reader = nil
|
|
||||||
|
|
||||||
proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} =
|
proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} =
|
||||||
## Read all bytes (n == -1) or exactly `n` bytes from transport ``transp``.
|
## Read all bytes (n == -1) or exactly `n` bytes from transport ``transp``.
|
||||||
|
@ -1157,9 +1147,6 @@ proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} =
|
||||||
if ReadPaused in transp.state:
|
if ReadPaused in transp.state:
|
||||||
transp.resumeRead()
|
transp.resumeRead()
|
||||||
await transp.reader
|
await transp.reader
|
||||||
# we need to clear transp.reader to avoid double completion of this
|
|
||||||
# Future[T], because readLoop continues working.
|
|
||||||
transp.reader = nil
|
|
||||||
|
|
||||||
proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} =
|
proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} =
|
||||||
## Consume all bytes (n == -1) or ``n`` bytes from transport ``transp``.
|
## Consume all bytes (n == -1) or ``n`` bytes from transport ``transp``.
|
||||||
|
@ -1195,9 +1182,6 @@ proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} =
|
||||||
if ReadPaused in transp.state:
|
if ReadPaused in transp.state:
|
||||||
transp.resumeRead()
|
transp.resumeRead()
|
||||||
await transp.reader
|
await transp.reader
|
||||||
# we need to clear transp.reader to avoid double completion of this
|
|
||||||
# Future[T], because readLoop continues working.
|
|
||||||
transp.reader = nil
|
|
||||||
|
|
||||||
proc join*(transp: StreamTransport) {.async.} =
|
proc join*(transp: StreamTransport) {.async.} =
|
||||||
## Wait until ``transp`` will not be closed.
|
## Wait until ``transp`` will not be closed.
|
||||||
|
|
Loading…
Reference in New Issue