From 3c928918a427c2b52e13813b95fe8b342047ac09 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 8 Oct 2019 10:28:43 +0300 Subject: [PATCH] Change AsyncStream close procedure from events to cancellation. --- chronos/streams/asyncstream.nim | 25 +-- chronos/streams/chunkstream.nim | 319 +++++++++++++------------------- 2 files changed, 145 insertions(+), 199 deletions(-) diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index 3479ef1..ade48f4 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -38,6 +38,7 @@ type of String: data3*: string size*: int + offset*: int future*: Future[void] AsyncStreamState* = enum @@ -57,7 +58,6 @@ type tsource*: StreamTransport readerLoop*: StreamReaderLoop state*: AsyncStreamState - exevent*: AsyncEvent buffer*: AsyncBuffer udata: pointer error*: ref Exception @@ -68,7 +68,6 @@ type tsource*: StreamTransport writerLoop*: StreamWriterLoop state*: AsyncStreamState - exevent*: AsyncEvent queue*: AsyncQueue[WriteItem] udata: pointer future: Future[void] @@ -730,17 +729,23 @@ proc close*(rw: AsyncStreamRW) = untrackAsyncStreamWriter(rw) when rw is AsyncStreamReader: - if isNil(rw.rsource) or isNil(rw.readerLoop): + if isNil(rw.rsource) or isNil(rw.readerLoop) or isNil(rw.future): callSoon(continuation) else: - rw.exevent.fire() - rw.future.addCallback(continuation) + if rw.future.finished(): + callSoon(continuation) + else: + rw.future.cancel() + rw.future.addCallback(continuation) elif rw is AsyncStreamWriter: - if isNil(rw.wsource) or isNil(rw.writerLoop): + if isNil(rw.wsource) or isNil(rw.writerLoop) or isNil(rw.future): callSoon(continuation) else: - rw.exevent.fire() - rw.future.addCallback(continuation) + if rw.future.finished(): + callSoon(continuation) + else: + rw.future.cancel() + rw.future.addCallback(continuation) proc closeWait*(rw: AsyncStreamRW): Future[void] = ## Close and frees resources of stream ``rw``. @@ -768,7 +773,6 @@ proc init*(child, wsource: AsyncStreamWriter, loop: StreamWriterLoop, child.writerLoop = loop child.wsource = wsource child.tsource = wsource.tsource - child.exevent = newAsyncEvent() child.queue = newAsyncQueue[WriteItem](queueSize) trackAsyncStreamWriter(child) child.startWriter() @@ -780,7 +784,6 @@ proc init*[T](child, wsource: AsyncStreamWriter, loop: StreamWriterLoop, child.writerLoop = loop child.wsource = wsource child.tsource = wsource.tsource - child.exevent = newAsyncEvent() child.queue = newAsyncQueue[WriteItem](queueSize) if not isNil(udata): GC_ref(udata) @@ -795,7 +798,6 @@ proc init*(child, rsource: AsyncStreamReader, loop: StreamReaderLoop, child.readerLoop = loop child.rsource = rsource child.tsource = rsource.tsource - child.exevent = newAsyncEvent() child.buffer = AsyncBuffer.init(bufferSize) trackAsyncStreamReader(child) child.startReader() @@ -808,7 +810,6 @@ proc init*[T](child, rsource: AsyncStreamReader, loop: StreamReaderLoop, child.readerLoop = loop child.rsource = rsource child.tsource = rsource.tsource - child.exevent = newAsyncEvent() child.buffer = AsyncBuffer.init(bufferSize) if not isNil(udata): GC_ref(udata) diff --git a/chronos/streams/chunkstream.nim b/chronos/streams/chunkstream.nim index 750872d..b847cac 100644 --- a/chronos/streams/chunkstream.nim +++ b/chronos/streams/chunkstream.nim @@ -27,21 +27,6 @@ type proc newProtocolError(): ref Exception {.inline.} = newException(ChunkedStreamProtocolError, "Protocol error!") -proc oneOf*[A, B](fut1: Future[A], fut2: Future[B]): Future[void] = - ## Wait for completion of `fut1` or `fut2`, resulting future do not care about - ## error, so you need to check `fut1` and `fut2` for error. - var retFuture = newFuture[void]("chunked.oneOf()") - proc cb(data: pointer) {.gcsafe.} = - if not(retFuture.finished()): - if cast[pointer](fut1) == data: - fut2.removeCallback(cb) - elif cast[pointer](fut2) == data: - fut1.removeCallback(cb) - retFuture.complete() - fut1.callback = cb - fut2.callback = cb - return retFuture - proc getChunkSize(buffer: openarray[byte]): uint64 = # We using `uint64` representation, but allow only 2^32 chunk size, # ChunkHeaderSize. @@ -87,197 +72,157 @@ proc setChunkSize(buffer: var openarray[byte], length: int64): int = proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} = var rstream = cast[ChunkedStreamReader](stream) - var exitFut = rstream.exevent.wait() var buffer = newSeq[byte](1024) + rstream.state = AsyncStreamState.Running - while true: - # Reading chunk size - var ruFut1 = rstream.rsource.readUntil(addr buffer[0], 1024, CRLF) - await oneOf(ruFut1, exitFut) - - if exitFut.finished(): - rstream.state = AsyncStreamState.Stopped - break - - if ruFut1.failed(): - rstream.error = ruFut1.error - rstream.state = AsyncStreamState.Error - break - - let length = ruFut1.read() - var chunksize = getChunkSize(buffer.toOpenArray(0, length - len(CRLF) - 1)) - - if chunksize == 0xFFFF_FFFF_FFFF_FFFF'u64: - rstream.error = newProtocolError() - rstream.state = AsyncStreamState.Error - break - elif chunksize > 0'u64: - while chunksize > 0'u64: - let toRead = min(int(chunksize), rstream.buffer.bufferLen()) - var reFut2 = rstream.rsource.readExactly(rstream.buffer.getBuffer(), - toRead) - await oneOf(reFut2, exitFut) - - if exitFut.finished(): - rstream.state = AsyncStreamState.Stopped - break - - if reFut2.failed(): - rstream.error = reFut2.error - rstream.state = AsyncStreamState.Error - break - - rstream.buffer.update(toRead) - - await oneOf(rstream.buffer.transfer(), exitFut) - - if exitFut.finished(): - rstream.state = AsyncStreamState.Stopped - break - - chunksize = chunksize - uint64(toRead) - - if rstream.state != AsyncStreamState.Running: - break - - # Reading chunk trailing CRLF - var reFut3 = rstream.rsource.readExactly(addr buffer[0], 2) - await oneOf(reFut3, exitFut) - if exitFut.finished(): - rstream.state = AsyncStreamState.Stopped - break - - if reFut3.failed(): - rstream.error = reFut3.error + try: + while true: + # Reading chunk size + var ruFut1 = awaitne rstream.rsource.readUntil(addr buffer[0], 1024, CRLF) + if ruFut1.failed(): + rstream.error = ruFut1.error rstream.state = AsyncStreamState.Error break - if buffer[0] != CRLF[0] or buffer[1] != CRLF[1]: + let length = ruFut1.read() + var chunksize = getChunkSize(buffer.toOpenArray(0, + length - len(CRLF) - 1)) + + if chunksize == 0xFFFF_FFFF_FFFF_FFFF'u64: rstream.error = newProtocolError() rstream.state = AsyncStreamState.Error break - else: - # Reading trailing line for last chunk - var ruFut4 = rstream.rsource.readUntil(addr buffer[0], len(buffer), CRLF) - await oneOf(ruFut4, exitFut) + elif chunksize > 0'u64: + while chunksize > 0'u64: + let toRead = min(int(chunksize), rstream.buffer.bufferLen()) + var reFut2 = awaitne rstream.rsource.readExactly( + rstream.buffer.getBuffer(), toRead) + if reFut2.failed(): + rstream.error = reFut2.error + rstream.state = AsyncStreamState.Error + break - if exitFut.finished(): - rstream.state = AsyncStreamState.Stopped + rstream.buffer.update(toRead) + await rstream.buffer.transfer() + chunksize = chunksize - uint64(toRead) + + if rstream.state != AsyncStreamState.Running: + break + + # Reading chunk trailing CRLF + var reFut3 = awaitne rstream.rsource.readExactly(addr buffer[0], 2) + if reFut3.failed(): + rstream.error = reFut3.error + rstream.state = AsyncStreamState.Error + break + + if buffer[0] != CRLF[0] or buffer[1] != CRLF[1]: + rstream.error = newProtocolError() + rstream.state = AsyncStreamState.Error + break + else: + # Reading trailing line for last chunk + var ruFut4 = awaitne rstream.rsource.readUntil(addr buffer[0], + len(buffer), CRLF) + if ruFut4.failed(): + rstream.error = ruFut4.error + rstream.state = AsyncStreamState.Error + break + + rstream.state = AsyncStreamState.Finished + await rstream.buffer.transfer() break - if ruFut4.failed(): - rstream.error = ruFut4.error - rstream.state = AsyncStreamState.Error - break - - rstream.state = AsyncStreamState.Finished - - await oneOf(rstream.buffer.transfer(), exitFut) - - if exitFut.finished(): - rstream.state = AsyncStreamState.Stopped - break - - if rstream.state in {AsyncStreamState.Stopped, AsyncStreamState.Error}: - # We need to notify consumer about error/close, but we do not care about - # incoming data anymore. - rstream.buffer.forget() + except CancelledError: + rstream.state = AsyncStreamState.Stopped + finally: + if rstream.state in {AsyncStreamState.Stopped, AsyncStreamState.Error}: + # We need to notify consumer about error/close, but we do not care about + # incoming data anymore. + rstream.buffer.forget() proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async.} = var wstream = cast[ChunkedStreamWriter](stream) - var exitFut = wstream.exevent.wait() var buffer: array[16, byte] var wFut1, wFut2: Future[void] - + var error: ref Exception wstream.state = AsyncStreamState.Running - while true: - # Getting new item from stream's queue. - var getFut = wstream.queue.get() - await oneOf(getFut, exitFut) - if exitFut.finished(): - wstream.state = AsyncStreamState.Stopped - break - var item = getFut.read() - # `item.size == 0` is marker of stream finish, while `item.size != 0` is - # data's marker. - if item.size > 0: - let length = setChunkSize(buffer, int64(item.size)) - # Writing chunk header CRLF. - wFut1 = wstream.wsource.write(addr buffer[0], length) - await oneOf(wFut1, exitFut) - if exitFut.finished(): - wstream.state = AsyncStreamState.Stopped + try: + while true: + # Getting new item from stream's queue. + var item = await wstream.queue.get() + # `item.size == 0` is marker of stream finish, while `item.size != 0` is + # data's marker. + if item.size > 0: + let length = setChunkSize(buffer, int64(item.size)) + # Writing chunk header CRLF. + wFut1 = awaitne wstream.wsource.write(addr buffer[0], length) + if wFut1.failed(): + error = wFut1.error + item.future.fail(error) + continue + + # Writing chunk data. + if item.kind == Pointer: + wFut2 = awaitne wstream.wsource.write(item.data1, item.size) + elif item.kind == Sequence: + wFut2 = awaitne wstream.wsource.write(addr item.data2[0], item.size) + elif item.kind == String: + wFut2 = awaitne wstream.wsource.write(addr item.data3[0], item.size) + if wFut2.failed(): + error = wFut2.error + item.future.fail(error) + continue + + # Writing chunk footer CRLF. + var wFut3 = awaitne wstream.wsource.write(CRLF) + if wFut3.failed(): + error = wFut3.error + item.future.fail(error) + continue + + # Everything is fine, completing queue item's future. + item.future.complete() + else: + let length = setChunkSize(buffer, 0'i64) + + # Write finish chunk `0`. + wFut1 = awaitne wstream.wsource.write(addr buffer[0], length) + if wFut1.failed(): + error = wFut1.error + item.future.fail(error) + # We break here, because this is last chunk + break + + # Write trailing CRLF. + wFut2 = awaitne wstream.wsource.write(CRLF) + if wFut2.failed(): + error = wFut2.error + item.future.fail(error) + # We break here, because this is last chunk + break + + # Everything is fine, completing queue item's future. + item.future.complete() + + # Set stream state to Finished. + wstream.state = AsyncStreamState.Finished break - - if wFut1.failed(): - item.future.fail(wFut1.error) - continue - - # Writing chunk data. - if item.kind == Pointer: - wFut2 = wstream.wsource.write(item.data1, item.size) - elif item.kind == Sequence: - wFut2 = wstream.wsource.write(addr item.data2[0], item.size) - elif item.kind == String: - wFut2 = wstream.wsource.write(addr item.data3[0], item.size) - - await oneOf(wFut2, exitFut) - if exitFut.finished(): - wstream.state = AsyncStreamState.Stopped - break - - if wFut2.failed(): - item.future.fail(wFut2.error) - continue - - # Writing chunk footer CRLF. - var wFut3 = wstream.wsource.write(CRLF) - await oneOf(wFut3, exitFut) - if exitFut.finished(): - wstream.state = AsyncStreamState.Stopped - break - - if wFut3.failed(): - item.future.fail(wFut3.error) - continue - - # Everything is fine, completing queue item's future. - item.future.complete() - else: - let length = setChunkSize(buffer, 0'i64) - - # Write finish chunk `0`. - wFut1 = wstream.wsource.write(addr buffer[0], length) - await oneOf(wFut1, exitFut) - if exitFut.finished(): - wstream.state = AsyncStreamState.Stopped - break - - if wFut1.failed(): - item.future.fail(wFut1.error) - # We break here, because this is last chunk - break - - # Write trailing CRLF. - wFut2 = wstream.wsource.write(CRLF) - await oneOf(wFut2, exitFut) - - if exitFut.finished(): - wstream.state = AsyncStreamState.Stopped - break - - if wFut2.failed(): - item.future.fail(wFut2.error) - # We break here, because this is last chunk - break - - # Everything is fine, completing queue item's future. - item.future.complete() - - # Set stream state to Finished. - wstream.state = AsyncStreamState.Finished - break + except CancelledError: + wstream.state = AsyncStreamState.Stopped + finally: + if wstream.state == AsyncStreamState.Stopped: + while len(wstream.queue) > 0: + let item = wstream.queue.popFirstNoWait() + if not(item.future.finished()): + item.future.complete() + elif wstream.state == AsyncStreamState.Error: + while len(wstream.queue) > 0: + let item = wstream.queue.popFirstNoWait() + if not(item.future.finished()): + if not isNil(error): + item.future.fail(error) proc init*[T](child: ChunkedStreamReader, rsource: AsyncStreamReader, bufferSize = ChunkBufferSize, udata: ref T) =