Fix hidden close issue in AsyncStream.

This commit is contained in:
cheatfate 2019-07-02 21:26:21 +03:00
parent 43eeceb8e5
commit 247e453b71
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
2 changed files with 4 additions and 5 deletions

View File

@ -707,7 +707,8 @@ proc close*(rw: AsyncStreamRW) =
if not isNil(rw.udata): if not isNil(rw.udata):
GC_unref(cast[ref int](rw.udata)) GC_unref(cast[ref int](rw.udata))
rw.state = AsyncStreamState.Closed rw.state = AsyncStreamState.Closed
rw.future.complete() if not(rw.future.finished()):
rw.future.complete()
when rw is AsyncStreamReader: when rw is AsyncStreamReader:
untrackAsyncStreamReader(rw) untrackAsyncStreamReader(rw)
elif rw is AsyncStreamWriter: elif rw is AsyncStreamWriter:
@ -718,11 +719,13 @@ proc close*(rw: AsyncStreamRW) =
callSoon(continuation) callSoon(continuation)
else: else:
rw.exevent.fire() rw.exevent.fire()
rw.future.addCallback(continuation)
elif rw is AsyncStreamWriter: elif rw is AsyncStreamWriter:
if isNil(rw.wsource) or isNil(rw.writerLoop): if isNil(rw.wsource) or isNil(rw.writerLoop):
callSoon(continuation) callSoon(continuation)
else: else:
rw.exevent.fire() rw.exevent.fire()
rw.future.addCallback(continuation)
proc closeWait*(rw: AsyncStreamRW): Future[void] = proc closeWait*(rw: AsyncStreamRW): Future[void] =
## Close and frees resources of stream ``rw``. ## Close and frees resources of stream ``rw``.

View File

@ -179,8 +179,6 @@ proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} =
# incoming data anymore. # incoming data anymore.
rstream.buffer.forget() rstream.buffer.forget()
untrackAsyncStreamReader(rstream)
proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async.} = proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async.} =
var wstream = cast[ChunkedStreamWriter](stream) var wstream = cast[ChunkedStreamWriter](stream)
var exitFut = wstream.exevent.wait() var exitFut = wstream.exevent.wait()
@ -277,8 +275,6 @@ proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async.} =
wstream.state = AsyncStreamState.Finished wstream.state = AsyncStreamState.Finished
break break
untrackAsyncStreamWriter(wstream)
proc init*[T](child: ChunkedStreamReader, rsource: AsyncStreamReader, proc init*[T](child: ChunkedStreamReader, rsource: AsyncStreamReader,
bufferSize = ChunkBufferSize, udata: ref T) = bufferSize = ChunkBufferSize, udata: ref T) =
init(cast[AsyncStreamReader](child), rsource, chunkedReadLoop, bufferSize, init(cast[AsyncStreamReader](child), rsource, chunkedReadLoop, bufferSize,