147 lines
5.1 KiB
Nim
147 lines
5.1 KiB
Nim
|
#
|
||
|
# Chronos HTTP/S body reader/writer
|
||
|
# (c) Copyright 2021-Present
|
||
|
# Status Research & Development GmbH
|
||
|
#
|
||
|
# Licensed under either of
|
||
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||
|
# MIT license (LICENSE-MIT)
|
||
|
import ../../asyncloop, ../../asyncsync
|
||
|
import ../../streams/[asyncstream, boundstream]
|
||
|
|
||
|
const
  HttpBodyReaderTrackerName* = "http.body.reader"
    ## Identifier under which the HTTP body reader leak tracker is
    ## registered and looked up.
  HttpBodyWriterTrackerName* = "http.body.writer"
    ## Identifier under which the HTTP body writer leak tracker is
    ## registered and looked up.
|
||
|
|
||
|
type
  HttpBodyReader* = ref object of AsyncStreamReader
    ## Stream reader that keeps references to every reader it was built
    ## from; all of them are closed together with it.
    streams*: seq[AsyncStreamReader]

  HttpBodyWriter* = ref object of AsyncStreamWriter
    ## Stream writer that keeps references to every writer it was built
    ## from; all of them are closed together with it.
    streams*: seq[AsyncStreamWriter]

  HttpBodyTracker* = ref object of TrackerBase
    ## Leak tracker holding the number of body streams that were opened
    ## and the number that were closed.
    opened*, closed*: int64
|
||
|
|
||
|
# Forward declaration: ``getHttpBodyWriterTracker`` calls this procedure
# before its body appears further down in this file.
proc setupHttpBodyWriterTracker(): HttpBodyTracker {.gcsafe, raises: [Defect].}
|
||
|
# Forward declaration: ``getHttpBodyReaderTracker`` calls this procedure
# before its body appears further down in this file.
proc setupHttpBodyReaderTracker(): HttpBodyTracker {.gcsafe, raises: [Defect].}
|
||
|
|
||
|
proc getHttpBodyWriterTracker(): HttpBodyTracker {.inline.} =
  ## Returns the tracker registered under ``HttpBodyWriterTrackerName``.
  ## When the lookup yields ``nil`` a new tracker is created and registered
  ## through ``setupHttpBodyWriterTracker``.
  let existing =
    cast[HttpBodyTracker](getTracker(HttpBodyWriterTrackerName))
  if not(isNil(existing)):
    existing
  else:
    setupHttpBodyWriterTracker()
|
||
|
|
||
|
proc getHttpBodyReaderTracker(): HttpBodyTracker {.inline.} =
  ## Returns the tracker registered under ``HttpBodyReaderTrackerName``.
  ## When the lookup yields ``nil`` a new tracker is created and registered
  ## through ``setupHttpBodyReaderTracker``.
  let existing =
    cast[HttpBodyTracker](getTracker(HttpBodyReaderTrackerName))
  if not(isNil(existing)):
    existing
  else:
    setupHttpBodyReaderTracker()
|
||
|
|
||
|
proc dumpHttpBodyWriterTracking(): string {.gcsafe.} =
  ## Produces a two-line report with the opened and closed counters of the
  ## HTTP body writer tracker.
  let
    counters = getHttpBodyWriterTracker()
    openedLine = "Opened HTTP body writers: " & $counters.opened
    closedLine = "Closed HTTP body writers: " & $counters.closed
  openedLine & "\n" & closedLine
|
||
|
|
||
|
proc dumpHttpBodyReaderTracking(): string {.gcsafe.} =
  ## Produces a two-line report with the opened and closed counters of the
  ## HTTP body reader tracker.
  let
    counters = getHttpBodyReaderTracker()
    openedLine = "Opened HTTP body readers: " & $counters.opened
    closedLine = "Closed HTTP body readers: " & $counters.closed
  openedLine & "\n" & closedLine
|
||
|
|
||
|
proc leakHttpBodyWriter(): bool {.gcsafe.} =
  ## Returns ``true`` when the writer tracker's opened and closed counters
  ## differ.
  let counters = getHttpBodyWriterTracker()
  not(counters.opened == counters.closed)
|
||
|
|
||
|
proc leakHttpBodyReader(): bool {.gcsafe.} =
  ## Returns ``true`` when the reader tracker's opened and closed counters
  ## differ.
  let counters = getHttpBodyReaderTracker()
  not(counters.opened == counters.closed)
|
||
|
|
||
|
proc trackHttpBodyWriter(t: HttpBodyWriter) {.inline.} =
  ## Counts one more opened HTTP body writer. The argument itself is not
  ## inspected.
  let tracker = getHttpBodyWriterTracker()
  inc(tracker.opened)
|
||
|
|
||
|
proc untrackHttpBodyWriter*(t: HttpBodyWriter) {.inline.} =
  ## Counts one more closed HTTP body writer. The argument itself is not
  ## inspected.
  let tracker = getHttpBodyWriterTracker()
  inc(tracker.closed)
|
||
|
|
||
|
proc trackHttpBodyReader(t: HttpBodyReader) {.inline.} =
  ## Counts one more opened HTTP body reader. The argument itself is not
  ## inspected.
  let tracker = getHttpBodyReaderTracker()
  inc(tracker.opened)
|
||
|
|
||
|
proc untrackHttpBodyReader*(t: HttpBodyReader) {.inline.} =
  ## Counts one more closed HTTP body reader. The argument itself is not
  ## inspected.
  let tracker = getHttpBodyReaderTracker()
  inc(tracker.closed)
|
||
|
|
||
|
proc setupHttpBodyWriterTracker(): HttpBodyTracker {.gcsafe.} =
  ## Creates a writer tracker with both counters at zero, wires in the
  ## writer dump and leak-check callbacks, registers it under
  ## ``HttpBodyWriterTrackerName`` and returns it.
  result = HttpBodyTracker(
    opened: 0,
    closed: 0,
    dump: dumpHttpBodyWriterTracking,
    isLeaked: leakHttpBodyWriter
  )
  addTracker(HttpBodyWriterTrackerName, result)
|
||
|
|
||
|
proc setupHttpBodyReaderTracker(): HttpBodyTracker {.gcsafe.} =
  ## Creates a reader tracker with both counters at zero, wires in the
  ## reader dump and leak-check callbacks, registers it under
  ## ``HttpBodyReaderTrackerName`` and returns it.
  result = HttpBodyTracker(
    opened: 0,
    closed: 0,
    dump: dumpHttpBodyReaderTracking,
    isLeaked: leakHttpBodyReader
  )
  addTracker(HttpBodyReaderTrackerName, result)
|
||
|
|
||
|
proc newHttpBodyReader*(streams: varargs[AsyncStreamReader]): HttpBodyReader =
  ## Creates an ``HttpBodyReader``: an ``AsyncStreamReader`` that keeps
  ## references to all of ``streams`` and closes every one of them when it
  ## is closed itself.
  ##
  ## The first element of ``streams`` is used as the data source. Passing
  ## no streams at all trips an assertion.
  doAssert(len(streams) > 0, "At least one stream must be added")
  result = HttpBodyReader(streams: @streams)
  result.init(streams[0])
  trackHttpBodyReader(result)
|
||
|
|
||
|
proc closeWait*(bstream: HttpBodyReader) {.async.} =
  ## Closes the body reader and releases the resources it holds.
  var pending = newSeq[Future[void]]()
  # Streams are closed starting from the last one, because the stream at
  # position [0] uses data from the stream at position [1].
  for position in countdown(high(bstream.streams), 0):
    let closing = bstream.streams[position].closeWait()
    pending.add(closing)
  await allFutures(pending)
  # Close the base reader part, then record the reader as closed.
  await procCall(closeWait(AsyncStreamReader(bstream)))
  untrackHttpBodyReader(bstream)
|
||
|
|
||
|
proc newHttpBodyWriter*(streams: varargs[AsyncStreamWriter]): HttpBodyWriter =
  ## Creates an ``HttpBodyWriter``: an ``AsyncStreamWriter`` that keeps
  ## references to all of ``streams`` and closes every one of them when it
  ## is closed itself.
  ##
  ## The first element of ``streams`` is used as the destination. Passing
  ## no streams at all trips an assertion.
  doAssert(len(streams) > 0, "At least one stream must be added")
  result = HttpBodyWriter(streams: @streams)
  result.init(streams[0])
  trackHttpBodyWriter(result)
|
||
|
|
||
|
proc closeWait*(bstream: HttpBodyWriter) {.async.} =
  ## Closes the body writer and releases all the resources it holds.
  var pending = newSeq[Future[void]]()
  # Held streams are closed starting from the last one.
  for position in countdown(high(bstream.streams), 0):
    let closing = bstream.streams[position].closeWait()
    pending.add(closing)
  await allFutures(pending)
  # Close the base writer part, then record the writer as closed.
  await procCall(closeWait(AsyncStreamWriter(bstream)))
  untrackHttpBodyWriter(bstream)
|
||
|
|
||
|
proc hasOverflow*(bstream: HttpBodyReader): bool {.raises: [Defect].} =
  ## Returns ``true`` when the first stream of ``bstream`` is at EOF while
  ## at least one of the remaining streams is not, and ``false`` otherwise.
  ##
  ## A reader holding exactly one stream never reports overflow.
  if len(bstream.streams) == 1:
    # If HttpBodyReader has only one stream it has ``BoundedStreamReader``;
    # in such a case it is impossible to get more bytes than the expected
    # amount.
    false
  else:
    # If HttpBodyReader has two or more streams, we check whether
    # ``BoundedStreamReader`` is at EOF.
    if bstream.streams[0].atEof():
      # Inspect every remaining stream. The index must follow the loop
      # variable so that streams beyond position [1] are checked as well.
      for i in 1 ..< len(bstream.streams):
        if not(bstream.streams[i].atEof()):
          return true
      false
    else:
      false
|