diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index f8f6f2f..1c623de 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -91,7 +91,6 @@ type par*: ref Exception AsyncStreamWriteError* = object of AsyncStreamError par*: ref Exception - AsyncStreamTimeoutError* = object of AsyncStreamError proc init*(t: typedesc[AsyncBuffer], size: int): AsyncBuffer = result.buffer = newSeq[byte](size) diff --git a/chronos/streams/chunkstream.nim b/chronos/streams/chunkstream.nim index 104587e..554c3ea 100644 --- a/chronos/streams/chunkstream.nim +++ b/chronos/streams/chunkstream.nim @@ -13,50 +13,18 @@ export asyncstream, stream, timer, common const ChunkBufferSize = 4096 ChunkHeaderSize = 8 - ChunkDefaultTimeout = 10.seconds CRLF = @[byte(0x0D), byte(0x0A)] type ChunkedStreamReader* = ref object of AsyncStreamReader - timeout: Duration - ChunkedStreamWriter* = ref object of AsyncStreamWriter ChunkedStreamError* = object of CatchableError - ChunkedStreamTimeoutError* = object of ChunkedStreamError ChunkedStreamProtocolError* = object of ChunkedStreamError -proc newTimeoutError(): ref Exception {.inline.} = - newException(ChunkedStreamTimeoutError, "Timeout exceeded!") - proc newProtocolError(): ref Exception {.inline.} = newException(ChunkedStreamProtocolError, "Protocol error!") -proc oneOf*[A, B, C](fut1: Future[A], fut2: Future[B], - fut3: Future[C]): Future[void] = - ## Wait for completion of one future from list [`fut1`, `fut2`, `fut3`], - ## resulting future do not care about futures error, so you need to check - ## `fut1`, `fut2`, `fut3` for error manually. - var retFuture = newFuture[void]("chunked.oneOf()") - proc cb(data: pointer) {.gcsafe.} = - var index: int - if not retFuture.finished: - var fut = cast[FutureBase](data) - if cast[pointer](fut1) == data: - fut2.removeCallback(cb) - fut3.removeCallback(cb) - elif cast[pointer](fut2) == data: - fut1.removeCallback(cb) - fut3.removeCallback(cb) - elif cast[pointer](fut3) == data: - fut2.removeCallback(cb) - fut3.removeCallback(cb) - retFuture.complete() - fut1.callback = cb - fut2.callback = cb - fut3.callback = cb - return retFuture - proc oneOf*[A, B](fut1: Future[A], fut2: Future[B]): Future[void] = ## Wait for completion of `fut1` or `fut2`, resulting future do not care about ## error, so you need to check `fut1` and `fut2` for error. @@ -121,25 +89,14 @@ proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} = var rstream = cast[ChunkedStreamReader](stream) var exitFut = rstream.exevent.wait() var buffer = newSeq[byte](1024) - var timerFut: Future[void] - - if rstream.timeout == InfiniteDuration: - timerFut = newFuture[void]() while true: - if rstream.timeout != InfiniteDuration: - timerFut = sleepAsync(rstream.timeout) - # Reading chunk size var ruFut1 = rstream.rsource.readUntil(addr buffer[0], 1024, CRLF) - await oneOf(ruFut1, timerFut, exitFut) + await oneOf(ruFut1, exitFut) - if exitFut.finished or timerFut.finished: - if timerFut.finished: - rstream.error = newTimeoutError() - rstream.state = AsyncStreamState.Error - else: - rstream.state = AsyncStreamState.Stopped + if exitFut.finished: + rstream.state = AsyncStreamState.Stopped break if ruFut1.failed: @@ -159,14 +116,10 @@ proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} = let toRead = min(int(chunksize), rstream.buffer.bufferLen()) var reFut2 = rstream.rsource.readExactly(rstream.buffer.getBuffer(), toRead) - await oneOf(reFut2, timerFut, exitFut) + await oneOf(reFut2, exitFut) - if exitFut.finished or timerFut.finished: - if timerFut.finished: - rstream.error = newTimeoutError() - rstream.state = AsyncStreamState.Error - else: - rstream.state = AsyncStreamState.Stopped + if exitFut.finished: + rstream.state = AsyncStreamState.Stopped break if reFut2.failed: @@ -187,13 +140,9 @@ proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} = # Reading chunk trailing CRLF var reFut3 = rstream.rsource.readExactly(addr buffer[0], 2) - await oneOf(reFut3, timerFut, exitFut) - if exitFut.finished or timerFut.finished: - if timerFut.finished: - rstream.error = newTimeoutError() - rstream.state = AsyncStreamState.Error - else: - rstream.state = AsyncStreamState.Stopped + await oneOf(reFut3, exitFut) + if exitFut.finished: + rstream.state = AsyncStreamState.Stopped break if reFut3.failed: @@ -208,14 +157,10 @@ proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} = else: # Reading trailing line for last chunk var ruFut4 = rstream.rsource.readUntil(addr buffer[0], len(buffer), CRLF) - await oneOf(ruFut4, timerFut, exitFut) + await oneOf(ruFut4, exitFut) - if exitFut.finished or timerFut.finished: - if timerFut.finished: - rstream.error = newTimeoutError() - rstream.state = AsyncStreamState.Error - else: - rstream.state = AsyncStreamState.Stopped + if exitFut.finished: + rstream.state = AsyncStreamState.Stopped break if ruFut4.failed: @@ -336,30 +281,25 @@ proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async.} = untrackAsyncStreamWriter(wstream) proc init*[T](child: ChunkedStreamReader, rsource: AsyncStreamReader, - bufferSize = ChunkBufferSize, timeout = ChunkDefaultTimeout, - udata: ref T) = - child.timeout = timeout + bufferSize = ChunkBufferSize, udata: ref T) = init(cast[AsyncStreamReader](child), rsource, chunkedReadLoop, bufferSize, udata) proc init*(child: ChunkedStreamReader, rsource: AsyncStreamReader, - bufferSize = ChunkBufferSize, timeout = ChunkDefaultTimeout) = - child.timeout = timeout + bufferSize = ChunkBufferSize) = init(cast[AsyncStreamReader](child), rsource, chunkedReadLoop, bufferSize) proc newChunkedStreamReader*[T](rsource: AsyncStreamReader, bufferSize = AsyncStreamDefaultBufferSize, - timeout = ChunkDefaultTimeout, udata: ref T): ChunkedStreamReader = result = new ChunkedStreamReader - result.init(rsource, bufferSize, timeout, udata) + result.init(rsource, bufferSize, udata) proc newChunkedStreamReader*(rsource: AsyncStreamReader, bufferSize = AsyncStreamDefaultBufferSize, - timeout = ChunkDefaultTimeout, ): ChunkedStreamReader = result = new ChunkedStreamReader - result.init(rsource, bufferSize, timeout) + result.init(rsource, bufferSize) proc init*[T](child: ChunkedStreamWriter, wsource: AsyncStreamWriter, queueSize = AsyncStreamDefaultQueueSize, udata: ref T) =