nim-chronos/chronos/apps/http/httpbodyrw.nim
Eugene Kabanov 2e8551b0d9
Cancellation fixes and tests. (#445)
* Add callTick and stream cancellation tests.

* Fix stepsAsync() test.

* Cancellation changes.

* Update and add more cancellation tests.

* Fix Posix shutdown call to handle ENOTCONN error.

* With the new changes to cancellation it's now possible.

* Refactor testsoon.nim to not produce artifacts after tests are finished.

* Debugging MacOS issue.

* Adjust flaky test times.

* Fix issue.

* Add test for issue #334 which was also addressed in this PR.
Avoid `break` in problematic test.

* Add noCancelWait() call which prohibits cancellation.
Fix closeWait() calls to use noCancelWait() predicate.
Adding sleep to flaky MacOS test.

* Remove all debugging echoes.

* Fix cancelAndWait() which now could perform multiple attempts to cancel target Future (mustCancel behavior).

* Fix issues revealed by switch to different cancelAndWait().

* Address review comments.

* Fix testutils compilation warning.

* Rename callTick() to internalCallTick().

* Add some documentation comments.

* Disable flaky ratelimit test.

* Rename noCancelWait() to noCancel().
Address review comments.
2023-09-15 19:38:39 +03:00

95 lines
3.5 KiB
Nim

#
# Chronos HTTP/S body reader/writer
# (c) Copyright 2021-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import ../../asyncloop, ../../asyncsync
import ../../streams/[asyncstream, boundstream]
import httpcommon
const
  HttpBodyReaderTrackerName* = "http.body.reader"
    ## Name of the leak-tracker counter for ``HttpBodyReader`` instances
    ## (incremented on creation, decremented once the reader is closed).
  HttpBodyWriterTrackerName* = "http.body.writer"
    ## Name of the leak-tracker counter for ``HttpBodyWriter`` instances
    ## (incremented on creation, decremented once the writer is closed).
type
  HttpBodyReader* = ref object of AsyncStreamReader
    ## Stream reader for an HTTP body which also owns a chain of underlying
    ## readers that are closed together with it.
    bstate*: HttpState
      ## Lifecycle state of the body reader; this module moves it through
      ## ``Alive`` -> ``Closing`` -> ``Closed``.
    streams*: seq[AsyncStreamReader]
      ## Underlying readers; the one at index ``0`` is used as the source.

  HttpBodyWriter* = ref object of AsyncStreamWriter
    ## Stream writer for an HTTP body which also owns a chain of underlying
    ## writers that are closed together with it.
    bstate*: HttpState
      ## Lifecycle state of the body writer; this module moves it through
      ## ``Alive`` -> ``Closing`` -> ``Closed``.
    streams*: seq[AsyncStreamWriter]
      ## Underlying writers; the one at index ``0`` is used as the
      ## destination.
proc newHttpBodyReader*(streams: varargs[AsyncStreamReader]): HttpBodyReader =
  ## Creates an ``HttpBodyReader`` that keeps a reference to every reader
  ## passed in ``streams``; closing the body reader closes all of them too.
  ##
  ## ``streams[0]`` is the source the new reader is initialized with. At least
  ## one stream must be supplied, otherwise the assertion below fails.
  doAssert(streams.len > 0, "At least one stream must be added")
  var reader = HttpBodyReader(streams: @streams, bstate: HttpState.Alive)
  init(reader, streams[0])
  # Counterpart of the ``untrackCounter`` call performed by ``closeWait``.
  trackCounter(HttpBodyReaderTrackerName)
  reader
proc closeWait*(bstream: HttpBodyReader) {.async.} =
  ## Closes the body reader together with every stream it holds and releases
  ## its leak-tracker counter.
  ##
  ## Does nothing unless the reader is still ``Alive``, so repeated calls are
  ## harmless.
  if bstream.bstate != HttpState.Alive:
    return

  bstream.bstate = HttpState.Closing
  var pending: seq[Future[void]]
  # Streams are closed in reverse order because the stream at position [0]
  # consumes data from the stream at position [1].
  for index in countdown(high(bstream.streams), 0):
    pending.add(closeWait(bstream.streams[index]))
  # Close the base ``AsyncStreamReader`` part of this object as well.
  pending.add(procCall(closeWait(AsyncStreamReader(bstream))))
  # Wait for all close operations without letting cancellation interrupt.
  await noCancel(allFutures(pending))
  bstream.bstate = HttpState.Closed
  untrackCounter(HttpBodyReaderTrackerName)
proc newHttpBodyWriter*(streams: varargs[AsyncStreamWriter]): HttpBodyWriter =
  ## Creates an ``HttpBodyWriter`` that keeps a reference to every writer
  ## passed in ``streams``; closing the body writer closes all of them too.
  ##
  ## ``streams[0]`` is the destination the new writer is initialized with. At
  ## least one stream must be supplied, otherwise the assertion below fails.
  doAssert(streams.len > 0, "At least one stream must be added")
  var writer = HttpBodyWriter(streams: @streams, bstate: HttpState.Alive)
  init(writer, streams[0])
  # Counterpart of the ``untrackCounter`` call performed by ``closeWait``.
  trackCounter(HttpBodyWriterTrackerName)
  writer
proc closeWait*(bstream: HttpBodyWriter) {.async.} =
  ## Close and free all the resources allocated by body writer.
  ##
  ## Does nothing unless the writer is still ``Alive``. The held streams are
  ## closed first (in reverse order), then the base ``AsyncStreamWriter`` part
  ## of this object; only after both steps is the state set to ``Closed`` and
  ## the leak-tracker counter released.
  if bstream.bstate == HttpState.Alive:
    bstream.bstate = HttpState.Closing
    var res = newSeq[Future[void]]()
    for index in countdown(len(bstream.streams) - 1, 0):
      res.add(bstream.streams[index].closeWait())
    await noCancel(allFutures(res))
    # This await is shielded with ``noCancel`` like the one above: if a
    # cancellation interrupted it, the two statements below would be skipped,
    # leaving the writer in ``Closing`` and the tracker counter unbalanced.
    await noCancel(procCall(closeWait(AsyncStreamWriter(bstream))))
    bstream.bstate = HttpState.Closed
    untrackCounter(HttpBodyWriterTrackerName)
proc hasOverflow*(bstream: HttpBodyReader): bool {.raises: [].} =
  ## Returns ``true`` when the first stream has reached EOF while at least one
  ## of the remaining streams still has not, i.e. there is more data available
  ## than the first stream was allowed to deliver.
  if len(bstream.streams) == 1:
    # With a single stream the reader only has a ``BoundedStreamReader``, so
    # it is impossible to receive more bytes than the expected amount.
    return false

  # With two or more streams an overflow is only possible once the
  # ``BoundedStreamReader`` (first stream) is at EOF.
  if not(bstream.streams[0].atEof()):
    return false

  for index in 1 .. high(bstream.streams):
    if not(bstream.streams[index].atEof()):
      return true
  false
proc closed*(bstream: HttpBodyReader | HttpBodyWriter): bool {.
     raises: [].} =
  ## Returns ``true`` when the body stream's state is anything other than
  ## ``HttpState.Alive`` (for example while it is closing or already closed).
  not(bstream.bstate == HttpState.Alive)