Initial commit.
This commit is contained in:
parent
96c9473e9c
commit
ccfbdfa3a2
|
@ -6,6 +6,9 @@
|
|||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import ../../asyncloop, ../../asyncsync
|
||||
import ../../streams/[asyncstream, boundstream]
|
||||
import httpcommon
|
||||
|
@ -36,7 +39,7 @@ proc newHttpBodyReader*(streams: varargs[AsyncStreamReader]): HttpBodyReader =
|
|||
trackCounter(HttpBodyReaderTrackerName)
|
||||
res
|
||||
|
||||
proc closeWait*(bstream: HttpBodyReader) {.async.} =
|
||||
proc closeWait*(bstream: HttpBodyReader) {.async: (raises: []).} =
|
||||
## Close and free resource allocated by body reader.
|
||||
if bstream.bstate == HttpState.Alive:
|
||||
bstream.bstate = HttpState.Closing
|
||||
|
@ -61,7 +64,7 @@ proc newHttpBodyWriter*(streams: varargs[AsyncStreamWriter]): HttpBodyWriter =
|
|||
trackCounter(HttpBodyWriterTrackerName)
|
||||
res
|
||||
|
||||
proc closeWait*(bstream: HttpBodyWriter) {.async.} =
|
||||
proc closeWait*(bstream: HttpBodyWriter) {.async: (raises: []).} =
|
||||
## Close and free all the resources allocated by body writer.
|
||||
if bstream.bstate == HttpState.Alive:
|
||||
bstream.bstate = HttpState.Closing
|
||||
|
@ -73,7 +76,7 @@ proc closeWait*(bstream: HttpBodyWriter) {.async.} =
|
|||
bstream.bstate = HttpState.Closed
|
||||
untrackCounter(HttpBodyWriterTrackerName)
|
||||
|
||||
proc hasOverflow*(bstream: HttpBodyReader): bool {.raises: [].} =
|
||||
proc hasOverflow*(bstream: HttpBodyReader): bool =
|
||||
if len(bstream.streams) == 1:
|
||||
# If HttpBodyReader has only one stream it has ``BoundedStreamReader``, in
|
||||
# such case its impossible to get more bytes then expected amount.
|
||||
|
@ -89,6 +92,5 @@ proc hasOverflow*(bstream: HttpBodyReader): bool {.raises: [].} =
|
|||
else:
|
||||
false
|
||||
|
||||
proc closed*(bstream: HttpBodyReader | HttpBodyWriter): bool {.
|
||||
raises: [].} =
|
||||
proc closed*(bstream: HttpBodyReader | HttpBodyWriter): bool =
|
||||
bstream.bstate != HttpState.Alive
|
||||
|
|
|
@ -6,6 +6,9 @@
|
|||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[uri, tables, sequtils]
|
||||
import stew/[base10, base64, byteutils], httputils, results
|
||||
import ../../asyncloop, ../../asyncsync
|
||||
|
@ -120,7 +123,7 @@ type
|
|||
headersTimeout*: Duration
|
||||
idleTimeout: Duration
|
||||
idlePeriod: Duration
|
||||
watcherFut: Future[void]
|
||||
watcherFut: Future[void].Raising([])
|
||||
connectionBufferSize*: int
|
||||
maxConnections*: int
|
||||
connectionsCount*: int
|
||||
|
@ -253,7 +256,7 @@ template isIdle(conn: HttpClientConnectionRef, timestamp: Moment,
|
|||
timeout: Duration): bool =
|
||||
(timestamp - conn.timestamp) >= timeout
|
||||
|
||||
proc sessionWatcher(session: HttpSessionRef) {.async.}
|
||||
proc sessionWatcher(session: HttpSessionRef) {.async: (raises: []).}
|
||||
|
||||
proc new*(t: typedesc[HttpSessionRef],
|
||||
flags: HttpClientFlags = {},
|
||||
|
@ -265,8 +268,7 @@ proc new*(t: typedesc[HttpSessionRef],
|
|||
idleTimeout = HttpConnectionIdleTimeout,
|
||||
idlePeriod = HttpConnectionCheckPeriod,
|
||||
socketFlags: set[SocketFlags] = {},
|
||||
dualstack = DualStackType.Auto): HttpSessionRef {.
|
||||
raises: [] .} =
|
||||
dualstack = DualStackType.Auto): HttpSessionRef =
|
||||
## Create new HTTP session object.
|
||||
##
|
||||
## ``maxRedirections`` - maximum number of HTTP 3xx redirections
|
||||
|
@ -292,10 +294,10 @@ proc new*(t: typedesc[HttpSessionRef],
|
|||
if HttpClientFlag.Http11Pipeline in flags:
|
||||
sessionWatcher(res)
|
||||
else:
|
||||
newFuture[void]("session.watcher.placeholder")
|
||||
Future[void].Raising([]).init("session.watcher.placeholder")
|
||||
res
|
||||
|
||||
proc getTLSFlags(flags: HttpClientFlags): set[TLSFlags] {.raises: [] .} =
|
||||
proc getTLSFlags(flags: HttpClientFlags): set[TLSFlags] =
|
||||
var res: set[TLSFlags]
|
||||
if HttpClientFlag.NoVerifyHost in flags:
|
||||
res.incl(TLSFlags.NoVerifyHost)
|
||||
|
@ -306,7 +308,7 @@ proc getTLSFlags(flags: HttpClientFlags): set[TLSFlags] {.raises: [] .} =
|
|||
proc getHttpAddress*(
|
||||
url: Uri,
|
||||
flags: HttpClientFlags = {}
|
||||
): HttpAddressResult {.raises: [].} =
|
||||
): HttpAddressResult =
|
||||
let
|
||||
scheme =
|
||||
if len(url.scheme) == 0:
|
||||
|
@ -370,24 +372,23 @@ proc getHttpAddress*(
|
|||
proc getHttpAddress*(
|
||||
url: string,
|
||||
flags: HttpClientFlags = {}
|
||||
): HttpAddressResult {.raises: [].} =
|
||||
): HttpAddressResult =
|
||||
getHttpAddress(parseUri(url), flags)
|
||||
|
||||
proc getHttpAddress*(
|
||||
session: HttpSessionRef,
|
||||
url: Uri
|
||||
): HttpAddressResult {.raises: [].} =
|
||||
): HttpAddressResult =
|
||||
getHttpAddress(url, session.flags)
|
||||
|
||||
proc getHttpAddress*(
|
||||
session: HttpSessionRef,
|
||||
url: string
|
||||
): HttpAddressResult {.raises: [].} =
|
||||
): HttpAddressResult =
|
||||
## Create new HTTP address using URL string ``url`` and .
|
||||
getHttpAddress(parseUri(url), session.flags)
|
||||
|
||||
proc getAddress*(session: HttpSessionRef, url: Uri): HttpResult[HttpAddress] {.
|
||||
raises: [] .} =
|
||||
proc getAddress*(session: HttpSessionRef, url: Uri): HttpResult[HttpAddress] =
|
||||
let scheme =
|
||||
if len(url.scheme) == 0:
|
||||
HttpClientScheme.NonSecure
|
||||
|
@ -451,13 +452,13 @@ proc getAddress*(session: HttpSessionRef, url: Uri): HttpResult[HttpAddress] {.
|
|||
addresses: addresses))
|
||||
|
||||
proc getAddress*(session: HttpSessionRef,
|
||||
url: string): HttpResult[HttpAddress] {.raises: [].} =
|
||||
url: string): HttpResult[HttpAddress] =
|
||||
## Create new HTTP address using URL string ``url`` and .
|
||||
session.getAddress(parseUri(url))
|
||||
|
||||
proc getAddress*(address: TransportAddress,
|
||||
ctype: HttpClientScheme = HttpClientScheme.NonSecure,
|
||||
queryString: string = "/"): HttpAddress {.raises: [].} =
|
||||
queryString: string = "/"): HttpAddress =
|
||||
## Create new HTTP address using Transport address ``address``, connection
|
||||
## type ``ctype`` and query string ``queryString``.
|
||||
let uri = parseUri(queryString)
|
||||
|
@ -540,8 +541,12 @@ proc getUniqueConnectionId(session: HttpSessionRef): uint64 =
|
|||
inc(session.counter)
|
||||
session.counter
|
||||
|
||||
proc new(t: typedesc[HttpClientConnectionRef], session: HttpSessionRef,
|
||||
ha: HttpAddress, transp: StreamTransport): HttpClientConnectionRef =
|
||||
proc new(
|
||||
t: typedesc[HttpClientConnectionRef],
|
||||
session: HttpSessionRef,
|
||||
ha: HttpAddress,
|
||||
transp: StreamTransport
|
||||
): Result[HttpClientConnectionRef, string] =
|
||||
case ha.scheme
|
||||
of HttpClientScheme.NonSecure:
|
||||
let res = HttpClientConnectionRef(
|
||||
|
@ -554,44 +559,48 @@ proc new(t: typedesc[HttpClientConnectionRef], session: HttpSessionRef,
|
|||
remoteHostname: ha.id
|
||||
)
|
||||
trackCounter(HttpClientConnectionTrackerName)
|
||||
res
|
||||
ok(res)
|
||||
of HttpClientScheme.Secure:
|
||||
let treader = newAsyncStreamReader(transp)
|
||||
let twriter = newAsyncStreamWriter(transp)
|
||||
let tls = newTLSClientAsyncStream(treader, twriter, ha.hostname,
|
||||
flags = session.flags.getTLSFlags())
|
||||
let res = HttpClientConnectionRef(
|
||||
id: session.getUniqueConnectionId(),
|
||||
kind: HttpClientScheme.Secure,
|
||||
transp: transp,
|
||||
treader: treader,
|
||||
twriter: twriter,
|
||||
reader: tls.reader,
|
||||
writer: tls.writer,
|
||||
tls: tls,
|
||||
state: HttpClientConnectionState.Connecting,
|
||||
remoteHostname: ha.id
|
||||
)
|
||||
trackCounter(HttpClientConnectionTrackerName)
|
||||
res
|
||||
let
|
||||
treader = newAsyncStreamReader(transp)
|
||||
twriter = newAsyncStreamWriter(transp)
|
||||
tls =
|
||||
try:
|
||||
newTLSClientAsyncStream(treader, twriter, ha.hostname,
|
||||
flags = session.flags.getTLSFlags())
|
||||
except TLSStreamInitError as exc:
|
||||
return err(exc.msg)
|
||||
|
||||
proc setError(request: HttpClientRequestRef, error: ref HttpError) {.
|
||||
raises: [] .} =
|
||||
res = HttpClientConnectionRef(
|
||||
id: session.getUniqueConnectionId(),
|
||||
kind: HttpClientScheme.Secure,
|
||||
transp: transp,
|
||||
treader: treader,
|
||||
twriter: twriter,
|
||||
reader: tls.reader,
|
||||
writer: tls.writer,
|
||||
tls: tls,
|
||||
state: HttpClientConnectionState.Connecting,
|
||||
remoteHostname: ha.id
|
||||
)
|
||||
trackCounter(HttpClientConnectionTrackerName)
|
||||
ok(res)
|
||||
|
||||
proc setError(request: HttpClientRequestRef, error: ref HttpError) =
|
||||
request.error = error
|
||||
request.state = HttpReqRespState.Error
|
||||
if not(isNil(request.connection)):
|
||||
request.connection.state = HttpClientConnectionState.Error
|
||||
request.connection.error = error
|
||||
|
||||
proc setError(response: HttpClientResponseRef, error: ref HttpError) {.
|
||||
raises: [] .} =
|
||||
proc setError(response: HttpClientResponseRef, error: ref HttpError) =
|
||||
response.error = error
|
||||
response.state = HttpReqRespState.Error
|
||||
if not(isNil(response.connection)):
|
||||
response.connection.state = HttpClientConnectionState.Error
|
||||
response.connection.error = error
|
||||
|
||||
proc closeWait(conn: HttpClientConnectionRef) {.async.} =
|
||||
proc closeWait(conn: HttpClientConnectionRef) {.async: (raises: []).} =
|
||||
## Close HttpClientConnectionRef instance ``conn`` and free all the resources.
|
||||
if conn.state notin {HttpClientConnectionState.Closing,
|
||||
HttpClientConnectionState.Closed}:
|
||||
|
@ -613,7 +622,8 @@ proc closeWait(conn: HttpClientConnectionRef) {.async.} =
|
|||
untrackCounter(HttpClientConnectionTrackerName)
|
||||
|
||||
proc connect(session: HttpSessionRef,
|
||||
ha: HttpAddress): Future[HttpClientConnectionRef] {.async.} =
|
||||
ha: HttpAddress): Future[HttpClientConnectionRef] {.
|
||||
async: (raises: [CancelledError, HttpConnectionError]).} =
|
||||
## Establish new connection with remote server using ``url`` and ``flags``.
|
||||
## On success returns ``HttpClientConnectionRef`` object.
|
||||
var lastError = ""
|
||||
|
@ -627,12 +637,14 @@ proc connect(session: HttpSessionRef,
|
|||
dualstack = session.dualstack)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError:
|
||||
except TransportError:
|
||||
nil
|
||||
if not(isNil(transp)):
|
||||
let conn =
|
||||
block:
|
||||
let res = HttpClientConnectionRef.new(session, ha, transp)
|
||||
let res = HttpClientConnectionRef.new(session, ha, transp).valueOr:
|
||||
raiseHttpConnectionError(
|
||||
"Could not connect to remote host, reason: " & error)
|
||||
if res.kind == HttpClientScheme.Secure:
|
||||
try:
|
||||
await res.tls.handshake()
|
||||
|
@ -662,7 +674,7 @@ proc connect(session: HttpSessionRef,
|
|||
raiseHttpConnectionError("Could not connect to remote host")
|
||||
|
||||
proc removeConnection(session: HttpSessionRef,
|
||||
conn: HttpClientConnectionRef) {.async.} =
|
||||
conn: HttpClientConnectionRef) {.async: (raises: []).} =
|
||||
let removeHost =
|
||||
block:
|
||||
var res = false
|
||||
|
@ -686,7 +698,8 @@ proc acquireConnection(
|
|||
session: HttpSessionRef,
|
||||
ha: HttpAddress,
|
||||
flags: set[HttpClientRequestFlag]
|
||||
): Future[HttpClientConnectionRef] {.async.} =
|
||||
): Future[HttpClientConnectionRef] {.
|
||||
async: (raises: [CancelledError, HttpConnectionError]).} =
|
||||
## Obtain connection from ``session`` or establish a new one.
|
||||
var default: seq[HttpClientConnectionRef]
|
||||
let timestamp = Moment.now()
|
||||
|
@ -710,10 +723,11 @@ proc acquireConnection(
|
|||
inc(session.connectionsCount)
|
||||
connection.setTimestamp(timestamp)
|
||||
connection.setDuration()
|
||||
return connection
|
||||
connection
|
||||
|
||||
proc releaseConnection(session: HttpSessionRef,
|
||||
connection: HttpClientConnectionRef) {.async.} =
|
||||
connection: HttpClientConnectionRef) {.
|
||||
async: (raises: []).} =
|
||||
## Return connection back to the ``session``.
|
||||
let removeConnection =
|
||||
if HttpClientFlag.Http11Pipeline notin session.flags:
|
||||
|
@ -751,7 +765,7 @@ proc releaseConnection(session: HttpSessionRef,
|
|||
HttpClientConnectionFlag.Response,
|
||||
HttpClientConnectionFlag.NoBody})
|
||||
|
||||
proc releaseConnection(request: HttpClientRequestRef) {.async.} =
|
||||
proc releaseConnection(request: HttpClientRequestRef) {.async: (raises: []).} =
|
||||
let
|
||||
session = request.session
|
||||
connection = request.connection
|
||||
|
@ -763,7 +777,8 @@ proc releaseConnection(request: HttpClientRequestRef) {.async.} =
|
|||
if HttpClientConnectionFlag.Response notin connection.flags:
|
||||
await session.releaseConnection(connection)
|
||||
|
||||
proc releaseConnection(response: HttpClientResponseRef) {.async.} =
|
||||
proc releaseConnection(response: HttpClientResponseRef) {.
|
||||
async: (raises: []).} =
|
||||
let
|
||||
session = response.session
|
||||
connection = response.connection
|
||||
|
@ -775,7 +790,7 @@ proc releaseConnection(response: HttpClientResponseRef) {.async.} =
|
|||
if HttpClientConnectionFlag.Request notin connection.flags:
|
||||
await session.releaseConnection(connection)
|
||||
|
||||
proc closeWait*(session: HttpSessionRef) {.async.} =
|
||||
proc closeWait*(session: HttpSessionRef) {.async: (raises: []).} =
|
||||
## Closes HTTP session object.
|
||||
##
|
||||
## This closes all the connections opened to remote servers.
|
||||
|
@ -788,7 +803,7 @@ proc closeWait*(session: HttpSessionRef) {.async.} =
|
|||
pending.add(closeWait(conn))
|
||||
await noCancel(allFutures(pending))
|
||||
|
||||
proc sessionWatcher(session: HttpSessionRef) {.async.} =
|
||||
proc sessionWatcher(session: HttpSessionRef) {.async: (raises: []).} =
|
||||
while true:
|
||||
let firstBreak =
|
||||
try:
|
||||
|
@ -819,18 +834,19 @@ proc sessionWatcher(session: HttpSessionRef) {.async.} =
|
|||
var pending: seq[Future[void]]
|
||||
let secondBreak =
|
||||
try:
|
||||
pending = idleConnections.mapIt(it.closeWait())
|
||||
for conn in idleConnections:
|
||||
pending.add(conn.closeWait())
|
||||
await allFutures(pending)
|
||||
false
|
||||
except CancelledError:
|
||||
# We still want to close connections to avoid socket leaks.
|
||||
await allFutures(pending)
|
||||
await noCancel(allFutures(pending))
|
||||
true
|
||||
|
||||
if secondBreak:
|
||||
break
|
||||
|
||||
proc closeWait*(request: HttpClientRequestRef) {.async.} =
|
||||
proc closeWait*(request: HttpClientRequestRef) {.async: (raises: []).} =
|
||||
var pending: seq[FutureBase]
|
||||
if request.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}:
|
||||
request.state = HttpReqRespState.Closing
|
||||
|
@ -845,7 +861,7 @@ proc closeWait*(request: HttpClientRequestRef) {.async.} =
|
|||
request.state = HttpReqRespState.Closed
|
||||
untrackCounter(HttpClientRequestTrackerName)
|
||||
|
||||
proc closeWait*(response: HttpClientResponseRef) {.async.} =
|
||||
proc closeWait*(response: HttpClientResponseRef) {.async: (raises: []).} =
|
||||
var pending: seq[FutureBase]
|
||||
if response.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}:
|
||||
response.state = HttpReqRespState.Closing
|
||||
|
@ -860,8 +876,10 @@ proc closeWait*(response: HttpClientResponseRef) {.async.} =
|
|||
response.state = HttpReqRespState.Closed
|
||||
untrackCounter(HttpClientResponseTrackerName)
|
||||
|
||||
proc prepareResponse(request: HttpClientRequestRef, data: openArray[byte]
|
||||
): HttpResult[HttpClientResponseRef] {.raises: [] .} =
|
||||
proc prepareResponse(
|
||||
request: HttpClientRequestRef,
|
||||
data: openArray[byte]
|
||||
): HttpResult[HttpClientResponseRef] =
|
||||
## Process response headers.
|
||||
let resp = parseResponse(data, false)
|
||||
if resp.failed():
|
||||
|
@ -972,7 +990,7 @@ proc prepareResponse(request: HttpClientRequestRef, data: openArray[byte]
|
|||
ok(res)
|
||||
|
||||
proc getResponse(req: HttpClientRequestRef): Future[HttpClientResponseRef] {.
|
||||
async.} =
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
var buffer: array[HttpMaxHeadersSize, byte]
|
||||
let timestamp = Moment.now()
|
||||
req.connection.setTimestamp(timestamp)
|
||||
|
@ -999,8 +1017,7 @@ proc new*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
|
|||
version: HttpVersion = HttpVersion11,
|
||||
flags: set[HttpClientRequestFlag] = {},
|
||||
headers: openArray[HttpHeaderTuple] = [],
|
||||
body: openArray[byte] = []): HttpClientRequestRef {.
|
||||
raises: [].} =
|
||||
body: openArray[byte] = []): HttpClientRequestRef =
|
||||
let res = HttpClientRequestRef(
|
||||
state: HttpReqRespState.Ready, session: session, meth: meth,
|
||||
version: version, flags: flags, headers: HttpTable.init(headers),
|
||||
|
@ -1014,8 +1031,7 @@ proc new*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
|
|||
version: HttpVersion = HttpVersion11,
|
||||
flags: set[HttpClientRequestFlag] = {},
|
||||
headers: openArray[HttpHeaderTuple] = [],
|
||||
body: openArray[byte] = []): HttpResult[HttpClientRequestRef] {.
|
||||
raises: [].} =
|
||||
body: openArray[byte] = []): HttpResult[HttpClientRequestRef] =
|
||||
let address = ? session.getAddress(parseUri(url))
|
||||
let res = HttpClientRequestRef(
|
||||
state: HttpReqRespState.Ready, session: session, meth: meth,
|
||||
|
@ -1029,14 +1045,14 @@ proc get*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
|
|||
url: string, version: HttpVersion = HttpVersion11,
|
||||
flags: set[HttpClientRequestFlag] = {},
|
||||
headers: openArray[HttpHeaderTuple] = []
|
||||
): HttpResult[HttpClientRequestRef] {.raises: [].} =
|
||||
): HttpResult[HttpClientRequestRef] =
|
||||
HttpClientRequestRef.new(session, url, MethodGet, version, flags, headers)
|
||||
|
||||
proc get*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
|
||||
ha: HttpAddress, version: HttpVersion = HttpVersion11,
|
||||
flags: set[HttpClientRequestFlag] = {},
|
||||
headers: openArray[HttpHeaderTuple] = []
|
||||
): HttpClientRequestRef {.raises: [].} =
|
||||
): HttpClientRequestRef =
|
||||
HttpClientRequestRef.new(session, ha, MethodGet, version, flags, headers)
|
||||
|
||||
proc post*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
|
||||
|
@ -1044,7 +1060,7 @@ proc post*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
|
|||
flags: set[HttpClientRequestFlag] = {},
|
||||
headers: openArray[HttpHeaderTuple] = [],
|
||||
body: openArray[byte] = []
|
||||
): HttpResult[HttpClientRequestRef] {.raises: [].} =
|
||||
): HttpResult[HttpClientRequestRef] =
|
||||
HttpClientRequestRef.new(session, url, MethodPost, version, flags, headers,
|
||||
body)
|
||||
|
||||
|
@ -1052,8 +1068,7 @@ proc post*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
|
|||
url: string, version: HttpVersion = HttpVersion11,
|
||||
flags: set[HttpClientRequestFlag] = {},
|
||||
headers: openArray[HttpHeaderTuple] = [],
|
||||
body: openArray[char] = []): HttpResult[HttpClientRequestRef] {.
|
||||
raises: [].} =
|
||||
body: openArray[char] = []): HttpResult[HttpClientRequestRef] =
|
||||
HttpClientRequestRef.new(session, url, MethodPost, version, flags, headers,
|
||||
body.toOpenArrayByte(0, len(body) - 1))
|
||||
|
||||
|
@ -1061,8 +1076,7 @@ proc post*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
|
|||
ha: HttpAddress, version: HttpVersion = HttpVersion11,
|
||||
flags: set[HttpClientRequestFlag] = {},
|
||||
headers: openArray[HttpHeaderTuple] = [],
|
||||
body: openArray[byte] = []): HttpClientRequestRef {.
|
||||
raises: [].} =
|
||||
body: openArray[byte] = []): HttpClientRequestRef =
|
||||
HttpClientRequestRef.new(session, ha, MethodPost, version, flags, headers,
|
||||
body)
|
||||
|
||||
|
@ -1070,13 +1084,11 @@ proc post*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef,
|
|||
ha: HttpAddress, version: HttpVersion = HttpVersion11,
|
||||
flags: set[HttpClientRequestFlag] = {},
|
||||
headers: openArray[HttpHeaderTuple] = [],
|
||||
body: openArray[char] = []): HttpClientRequestRef {.
|
||||
raises: [].} =
|
||||
body: openArray[char] = []): HttpClientRequestRef =
|
||||
HttpClientRequestRef.new(session, ha, MethodPost, version, flags, headers,
|
||||
body.toOpenArrayByte(0, len(body) - 1))
|
||||
|
||||
proc prepareRequest(request: HttpClientRequestRef): string {.
|
||||
raises: [].} =
|
||||
proc prepareRequest(request: HttpClientRequestRef): string =
|
||||
template hasChunkedEncoding(request: HttpClientRequestRef): bool =
|
||||
toLowerAscii(request.headers.getString(TransferEncodingHeader)) == "chunked"
|
||||
|
||||
|
@ -1151,7 +1163,7 @@ proc prepareRequest(request: HttpClientRequestRef): string {.
|
|||
res
|
||||
|
||||
proc send*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {.
|
||||
async.} =
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
doAssert(request.state == HttpReqRespState.Ready,
|
||||
"Request's state is " & $request.state)
|
||||
let connection =
|
||||
|
@ -1190,19 +1202,17 @@ proc send*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {.
|
|||
request.setError(error)
|
||||
raise error
|
||||
|
||||
let resp =
|
||||
try:
|
||||
await request.getResponse()
|
||||
except CancelledError as exc:
|
||||
request.setError(newHttpInterruptError())
|
||||
raise exc
|
||||
except HttpError as exc:
|
||||
request.setError(exc)
|
||||
raise exc
|
||||
return resp
|
||||
try:
|
||||
await request.getResponse()
|
||||
except CancelledError as exc:
|
||||
request.setError(newHttpInterruptError())
|
||||
raise exc
|
||||
except HttpError as exc:
|
||||
request.setError(exc)
|
||||
raise exc
|
||||
|
||||
proc open*(request: HttpClientRequestRef): Future[HttpBodyWriter] {.
|
||||
async.} =
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Start sending request's headers and return `HttpBodyWriter`, which can be
|
||||
## used to send request's body.
|
||||
doAssert(request.state == HttpReqRespState.Ready,
|
||||
|
@ -1258,7 +1268,7 @@ proc open*(request: HttpClientRequestRef): Future[HttpBodyWriter] {.
|
|||
return writer
|
||||
|
||||
proc finish*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {.
|
||||
async.} =
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Finish sending request and receive response.
|
||||
doAssert(not(isNil(request.connection)),
|
||||
"Request missing connection instance")
|
||||
|
@ -1295,7 +1305,8 @@ proc getNewLocation*(resp: HttpClientResponseRef): HttpResult[HttpAddress] =
|
|||
else:
|
||||
err("Location header is missing")
|
||||
|
||||
proc getBodyReader*(response: HttpClientResponseRef): HttpBodyReader =
|
||||
proc getBodyReader*(response: HttpClientResponseRef): HttpBodyReader {.
|
||||
raises: [HttpUseClosedError].} =
|
||||
## Returns stream's reader instance which can be used to read response's body.
|
||||
##
|
||||
## Streams which was obtained using this procedure must be closed to avoid
|
||||
|
@ -1324,7 +1335,8 @@ proc getBodyReader*(response: HttpClientResponseRef): HttpBodyReader =
|
|||
response.reader = reader
|
||||
response.reader
|
||||
|
||||
proc finish*(response: HttpClientResponseRef) {.async.} =
|
||||
proc finish*(response: HttpClientResponseRef) {.
|
||||
async: (raises: [HttpUseClosedError]).} =
|
||||
## Finish receiving response.
|
||||
##
|
||||
## Because ``finish()`` returns nothing, this operation become NOP for
|
||||
|
@ -1343,7 +1355,7 @@ proc finish*(response: HttpClientResponseRef) {.async.} =
|
|||
response.setDuration()
|
||||
|
||||
proc getBodyBytes*(response: HttpClientResponseRef): Future[seq[byte]] {.
|
||||
async.} =
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Read all bytes from response ``response``.
|
||||
##
|
||||
## Note: This procedure performs automatic finishing for ``response``.
|
||||
|
@ -1353,7 +1365,7 @@ proc getBodyBytes*(response: HttpClientResponseRef): Future[seq[byte]] {.
|
|||
await reader.closeWait()
|
||||
reader = nil
|
||||
await response.finish()
|
||||
return data
|
||||
data
|
||||
except CancelledError as exc:
|
||||
if not(isNil(reader)):
|
||||
await reader.closeWait()
|
||||
|
@ -1367,7 +1379,8 @@ proc getBodyBytes*(response: HttpClientResponseRef): Future[seq[byte]] {.
|
|||
raise error
|
||||
|
||||
proc getBodyBytes*(response: HttpClientResponseRef,
|
||||
nbytes: int): Future[seq[byte]] {.async.} =
|
||||
nbytes: int): Future[seq[byte]] {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Read all bytes (nbytes <= 0) or exactly `nbytes` bytes from response
|
||||
## ``response``.
|
||||
##
|
||||
|
@ -1378,7 +1391,7 @@ proc getBodyBytes*(response: HttpClientResponseRef,
|
|||
await reader.closeWait()
|
||||
reader = nil
|
||||
await response.finish()
|
||||
return data
|
||||
data
|
||||
except CancelledError as exc:
|
||||
if not(isNil(reader)):
|
||||
await reader.closeWait()
|
||||
|
@ -1391,7 +1404,8 @@ proc getBodyBytes*(response: HttpClientResponseRef,
|
|||
response.setError(error)
|
||||
raise error
|
||||
|
||||
proc consumeBody*(response: HttpClientResponseRef): Future[int] {.async.} =
|
||||
proc consumeBody*(response: HttpClientResponseRef): Future[int] {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Consume/discard response and return number of bytes consumed.
|
||||
##
|
||||
## Note: This procedure performs automatic finishing for ``response``.
|
||||
|
@ -1401,7 +1415,7 @@ proc consumeBody*(response: HttpClientResponseRef): Future[int] {.async.} =
|
|||
await reader.closeWait()
|
||||
reader = nil
|
||||
await response.finish()
|
||||
return res
|
||||
res
|
||||
except CancelledError as exc:
|
||||
if not(isNil(reader)):
|
||||
await reader.closeWait()
|
||||
|
@ -1460,7 +1474,7 @@ proc redirect*(request: HttpClientRequestRef,
|
|||
ok(res)
|
||||
|
||||
proc fetch*(request: HttpClientRequestRef): Future[HttpResponseTuple] {.
|
||||
async.} =
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
var response: HttpClientResponseRef
|
||||
try:
|
||||
response = await request.send()
|
||||
|
@ -1468,7 +1482,7 @@ proc fetch*(request: HttpClientRequestRef): Future[HttpResponseTuple] {.
|
|||
let status = response.status
|
||||
await response.closeWait()
|
||||
response = nil
|
||||
return (status, buffer)
|
||||
(status, buffer)
|
||||
except HttpError as exc:
|
||||
if not(isNil(response)): await response.closeWait()
|
||||
raise exc
|
||||
|
@ -1477,7 +1491,7 @@ proc fetch*(request: HttpClientRequestRef): Future[HttpResponseTuple] {.
|
|||
raise exc
|
||||
|
||||
proc fetch*(session: HttpSessionRef, url: Uri): Future[HttpResponseTuple] {.
|
||||
async.} =
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Fetch resource pointed by ``url`` using HTTP GET method and ``session``
|
||||
## parameters.
|
||||
##
|
||||
|
@ -1519,28 +1533,34 @@ proc fetch*(session: HttpSessionRef, url: Uri): Future[HttpResponseTuple] {.
|
|||
request = redirect
|
||||
redirect = nil
|
||||
else:
|
||||
let data = await response.getBodyBytes()
|
||||
let code = response.status
|
||||
let
|
||||
data = await response.getBodyBytes()
|
||||
code = response.status
|
||||
await response.closeWait()
|
||||
response = nil
|
||||
await request.closeWait()
|
||||
request = nil
|
||||
return (code, data)
|
||||
except CancelledError as exc:
|
||||
if not(isNil(response)): await closeWait(response)
|
||||
if not(isNil(request)): await closeWait(request)
|
||||
if not(isNil(redirect)): await closeWait(redirect)
|
||||
var pending: seq[Future[void]]
|
||||
if not(isNil(response)): pending.add(closeWait(response))
|
||||
if not(isNil(request)): pending.add(closeWait(request))
|
||||
if not(isNil(redirect)): pending.add(closeWait(redirect))
|
||||
await noCancel(allFutures(pending))
|
||||
raise exc
|
||||
except HttpError as exc:
|
||||
if not(isNil(response)): await closeWait(response)
|
||||
if not(isNil(request)): await closeWait(request)
|
||||
if not(isNil(redirect)): await closeWait(redirect)
|
||||
var pending: seq[Future[void]]
|
||||
if not(isNil(response)): pending.add(closeWait(response))
|
||||
if not(isNil(request)): pending.add(closeWait(request))
|
||||
if not(isNil(redirect)): pending.add(closeWait(redirect))
|
||||
await noCancel(allFutures(pending))
|
||||
raise exc
|
||||
|
||||
proc getServerSentEvents*(
|
||||
response: HttpClientResponseRef,
|
||||
maxEventSize: int = -1
|
||||
): Future[seq[ServerSentEvent]] {.async.} =
|
||||
): Future[seq[ServerSentEvent]] {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Read number of server-sent events (SSE) from HTTP response ``response``.
|
||||
##
|
||||
## ``maxEventSize`` - maximum size of events chunk in one message, use
|
||||
|
@ -1628,7 +1648,13 @@ proc getServerSentEvents*(
|
|||
|
||||
(i, false)
|
||||
|
||||
await reader.readMessage(predicate)
|
||||
try:
|
||||
await reader.readMessage(predicate)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except AsyncStreamError as exc:
|
||||
raiseHttpReadError($exc.msg)
|
||||
|
||||
if not isNil(error):
|
||||
raise error
|
||||
else:
|
||||
|
|
|
@ -6,6 +6,9 @@
|
|||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[strutils, uri]
|
||||
import results, httputils
|
||||
import ../../asyncloop, ../../asyncsync
|
||||
|
@ -124,35 +127,44 @@ func toString*(error: HttpAddressErrorType): string =
|
|||
of HttpAddressErrorType.NoAddressResolved:
|
||||
"No address has been resolved"
|
||||
|
||||
proc raiseHttpCriticalError*(msg: string,
|
||||
code = Http400) {.noinline, noreturn.} =
|
||||
proc raiseHttpCriticalError*(msg: string, code = Http400) {.
|
||||
noinline, noreturn, raises: [HttpCriticalError].} =
|
||||
raise (ref HttpCriticalError)(code: code, msg: msg)
|
||||
|
||||
proc raiseHttpDisconnectError*() {.noinline, noreturn.} =
|
||||
proc raiseHttpDisconnectError*() {.
|
||||
noinline, noreturn, raises: [HttpDisconnectError].} =
|
||||
raise (ref HttpDisconnectError)(msg: "Remote peer disconnected")
|
||||
|
||||
proc raiseHttpDefect*(msg: string) {.noinline, noreturn.} =
|
||||
proc raiseHttpDefect*(msg: string) {.
|
||||
noinline, noreturn, raises: [].} =
|
||||
raise (ref HttpDefect)(msg: msg)
|
||||
|
||||
proc raiseHttpConnectionError*(msg: string) {.noinline, noreturn.} =
|
||||
proc raiseHttpConnectionError*(msg: string) {.
|
||||
noinline, noreturn, raises: [HttpConnectionError].} =
|
||||
raise (ref HttpConnectionError)(msg: msg)
|
||||
|
||||
proc raiseHttpInterruptError*() {.noinline, noreturn.} =
|
||||
proc raiseHttpInterruptError*() {.
|
||||
noinline, noreturn, raises: [HttpInterruptError].} =
|
||||
raise (ref HttpInterruptError)(msg: "Connection was interrupted")
|
||||
|
||||
proc raiseHttpReadError*(msg: string) {.noinline, noreturn.} =
|
||||
proc raiseHttpReadError*(msg: string) {.
|
||||
noinline, noreturn, raises: [HttpReadError].} =
|
||||
raise (ref HttpReadError)(msg: msg)
|
||||
|
||||
proc raiseHttpProtocolError*(msg: string) {.noinline, noreturn.} =
|
||||
proc raiseHttpProtocolError*(msg: string) {.
|
||||
noinline, noreturn, raises: [HttpProtocolError].} =
|
||||
raise (ref HttpProtocolError)(msg: msg)
|
||||
|
||||
proc raiseHttpWriteError*(msg: string) {.noinline, noreturn.} =
|
||||
proc raiseHttpWriteError*(msg: string) {.
|
||||
noinline, noreturn, raises: [HttpWriteError].} =
|
||||
raise (ref HttpWriteError)(msg: msg)
|
||||
|
||||
proc raiseHttpRedirectError*(msg: string) {.noinline, noreturn.} =
|
||||
proc raiseHttpRedirectError*(msg: string) {.
|
||||
noinline, noreturn, raises: [HttpRedirectError].} =
|
||||
raise (ref HttpRedirectError)(msg: msg)
|
||||
|
||||
proc raiseHttpAddressError*(msg: string) {.noinline, noreturn.} =
|
||||
proc raiseHttpAddressError*(msg: string) {.
|
||||
noinline, noreturn, raises: [HttpAddressError].} =
|
||||
raise (ref HttpAddressError)(msg: msg)
|
||||
|
||||
template newHttpInterruptError*(): ref HttpInterruptError =
|
||||
|
@ -168,8 +180,7 @@ template newHttpUseClosedError*(): ref HttpUseClosedError =
|
|||
newException(HttpUseClosedError, "Connection was already closed")
|
||||
|
||||
iterator queryParams*(query: string,
|
||||
flags: set[QueryParamsFlag] = {}): KeyValueTuple {.
|
||||
raises: [].} =
|
||||
flags: set[QueryParamsFlag] = {}): KeyValueTuple =
|
||||
## Iterate over url-encoded query string.
|
||||
for pair in query.split('&'):
|
||||
let items = pair.split('=', maxsplit = 1)
|
||||
|
@ -182,9 +193,9 @@ iterator queryParams*(query: string,
|
|||
else:
|
||||
yield (decodeUrl(k), decodeUrl(v))
|
||||
|
||||
func getTransferEncoding*(ch: openArray[string]): HttpResult[
|
||||
set[TransferEncodingFlags]] {.
|
||||
raises: [].} =
|
||||
func getTransferEncoding*(
|
||||
ch: openArray[string]
|
||||
): HttpResult[set[TransferEncodingFlags]] =
|
||||
## Parse value of multiple HTTP headers ``Transfer-Encoding`` and return
|
||||
## it as set of ``TransferEncodingFlags``.
|
||||
var res: set[TransferEncodingFlags] = {}
|
||||
|
@ -213,9 +224,9 @@ func getTransferEncoding*(ch: openArray[string]): HttpResult[
|
|||
return err("Incorrect Transfer-Encoding value")
|
||||
ok(res)
|
||||
|
||||
func getContentEncoding*(ch: openArray[string]): HttpResult[
|
||||
set[ContentEncodingFlags]] {.
|
||||
raises: [].} =
|
||||
func getContentEncoding*(
|
||||
ch: openArray[string]
|
||||
): HttpResult[set[ContentEncodingFlags]] =
|
||||
## Parse value of multiple HTTP headers ``Content-Encoding`` and return
|
||||
## it as set of ``ContentEncodingFlags``.
|
||||
var res: set[ContentEncodingFlags] = {}
|
||||
|
@ -244,8 +255,7 @@ func getContentEncoding*(ch: openArray[string]): HttpResult[
|
|||
return err("Incorrect Content-Encoding value")
|
||||
ok(res)
|
||||
|
||||
func getContentType*(ch: openArray[string]): HttpResult[ContentTypeData] {.
|
||||
raises: [].} =
|
||||
func getContentType*(ch: openArray[string]): HttpResult[ContentTypeData] =
|
||||
## Check and prepare value of ``Content-Type`` header.
|
||||
if len(ch) == 0:
|
||||
err("No Content-Type values found")
|
||||
|
|
|
@ -6,6 +6,9 @@
|
|||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/tables
|
||||
import results
|
||||
import ../../timer
|
||||
|
@ -16,8 +19,6 @@ from ../../osdefs import SocketHandle
|
|||
from ../../transports/common import TransportAddress, ServerFlags
|
||||
export HttpClientScheme, SocketHandle, TransportAddress, ServerFlags, HttpState
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
type
|
||||
ConnectionType* {.pure.} = enum
|
||||
NonSecure, Secure
|
||||
|
|
|
@ -6,6 +6,9 @@
|
|||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[tables, uri, strutils]
|
||||
import stew/[base10], httputils, results
|
||||
import ../../asyncloop, ../../asyncsync
|
||||
|
@ -71,7 +74,7 @@ type
|
|||
|
||||
HttpCloseConnectionCallback* =
|
||||
proc(connection: HttpConnectionRef): Future[void] {.
|
||||
gcsafe, raises: [].}
|
||||
async: (raises: []), gcsafe.}
|
||||
|
||||
HttpConnectionHolder* = object of RootObj
|
||||
connection*: HttpConnectionRef
|
||||
|
@ -94,7 +97,7 @@ type
|
|||
flags*: set[HttpServerFlags]
|
||||
socketFlags*: set[ServerFlags]
|
||||
connections*: OrderedTable[string, HttpConnectionHolderRef]
|
||||
acceptLoop*: Future[void]
|
||||
acceptLoop*: Future[void].Raising([])
|
||||
lifetime*: Future[void]
|
||||
headersTimeout*: Duration
|
||||
bufferSize*: int
|
||||
|
@ -157,13 +160,11 @@ type
|
|||
|
||||
proc init(htype: typedesc[HttpProcessError], error: HttpServerError,
|
||||
exc: ref CatchableError, remote: Opt[TransportAddress],
|
||||
code: HttpCode): HttpProcessError {.
|
||||
raises: [].} =
|
||||
code: HttpCode): HttpProcessError =
|
||||
HttpProcessError(kind: error, exc: exc, remote: remote, code: code)
|
||||
|
||||
proc init(htype: typedesc[HttpProcessError],
|
||||
error: HttpServerError): HttpProcessError {.
|
||||
raises: [].} =
|
||||
error: HttpServerError): HttpProcessError =
|
||||
HttpProcessError(kind: error)
|
||||
|
||||
proc new(htype: typedesc[HttpConnectionHolderRef], server: HttpServerRef,
|
||||
|
@ -176,8 +177,8 @@ proc new(htype: typedesc[HttpConnectionHolderRef], server: HttpServerRef,
|
|||
proc error*(e: HttpProcessError): HttpServerError = e.kind
|
||||
|
||||
proc createConnection(server: HttpServerRef,
|
||||
transp: StreamTransport): Future[HttpConnectionRef] {.
|
||||
gcsafe.}
|
||||
transp: StreamTransport): Future[HttpConnectionRef] {.
|
||||
async: (raises: [CancelledError, HttpConnectionError]).}
|
||||
|
||||
proc new*(htype: typedesc[HttpServerRef],
|
||||
address: TransportAddress,
|
||||
|
@ -192,8 +193,7 @@ proc new*(htype: typedesc[HttpServerRef],
|
|||
httpHeadersTimeout = 10.seconds,
|
||||
maxHeadersSize: int = 8192,
|
||||
maxRequestBodySize: int = 1_048_576,
|
||||
dualstack = DualStackType.Auto): HttpResult[HttpServerRef] {.
|
||||
raises: [].} =
|
||||
dualstack = DualStackType.Auto): HttpResult[HttpServerRef] =
|
||||
|
||||
let serverUri =
|
||||
if len(serverUri.hostname) > 0:
|
||||
|
@ -210,8 +210,6 @@ proc new*(htype: typedesc[HttpServerRef],
|
|||
backlog = backlogSize, dualstack = dualstack)
|
||||
except TransportOsError as exc:
|
||||
return err(exc.msg)
|
||||
except CatchableError as exc:
|
||||
return err(exc.msg)
|
||||
|
||||
var res = HttpServerRef(
|
||||
address: serverInstance.localAddress(),
|
||||
|
@ -259,13 +257,13 @@ proc getResponseFlags(req: HttpRequestRef): set[HttpResponseFlags] =
|
|||
else:
|
||||
defaultFlags
|
||||
|
||||
proc getResponseVersion(reqFence: RequestFence): HttpVersion {.raises: [].} =
|
||||
proc getResponseVersion(reqFence: RequestFence): HttpVersion =
|
||||
if reqFence.isErr():
|
||||
HttpVersion11
|
||||
else:
|
||||
reqFence.get().version
|
||||
|
||||
proc getResponse*(req: HttpRequestRef): HttpResponseRef {.raises: [].} =
|
||||
proc getResponse*(req: HttpRequestRef): HttpResponseRef =
|
||||
if req.response.isNone():
|
||||
var resp = HttpResponseRef(
|
||||
status: Http200,
|
||||
|
@ -286,30 +284,29 @@ proc getHostname*(server: HttpServerRef): string =
|
|||
else:
|
||||
server.baseUri.hostname
|
||||
|
||||
proc defaultResponse*(): HttpResponseRef {.raises: [].} =
|
||||
proc defaultResponse*(): HttpResponseRef =
|
||||
## Create an empty response to return when request processor got no request.
|
||||
HttpResponseRef(state: HttpResponseState.Default, version: HttpVersion11)
|
||||
|
||||
proc dumbResponse*(): HttpResponseRef {.raises: [],
|
||||
proc dumbResponse*(): HttpResponseRef {.
|
||||
deprecated: "Please use defaultResponse() instead".} =
|
||||
## Create an empty response to return when request processor got no request.
|
||||
defaultResponse()
|
||||
|
||||
proc getId(transp: StreamTransport): Result[string, string] {.inline.} =
|
||||
proc getId(transp: StreamTransport): Result[string, string] {.inline.} =
|
||||
## Returns string unique transport's identifier as string.
|
||||
try:
|
||||
ok($transp.remoteAddress() & "_" & $transp.localAddress())
|
||||
except TransportOsError as exc:
|
||||
err($exc.msg)
|
||||
|
||||
proc hasBody*(request: HttpRequestRef): bool {.raises: [].} =
|
||||
proc hasBody*(request: HttpRequestRef): bool =
|
||||
## Returns ``true`` if request has body.
|
||||
request.requestFlags * {HttpRequestFlags.BoundBody,
|
||||
HttpRequestFlags.UnboundBody} != {}
|
||||
|
||||
proc prepareRequest(conn: HttpConnectionRef,
|
||||
req: HttpRequestHeader): HttpResultCode[HttpRequestRef] {.
|
||||
raises: [].}=
|
||||
req: HttpRequestHeader): HttpResultCode[HttpRequestRef] =
|
||||
var request = HttpRequestRef(connection: conn, state: HttpState.Alive)
|
||||
|
||||
if req.version notin {HttpVersion10, HttpVersion11}:
|
||||
|
@ -450,7 +447,8 @@ proc getBodyReader*(request: HttpRequestRef): HttpResult[HttpBodyReader] =
|
|||
else:
|
||||
err("Request do not have body available")
|
||||
|
||||
proc handleExpect*(request: HttpRequestRef) {.async.} =
|
||||
proc handleExpect*(request: HttpRequestRef) {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Handle expectation for ``Expect`` header.
|
||||
## https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Expect
|
||||
if HttpServerFlags.NoExpectHandler notin request.connection.server.flags:
|
||||
|
@ -461,10 +459,11 @@ proc handleExpect*(request: HttpRequestRef) {.async.} =
|
|||
await request.connection.writer.write(message)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
except AsyncStreamError:
|
||||
raiseHttpCriticalError("Unable to send `100-continue` response")
|
||||
|
||||
proc getBody*(request: HttpRequestRef): Future[seq[byte]] {.async.} =
|
||||
proc getBody*(request: HttpRequestRef): Future[seq[byte]] {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Obtain request's body as sequence of bytes.
|
||||
let bodyReader = request.getBodyReader()
|
||||
if bodyReader.isErr():
|
||||
|
@ -486,12 +485,18 @@ proc getBody*(request: HttpRequestRef): Future[seq[byte]] {.async.} =
|
|||
if not(isNil(reader)):
|
||||
await reader.closeWait()
|
||||
raise exc
|
||||
except AsyncStreamError:
|
||||
except HttpError as exc:
|
||||
if not(isNil(reader)):
|
||||
await reader.closeWait()
|
||||
raiseHttpCriticalError("Unable to read request's body")
|
||||
raise exc
|
||||
except AsyncStreamError as exc:
|
||||
let msg = "Unable to read request's body, reason: " & $exc.msg
|
||||
if not(isNil(reader)):
|
||||
await reader.closeWait()
|
||||
raiseHttpCriticalError(msg)
|
||||
|
||||
proc consumeBody*(request: HttpRequestRef): Future[void] {.async.} =
|
||||
proc consumeBody*(request: HttpRequestRef): Future[void] {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Consume/discard request's body.
|
||||
let bodyReader = request.getBodyReader()
|
||||
if bodyReader.isErr():
|
||||
|
@ -513,10 +518,15 @@ proc consumeBody*(request: HttpRequestRef): Future[void] {.async.} =
|
|||
if not(isNil(reader)):
|
||||
await reader.closeWait()
|
||||
raise exc
|
||||
except AsyncStreamError:
|
||||
except HttpError as exc:
|
||||
if not(isNil(reader)):
|
||||
await reader.closeWait()
|
||||
raiseHttpCriticalError("Unable to read request's body")
|
||||
raise exc
|
||||
except AsyncStreamError as exc:
|
||||
let msg = "Unable to consume request's body, reason: " & $exc.msg
|
||||
if not(isNil(reader)):
|
||||
await reader.closeWait()
|
||||
raiseHttpCriticalError(msg)
|
||||
|
||||
proc getAcceptInfo*(request: HttpRequestRef): Result[AcceptInfo, cstring] =
|
||||
## Returns value of `Accept` header as `AcceptInfo` object.
|
||||
|
@ -636,7 +646,8 @@ proc preferredContentType*(request: HttpRequestRef,
|
|||
proc sendErrorResponse(conn: HttpConnectionRef, version: HttpVersion,
|
||||
code: HttpCode, keepAlive = true,
|
||||
datatype = "text/text",
|
||||
databody = "") {.async.} =
|
||||
databody = "") {.
|
||||
async: (raises: [CancelledError]).} =
|
||||
var answer = $version & " " & $code & "\r\n"
|
||||
answer.add(DateHeader)
|
||||
answer.add(": ")
|
||||
|
@ -664,7 +675,7 @@ proc sendErrorResponse(conn: HttpConnectionRef, version: HttpVersion,
|
|||
await conn.writer.write(answer)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError:
|
||||
except AsyncStreamError:
|
||||
# We ignore errors here, because we indicating error already.
|
||||
discard
|
||||
|
||||
|
@ -672,7 +683,7 @@ proc sendErrorResponse(
|
|||
conn: HttpConnectionRef,
|
||||
reqFence: RequestFence,
|
||||
respError: HttpProcessError
|
||||
): Future[HttpProcessExitType] {.async.} =
|
||||
): Future[HttpProcessExitType] {.async: (raises: []).} =
|
||||
let version = getResponseVersion(reqFence)
|
||||
try:
|
||||
if reqFence.isOk():
|
||||
|
@ -694,14 +705,12 @@ proc sendErrorResponse(
|
|||
HttpProcessExitType.Graceful
|
||||
except CancelledError:
|
||||
HttpProcessExitType.Immediate
|
||||
except CatchableError:
|
||||
HttpProcessExitType.Immediate
|
||||
|
||||
proc sendDefaultResponse(
|
||||
conn: HttpConnectionRef,
|
||||
reqFence: RequestFence,
|
||||
response: HttpResponseRef
|
||||
): Future[HttpProcessExitType] {.async.} =
|
||||
): Future[HttpProcessExitType] {.async: (raises: []).} =
|
||||
let
|
||||
version = getResponseVersion(reqFence)
|
||||
keepConnection =
|
||||
|
@ -772,7 +781,8 @@ proc sendDefaultResponse(
|
|||
except CatchableError:
|
||||
HttpProcessExitType.Immediate
|
||||
|
||||
proc getRequest(conn: HttpConnectionRef): Future[HttpRequestRef] {.async.} =
|
||||
proc getRequest(conn: HttpConnectionRef): Future[HttpRequestRef] {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
try:
|
||||
conn.buffer.setLen(conn.server.maxHeadersSize)
|
||||
let res = await conn.reader.readUntil(addr conn.buffer[0], len(conn.buffer),
|
||||
|
@ -787,7 +797,7 @@ proc getRequest(conn: HttpConnectionRef): Future[HttpRequestRef] {.async.} =
|
|||
raiseHttpCriticalError("Invalid request received", res.error)
|
||||
else:
|
||||
return res.get()
|
||||
except AsyncStreamIncompleteError, AsyncStreamReadError:
|
||||
except AsyncStreamError:
|
||||
raiseHttpDisconnectError()
|
||||
except AsyncStreamLimitError:
|
||||
raiseHttpCriticalError("Maximum size of request headers reached", Http431)
|
||||
|
@ -803,7 +813,7 @@ proc init*(value: var HttpConnection, server: HttpServerRef,
|
|||
mainWriter: newAsyncStreamWriter(transp)
|
||||
)
|
||||
|
||||
proc closeUnsecureConnection(conn: HttpConnectionRef) {.async.} =
|
||||
proc closeUnsecureConnection(conn: HttpConnectionRef) {.async: (raises: []).} =
|
||||
if conn.state == HttpState.Alive:
|
||||
conn.state = HttpState.Closing
|
||||
var pending: seq[Future[void]]
|
||||
|
@ -826,14 +836,19 @@ proc new(ht: typedesc[HttpConnectionRef], server: HttpServerRef,
|
|||
trackCounter(HttpServerUnsecureConnectionTrackerName)
|
||||
res
|
||||
|
||||
proc gracefulCloseWait*(conn: HttpConnectionRef) {.async.} =
|
||||
await noCancel(conn.transp.shutdownWait())
|
||||
proc gracefulCloseWait*(conn: HttpConnectionRef) {.async: (raises: []).} =
|
||||
try:
|
||||
await noCancel(conn.transp.shutdownWait())
|
||||
except TransportError:
|
||||
# We are closing connection, so ignoring any type of errors here.
|
||||
discard
|
||||
await conn.closeCb(conn)
|
||||
|
||||
proc closeWait*(conn: HttpConnectionRef): Future[void] =
|
||||
proc closeWait*(conn: HttpConnectionRef): Future[void] {.
|
||||
async: (raw: true, raises: []).} =
|
||||
conn.closeCb(conn)
|
||||
|
||||
proc closeWait*(req: HttpRequestRef) {.async.} =
|
||||
proc closeWait*(req: HttpRequestRef) {.async: (raises: []).} =
|
||||
if req.state == HttpState.Alive:
|
||||
if req.response.isSome():
|
||||
req.state = HttpState.Closing
|
||||
|
@ -847,8 +862,8 @@ proc closeWait*(req: HttpRequestRef) {.async.} =
|
|||
|
||||
proc createConnection(server: HttpServerRef,
|
||||
transp: StreamTransport): Future[HttpConnectionRef] {.
|
||||
async.} =
|
||||
return HttpConnectionRef.new(server, transp)
|
||||
async: (raises: [CancelledError, HttpConnectionError]).} =
|
||||
HttpConnectionRef.new(server, transp)
|
||||
|
||||
proc `keepalive=`*(resp: HttpResponseRef, value: bool) =
|
||||
doAssert(resp.state == HttpResponseState.Empty)
|
||||
|
@ -857,25 +872,23 @@ proc `keepalive=`*(resp: HttpResponseRef, value: bool) =
|
|||
else:
|
||||
resp.flags.excl(HttpResponseFlags.KeepAlive)
|
||||
|
||||
proc keepalive*(resp: HttpResponseRef): bool {.raises: [].} =
|
||||
proc keepalive*(resp: HttpResponseRef): bool =
|
||||
HttpResponseFlags.KeepAlive in resp.flags
|
||||
|
||||
proc getRemoteAddress(transp: StreamTransport): Opt[TransportAddress] {.
|
||||
raises: [].} =
|
||||
proc getRemoteAddress(transp: StreamTransport): Opt[TransportAddress] =
|
||||
if isNil(transp): return Opt.none(TransportAddress)
|
||||
try:
|
||||
Opt.some(transp.remoteAddress())
|
||||
except CatchableError:
|
||||
except TransportOsError:
|
||||
Opt.none(TransportAddress)
|
||||
|
||||
proc getRemoteAddress(connection: HttpConnectionRef): Opt[TransportAddress] {.
|
||||
raises: [].} =
|
||||
proc getRemoteAddress(connection: HttpConnectionRef): Opt[TransportAddress] =
|
||||
if isNil(connection): return Opt.none(TransportAddress)
|
||||
getRemoteAddress(connection.transp)
|
||||
|
||||
proc getResponseFence*(connection: HttpConnectionRef,
|
||||
reqFence: RequestFence): Future[ResponseFence] {.
|
||||
async.} =
|
||||
async: (raises: []).} =
|
||||
try:
|
||||
let res = await connection.server.processCallback(reqFence)
|
||||
ResponseFence.ok(res)
|
||||
|
@ -897,7 +910,7 @@ proc getResponseFence*(connection: HttpConnectionRef,
|
|||
|
||||
proc getResponseFence*(server: HttpServerRef,
|
||||
connFence: ConnectionFence): Future[ResponseFence] {.
|
||||
async.} =
|
||||
async: (raises: []).} =
|
||||
doAssert(connFence.isErr())
|
||||
try:
|
||||
let
|
||||
|
@ -922,7 +935,7 @@ proc getResponseFence*(server: HttpServerRef,
|
|||
|
||||
proc getRequestFence*(server: HttpServerRef,
|
||||
connection: HttpConnectionRef): Future[RequestFence] {.
|
||||
async.} =
|
||||
async: (raises: []).} =
|
||||
try:
|
||||
let res =
|
||||
if server.headersTimeout.isInfinite():
|
||||
|
@ -956,7 +969,7 @@ proc getRequestFence*(server: HttpServerRef,
|
|||
|
||||
proc getConnectionFence*(server: HttpServerRef,
|
||||
transp: StreamTransport): Future[ConnectionFence] {.
|
||||
async.} =
|
||||
async: (raises: []).} =
|
||||
try:
|
||||
let res = await server.createConnCallback(server, transp)
|
||||
ConnectionFence.ok(res)
|
||||
|
@ -975,7 +988,8 @@ proc getConnectionFence*(server: HttpServerRef,
|
|||
|
||||
proc processRequest(server: HttpServerRef,
|
||||
connection: HttpConnectionRef,
|
||||
connId: string): Future[HttpProcessExitType] {.async.} =
|
||||
connId: string): Future[HttpProcessExitType] {.
|
||||
async: (raises: []).} =
|
||||
let requestFence = await getRequestFence(server, connection)
|
||||
if requestFence.isErr():
|
||||
case requestFence.error.kind
|
||||
|
@ -1005,7 +1019,7 @@ proc processRequest(server: HttpServerRef,
|
|||
|
||||
res
|
||||
|
||||
proc processLoop(holder: HttpConnectionHolderRef) {.async.} =
|
||||
proc processLoop(holder: HttpConnectionHolderRef) {.async: (raises: []).} =
|
||||
let
|
||||
server = holder.server
|
||||
transp = holder.transp
|
||||
|
@ -1042,7 +1056,7 @@ proc processLoop(holder: HttpConnectionHolderRef) {.async.} =
|
|||
|
||||
server.connections.del(connectionId)
|
||||
|
||||
proc acceptClientLoop(server: HttpServerRef) {.async.} =
|
||||
proc acceptClientLoop(server: HttpServerRef) {.async: (raises: []).} =
|
||||
var runLoop = true
|
||||
while runLoop:
|
||||
try:
|
||||
|
@ -1067,7 +1081,7 @@ proc acceptClientLoop(server: HttpServerRef) {.async.} =
|
|||
# Critical, cancellation or unexpected error
|
||||
runLoop = false
|
||||
|
||||
proc state*(server: HttpServerRef): HttpServerState {.raises: [].} =
|
||||
proc state*(server: HttpServerRef): HttpServerState =
|
||||
## Returns current HTTP server's state.
|
||||
if server.lifetime.finished():
|
||||
ServerClosed
|
||||
|
@ -1085,12 +1099,12 @@ proc start*(server: HttpServerRef) =
|
|||
if server.state == ServerStopped:
|
||||
server.acceptLoop = acceptClientLoop(server)
|
||||
|
||||
proc stop*(server: HttpServerRef) {.async.} =
|
||||
proc stop*(server: HttpServerRef) {.async: (raises: []).} =
|
||||
## Stop HTTP server from accepting new connections.
|
||||
if server.state == ServerRunning:
|
||||
await server.acceptLoop.cancelAndWait()
|
||||
# if server.state == ServerRunning:
|
||||
# await server.acceptLoop.cancelAndWait()
|
||||
|
||||
proc drop*(server: HttpServerRef) {.async.} =
|
||||
proc drop*(server: HttpServerRef) {.async: (raises: []).} =
|
||||
## Drop all pending HTTP connections.
|
||||
var pending: seq[Future[void]]
|
||||
if server.state in {ServerStopped, ServerRunning}:
|
||||
|
@ -1100,7 +1114,7 @@ proc drop*(server: HttpServerRef) {.async.} =
|
|||
await noCancel(allFutures(pending))
|
||||
server.connections.clear()
|
||||
|
||||
proc closeWait*(server: HttpServerRef) {.async.} =
|
||||
proc closeWait*(server: HttpServerRef) {.async: (raises: []).} =
|
||||
## Stop HTTP server and drop all the pending connections.
|
||||
if server.state != ServerClosed:
|
||||
await server.stop()
|
||||
|
@ -1108,7 +1122,8 @@ proc closeWait*(server: HttpServerRef) {.async.} =
|
|||
await server.instance.closeWait()
|
||||
server.lifetime.complete()
|
||||
|
||||
proc join*(server: HttpServerRef): Future[void] =
|
||||
proc join*(server: HttpServerRef): Future[void] {.
|
||||
async: (raw: true, raises: [CancelledError]).} =
|
||||
## Wait until HTTP server will not be closed.
|
||||
var retFuture = newFuture[void]("http.server.join")
|
||||
|
||||
|
@ -1128,8 +1143,7 @@ proc join*(server: HttpServerRef): Future[void] =
|
|||
|
||||
retFuture
|
||||
|
||||
proc getMultipartReader*(req: HttpRequestRef): HttpResult[MultiPartReaderRef] {.
|
||||
raises: [].} =
|
||||
proc getMultipartReader*(req: HttpRequestRef): HttpResult[MultiPartReaderRef] =
|
||||
## Create new MultiPartReader interface for specific request.
|
||||
if req.meth in PostMethods:
|
||||
if MultipartForm in req.requestFlags:
|
||||
|
@ -1144,7 +1158,8 @@ proc getMultipartReader*(req: HttpRequestRef): HttpResult[MultiPartReaderRef] {.
|
|||
else:
|
||||
err("Request's method do not supports multipart")
|
||||
|
||||
proc post*(req: HttpRequestRef): Future[HttpTable] {.async.} =
|
||||
proc post*(req: HttpRequestRef): Future[HttpTable] {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Return POST parameters
|
||||
if req.postTable.isSome():
|
||||
return req.postTable.get()
|
||||
|
@ -1224,31 +1239,28 @@ proc post*(req: HttpRequestRef): Future[HttpTable] {.async.} =
|
|||
elif HttpRequestFlags.UnboundBody in req.requestFlags:
|
||||
raiseHttpCriticalError("Unsupported request body")
|
||||
|
||||
proc setHeader*(resp: HttpResponseRef, key, value: string) {.
|
||||
raises: [].} =
|
||||
proc setHeader*(resp: HttpResponseRef, key, value: string) =
|
||||
## Sets value of header ``key`` to ``value``.
|
||||
doAssert(resp.state == HttpResponseState.Empty)
|
||||
resp.headersTable.set(key, value)
|
||||
|
||||
proc setHeaderDefault*(resp: HttpResponseRef, key, value: string) {.
|
||||
raises: [].} =
|
||||
proc setHeaderDefault*(resp: HttpResponseRef, key, value: string) =
|
||||
## Sets value of header ``key`` to ``value``, only if header ``key`` is not
|
||||
## present in the headers table.
|
||||
discard resp.headersTable.hasKeyOrPut(key, value)
|
||||
|
||||
proc addHeader*(resp: HttpResponseRef, key, value: string) {.
|
||||
raises: [].} =
|
||||
proc addHeader*(resp: HttpResponseRef, key, value: string) =
|
||||
## Adds value ``value`` to header's ``key`` value.
|
||||
doAssert(resp.state == HttpResponseState.Empty)
|
||||
resp.headersTable.add(key, value)
|
||||
|
||||
proc getHeader*(resp: HttpResponseRef, key: string,
|
||||
default: string = ""): string {.raises: [].} =
|
||||
default: string = ""): string =
|
||||
## Returns value of header with name ``name`` or ``default``, if header is
|
||||
## not present in the table.
|
||||
resp.headersTable.getString(key, default)
|
||||
|
||||
proc hasHeader*(resp: HttpResponseRef, key: string): bool {.raises: [].} =
|
||||
proc hasHeader*(resp: HttpResponseRef, key: string): bool =
|
||||
## Returns ``true`` if header with name ``key`` present in the headers table.
|
||||
key in resp.headersTable
|
||||
|
||||
|
@ -1267,8 +1279,7 @@ func createHeaders(resp: HttpResponseRef): string =
|
|||
answer.add("\r\n")
|
||||
answer
|
||||
|
||||
proc prepareLengthHeaders(resp: HttpResponseRef, length: int): string {.
|
||||
raises: [].}=
|
||||
proc prepareLengthHeaders(resp: HttpResponseRef, length: int): string =
|
||||
if not(resp.hasHeader(DateHeader)):
|
||||
resp.setHeader(DateHeader, httpDate())
|
||||
if length > 0:
|
||||
|
@ -1285,8 +1296,7 @@ proc prepareLengthHeaders(resp: HttpResponseRef, length: int): string {.
|
|||
resp.setHeader(ConnectionHeader, "close")
|
||||
resp.createHeaders()
|
||||
|
||||
proc prepareChunkedHeaders(resp: HttpResponseRef): string {.
|
||||
raises: [].} =
|
||||
proc prepareChunkedHeaders(resp: HttpResponseRef): string =
|
||||
if not(resp.hasHeader(DateHeader)):
|
||||
resp.setHeader(DateHeader, httpDate())
|
||||
if not(resp.hasHeader(ContentTypeHeader)):
|
||||
|
@ -1302,8 +1312,7 @@ proc prepareChunkedHeaders(resp: HttpResponseRef): string {.
|
|||
resp.setHeader(ConnectionHeader, "close")
|
||||
resp.createHeaders()
|
||||
|
||||
proc prepareServerSideEventHeaders(resp: HttpResponseRef): string {.
|
||||
raises: [].} =
|
||||
proc prepareServerSideEventHeaders(resp: HttpResponseRef): string =
|
||||
if not(resp.hasHeader(DateHeader)):
|
||||
resp.setHeader(DateHeader, httpDate())
|
||||
if not(resp.hasHeader(ContentTypeHeader)):
|
||||
|
@ -1315,8 +1324,7 @@ proc prepareServerSideEventHeaders(resp: HttpResponseRef): string {.
|
|||
resp.setHeader(ConnectionHeader, "close")
|
||||
resp.createHeaders()
|
||||
|
||||
proc preparePlainHeaders(resp: HttpResponseRef): string {.
|
||||
raises: [].} =
|
||||
proc preparePlainHeaders(resp: HttpResponseRef): string =
|
||||
if not(resp.hasHeader(DateHeader)):
|
||||
resp.setHeader(DateHeader, httpDate())
|
||||
if not(resp.hasHeader(ServerHeader)):
|
||||
|
@ -1326,7 +1334,8 @@ proc preparePlainHeaders(resp: HttpResponseRef): string {.
|
|||
resp.setHeader(ConnectionHeader, "close")
|
||||
resp.createHeaders()
|
||||
|
||||
proc sendBody*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} =
|
||||
proc sendBody*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Send HTTP response at once by using bytes pointer ``pbytes`` and length
|
||||
## ``nbytes``.
|
||||
doAssert(not(isNil(pbytes)), "pbytes must not be nil")
|
||||
|
@ -1343,11 +1352,12 @@ proc sendBody*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} =
|
|||
except CancelledError as exc:
|
||||
resp.state = HttpResponseState.Cancelled
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
except AsyncStreamError as exc:
|
||||
resp.state = HttpResponseState.Failed
|
||||
raiseHttpCriticalError("Unable to send response")
|
||||
raiseHttpCriticalError("Unable to send response, reason: " & $exc.msg)
|
||||
|
||||
proc sendBody*(resp: HttpResponseRef, data: ByteChar) {.async.} =
|
||||
proc sendBody*(resp: HttpResponseRef, data: ByteChar) {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Send HTTP response at once by using data ``data``.
|
||||
checkPending(resp)
|
||||
let responseHeaders = resp.prepareLengthHeaders(len(data))
|
||||
|
@ -1361,11 +1371,12 @@ proc sendBody*(resp: HttpResponseRef, data: ByteChar) {.async.} =
|
|||
except CancelledError as exc:
|
||||
resp.state = HttpResponseState.Cancelled
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
except AsyncStreamError as exc:
|
||||
resp.state = HttpResponseState.Failed
|
||||
raiseHttpCriticalError("Unable to send response")
|
||||
raiseHttpCriticalError("Unable to send response, reason: " & $exc.msg)
|
||||
|
||||
proc sendError*(resp: HttpResponseRef, code: HttpCode, body = "") {.async.} =
|
||||
proc sendError*(resp: HttpResponseRef, code: HttpCode, body = "") {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Send HTTP error status response.
|
||||
checkPending(resp)
|
||||
resp.status = code
|
||||
|
@ -1380,12 +1391,13 @@ proc sendError*(resp: HttpResponseRef, code: HttpCode, body = "") {.async.} =
|
|||
except CancelledError as exc:
|
||||
resp.state = HttpResponseState.Cancelled
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
except AsyncStreamError as exc:
|
||||
resp.state = HttpResponseState.Failed
|
||||
raiseHttpCriticalError("Unable to send response")
|
||||
raiseHttpCriticalError("Unable to send response, reason: " & $exc.msg)
|
||||
|
||||
proc prepare*(resp: HttpResponseRef,
|
||||
streamType = HttpResponseStreamType.Chunked) {.async.} =
|
||||
streamType = HttpResponseStreamType.Chunked) {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Prepare for HTTP stream response.
|
||||
##
|
||||
## Such responses will be sent chunk by chunk using ``chunked`` encoding.
|
||||
|
@ -1412,27 +1424,31 @@ proc prepare*(resp: HttpResponseRef,
|
|||
except CancelledError as exc:
|
||||
resp.state = HttpResponseState.Cancelled
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
except AsyncStreamError as exc:
|
||||
resp.state = HttpResponseState.Failed
|
||||
raiseHttpCriticalError("Unable to send response")
|
||||
raiseHttpCriticalError("Unable to send response, reason: " & $exc.msg)
|
||||
|
||||
proc prepareChunked*(resp: HttpResponseRef): Future[void] =
|
||||
proc prepareChunked*(resp: HttpResponseRef): Future[void] {.
|
||||
async: (raw: true, raises: [CancelledError, HttpError]).} =
|
||||
## Prepare for HTTP chunked stream response.
|
||||
##
|
||||
## Such responses will be sent chunk by chunk using ``chunked`` encoding.
|
||||
resp.prepare(HttpResponseStreamType.Chunked)
|
||||
|
||||
proc preparePlain*(resp: HttpResponseRef): Future[void] =
|
||||
proc preparePlain*(resp: HttpResponseRef): Future[void] {.
|
||||
async: (raw: true, raises: [CancelledError, HttpError]).} =
|
||||
## Prepare for HTTP plain stream response.
|
||||
##
|
||||
## Such responses will be sent without any encoding.
|
||||
resp.prepare(HttpResponseStreamType.Plain)
|
||||
|
||||
proc prepareSSE*(resp: HttpResponseRef): Future[void] =
|
||||
proc prepareSSE*(resp: HttpResponseRef): Future[void] {.
|
||||
async: (raw: true, raises: [CancelledError, HttpError]).} =
|
||||
## Prepare for HTTP server-side event stream response.
|
||||
resp.prepare(HttpResponseStreamType.SSE)
|
||||
|
||||
proc send*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} =
|
||||
proc send*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Send single chunk of data pointed by ``pbytes`` and ``nbytes``.
|
||||
doAssert(not(isNil(pbytes)), "pbytes must not be nil")
|
||||
doAssert(nbytes >= 0, "nbytes should be bigger or equal to zero")
|
||||
|
@ -1447,11 +1463,12 @@ proc send*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} =
|
|||
except CancelledError as exc:
|
||||
resp.state = HttpResponseState.Cancelled
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
except AsyncStreamError as exc:
|
||||
resp.state = HttpResponseState.Failed
|
||||
raiseHttpCriticalError("Unable to send response")
|
||||
raiseHttpCriticalError("Unable to send response, reason: " & $exc.msg)
|
||||
|
||||
proc send*(resp: HttpResponseRef, data: ByteChar) {.async.} =
|
||||
proc send*(resp: HttpResponseRef, data: ByteChar) {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Send single chunk of data ``data``.
|
||||
if HttpResponseFlags.Stream notin resp.flags:
|
||||
raiseHttpCriticalError("Response was not prepared")
|
||||
|
@ -1464,19 +1481,22 @@ proc send*(resp: HttpResponseRef, data: ByteChar) {.async.} =
|
|||
except CancelledError as exc:
|
||||
resp.state = HttpResponseState.Cancelled
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
except AsyncStreamError as exc:
|
||||
resp.state = HttpResponseState.Failed
|
||||
raiseHttpCriticalError("Unable to send response")
|
||||
raiseHttpCriticalError("Unable to send response, reason: " & $exc.msg)
|
||||
|
||||
proc sendChunk*(resp: HttpResponseRef, pbytes: pointer,
|
||||
nbytes: int): Future[void] =
|
||||
nbytes: int): Future[void] {.
|
||||
async: (raw: true, raises: [CancelledError, HttpError]).} =
|
||||
resp.send(pbytes, nbytes)
|
||||
|
||||
proc sendChunk*(resp: HttpResponseRef, data: ByteChar): Future[void] =
|
||||
proc sendChunk*(resp: HttpResponseRef, data: ByteChar): Future[void] {.
|
||||
async: (raw: true, raises: [CancelledError, HttpError]).} =
|
||||
resp.send(data)
|
||||
|
||||
proc sendEvent*(resp: HttpResponseRef, eventName: string,
|
||||
data: string): Future[void] =
|
||||
data: string): Future[void] {.
|
||||
async: (raw: true, raises: [CancelledError, HttpError]).} =
|
||||
## Send server-side event with name ``eventName`` and payload ``data`` to
|
||||
## remote peer.
|
||||
let data =
|
||||
|
@ -1492,7 +1512,8 @@ proc sendEvent*(resp: HttpResponseRef, eventName: string,
|
|||
res
|
||||
resp.send(data)
|
||||
|
||||
proc finish*(resp: HttpResponseRef) {.async.} =
|
||||
proc finish*(resp: HttpResponseRef) {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Sending last chunk of data, so it will indicate end of HTTP response.
|
||||
if HttpResponseFlags.Stream notin resp.flags:
|
||||
raiseHttpCriticalError("Response was not prepared")
|
||||
|
@ -1505,12 +1526,13 @@ proc finish*(resp: HttpResponseRef) {.async.} =
|
|||
except CancelledError as exc:
|
||||
resp.state = HttpResponseState.Cancelled
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
except AsyncStreamError as exc:
|
||||
resp.state = HttpResponseState.Failed
|
||||
raiseHttpCriticalError("Unable to send response")
|
||||
raiseHttpCriticalError("Unable to send response, reason: " & $exc.msg)
|
||||
|
||||
proc respond*(req: HttpRequestRef, code: HttpCode, content: ByteChar,
|
||||
headers: HttpTable): Future[HttpResponseRef] {.async.} =
|
||||
headers: HttpTable): Future[HttpResponseRef] {.
|
||||
async: (raises: [CancelledError, HttpError]).} =
|
||||
## Responds to the request with the specified ``HttpCode``, HTTP ``headers``
|
||||
## and ``content``.
|
||||
let response = req.getResponse()
|
||||
|
@ -1518,19 +1540,22 @@ proc respond*(req: HttpRequestRef, code: HttpCode, content: ByteChar,
|
|||
for k, v in headers.stringItems():
|
||||
response.addHeader(k, v)
|
||||
await response.sendBody(content)
|
||||
return response
|
||||
response
|
||||
|
||||
proc respond*(req: HttpRequestRef, code: HttpCode,
|
||||
content: ByteChar): Future[HttpResponseRef] =
|
||||
content: ByteChar): Future[HttpResponseRef] {.
|
||||
async: (raw: true, raises: [CancelledError, HttpError]).} =
|
||||
## Responds to the request with specified ``HttpCode`` and ``content``.
|
||||
respond(req, code, content, HttpTable.init())
|
||||
|
||||
proc respond*(req: HttpRequestRef, code: HttpCode): Future[HttpResponseRef] =
|
||||
proc respond*(req: HttpRequestRef, code: HttpCode): Future[HttpResponseRef] {.
|
||||
async: (raw: true, raises: [CancelledError, HttpError]).} =
|
||||
## Responds to the request with specified ``HttpCode`` only.
|
||||
respond(req, code, "", HttpTable.init())
|
||||
|
||||
proc redirect*(req: HttpRequestRef, code: HttpCode,
|
||||
location: string, headers: HttpTable): Future[HttpResponseRef] =
|
||||
location: string, headers: HttpTable): Future[HttpResponseRef] {.
|
||||
async: (raw: true, raises: [CancelledError, HttpError]).} =
|
||||
## Responds to the request with redirection to location ``location`` and
|
||||
## additional headers ``headers``.
|
||||
##
|
||||
|
@ -1541,7 +1566,8 @@ proc redirect*(req: HttpRequestRef, code: HttpCode,
|
|||
respond(req, code, "", mheaders)
|
||||
|
||||
proc redirect*(req: HttpRequestRef, code: HttpCode,
|
||||
location: Uri, headers: HttpTable): Future[HttpResponseRef] =
|
||||
location: Uri, headers: HttpTable): Future[HttpResponseRef] {.
|
||||
async: (raw: true, raises: [CancelledError, HttpError]).} =
|
||||
## Responds to the request with redirection to location ``location`` and
|
||||
## additional headers ``headers``.
|
||||
##
|
||||
|
@ -1550,12 +1576,14 @@ proc redirect*(req: HttpRequestRef, code: HttpCode,
|
|||
redirect(req, code, $location, headers)
|
||||
|
||||
proc redirect*(req: HttpRequestRef, code: HttpCode,
|
||||
location: Uri): Future[HttpResponseRef] =
|
||||
location: Uri): Future[HttpResponseRef] {.
|
||||
async: (raw: true, raises: [CancelledError, HttpError]).} =
|
||||
## Responds to the request with redirection to location ``location``.
|
||||
redirect(req, code, location, HttpTable.init())
|
||||
|
||||
proc redirect*(req: HttpRequestRef, code: HttpCode,
|
||||
location: string): Future[HttpResponseRef] =
|
||||
location: string): Future[HttpResponseRef] {.
|
||||
async: (raw: true, raises: [CancelledError, HttpError]).} =
|
||||
## Responds to the request with redirection to location ``location``.
|
||||
redirect(req, code, location, HttpTable.init())
|
||||
|
||||
|
@ -1569,16 +1597,20 @@ proc responded*(req: HttpRequestRef): bool =
|
|||
else:
|
||||
false
|
||||
|
||||
proc remoteAddress*(conn: HttpConnectionRef): TransportAddress =
|
||||
proc remoteAddress*(conn: HttpConnectionRef): TransportAddress {.
|
||||
raises: [HttpAddressError].} =
|
||||
## Returns address of the remote host that established connection ``conn``.
|
||||
conn.transp.remoteAddress()
|
||||
try:
|
||||
conn.transp.remoteAddress()
|
||||
except TransportOsError as exc:
|
||||
raiseHttpAddressError($exc.msg)
|
||||
|
||||
proc remoteAddress*(request: HttpRequestRef): TransportAddress =
|
||||
proc remoteAddress*(request: HttpRequestRef): TransportAddress {.
|
||||
raises: [HttpAddressError].} =
|
||||
## Returns address of the remote host that made request ``request``.
|
||||
request.connection.remoteAddress()
|
||||
|
||||
proc requestInfo*(req: HttpRequestRef, contentType = "text/text"): string {.
|
||||
raises: [].} =
|
||||
proc requestInfo*(req: HttpRequestRef, contentType = "text/text"): string =
|
||||
## Returns comprehensive information about request for specific content
|
||||
## type.
|
||||
##
|
||||
|
|
|
@ -7,6 +7,9 @@
|
|||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[monotimes, strutils]
|
||||
import results, httputils
|
||||
import ../../asyncloop
|
||||
|
@ -71,8 +74,7 @@ type
|
|||
|
||||
BChar* = byte | char
|
||||
|
||||
proc startsWith(s, prefix: openArray[byte]): bool {.
|
||||
raises: [].} =
|
||||
proc startsWith(s, prefix: openArray[byte]): bool =
|
||||
# This procedure is copy of strutils.startsWith() procedure, however,
|
||||
# it is intended to work with arrays of bytes, but not with strings.
|
||||
var i = 0
|
||||
|
@ -81,8 +83,7 @@ proc startsWith(s, prefix: openArray[byte]): bool {.
|
|||
if i >= len(s) or s[i] != prefix[i]: return false
|
||||
inc(i)
|
||||
|
||||
proc parseUntil(s, until: openArray[byte]): int {.
|
||||
raises: [].} =
|
||||
proc parseUntil(s, until: openArray[byte]): int =
|
||||
# This procedure is copy of parseutils.parseUntil() procedure, however,
|
||||
# it is intended to work with arrays of bytes, but not with strings.
|
||||
var i = 0
|
||||
|
@ -95,8 +96,7 @@ proc parseUntil(s, until: openArray[byte]): int {.
|
|||
inc(i)
|
||||
-1
|
||||
|
||||
func setPartNames(part: var MultiPart): HttpResult[void] {.
|
||||
raises: [].} =
|
||||
func setPartNames(part: var MultiPart): HttpResult[void] =
|
||||
if part.headers.count("content-disposition") != 1:
|
||||
return err("Content-Disposition header is incorrect")
|
||||
var header = part.headers.getString("content-disposition")
|
||||
|
@ -120,8 +120,7 @@ func setPartNames(part: var MultiPart): HttpResult[void] {.
|
|||
|
||||
proc init*[A: BChar, B: BChar](mpt: typedesc[MultiPartReader],
|
||||
buffer: openArray[A],
|
||||
boundary: openArray[B]): MultiPartReader {.
|
||||
raises: [].} =
|
||||
boundary: openArray[B]): MultiPartReader =
|
||||
## Create new MultiPartReader instance with `buffer` interface.
|
||||
##
|
||||
## ``buffer`` - is buffer which will be used to read data.
|
||||
|
@ -145,8 +144,7 @@ proc init*[A: BChar, B: BChar](mpt: typedesc[MultiPartReader],
|
|||
proc new*[B: BChar](mpt: typedesc[MultiPartReaderRef],
|
||||
stream: HttpBodyReader,
|
||||
boundary: openArray[B],
|
||||
partHeadersMaxSize = 4096): MultiPartReaderRef {.
|
||||
raises: [].} =
|
||||
partHeadersMaxSize = 4096): MultiPartReaderRef =
|
||||
## Create new MultiPartReader instance with `stream` interface.
|
||||
##
|
||||
## ``stream`` is stream used to read data.
|
||||
|
@ -173,7 +171,8 @@ proc new*[B: BChar](mpt: typedesc[MultiPartReaderRef],
|
|||
stream: stream, offset: 0, boundary: fboundary,
|
||||
buffer: newSeq[byte](partHeadersMaxSize))
|
||||
|
||||
proc readPart*(mpr: MultiPartReaderRef): Future[MultiPart] {.async.} =
|
||||
proc readPart*(mpr: MultiPartReaderRef): Future[MultiPart] {.
|
||||
async: (raises: [CancelledError, HttpCriticalError]).} =
|
||||
doAssert(mpr.kind == MultiPartSource.Stream)
|
||||
if mpr.firstTime:
|
||||
try:
|
||||
|
@ -240,7 +239,8 @@ proc readPart*(mpr: MultiPartReaderRef): Future[MultiPart] {.async.} =
|
|||
else:
|
||||
raiseHttpCriticalError(UnableToReadMultipartBody)
|
||||
|
||||
proc getBody*(mp: MultiPart): Future[seq[byte]] {.async.} =
|
||||
proc getBody*(mp: MultiPart): Future[seq[byte]] {.
|
||||
async: (raises: [CancelledError, HttpCriticalError]).} =
|
||||
## Get multipart's ``mp`` value as sequence of bytes.
|
||||
case mp.kind
|
||||
of MultiPartSource.Stream:
|
||||
|
@ -255,7 +255,8 @@ proc getBody*(mp: MultiPart): Future[seq[byte]] {.async.} =
|
|||
of MultiPartSource.Buffer:
|
||||
return mp.buffer
|
||||
|
||||
proc consumeBody*(mp: MultiPart) {.async.} =
|
||||
proc consumeBody*(mp: MultiPart) {.
|
||||
async: (raises: [CancelledError, HttpCriticalError]).} =
|
||||
## Discard multipart's ``mp`` value.
|
||||
case mp.kind
|
||||
of MultiPartSource.Stream:
|
||||
|
@ -269,8 +270,7 @@ proc consumeBody*(mp: MultiPart) {.async.} =
|
|||
of MultiPartSource.Buffer:
|
||||
discard
|
||||
|
||||
proc getBodyStream*(mp: MultiPart): HttpResult[AsyncStreamReader] {.
|
||||
raises: [].} =
|
||||
proc getBodyStream*(mp: MultiPart): HttpResult[AsyncStreamReader] =
|
||||
## Get multipart's ``mp`` stream, which can be used to obtain value of the
|
||||
## part.
|
||||
case mp.kind
|
||||
|
@ -279,7 +279,7 @@ proc getBodyStream*(mp: MultiPart): HttpResult[AsyncStreamReader] {.
|
|||
else:
|
||||
err("Could not obtain stream from buffer-like part")
|
||||
|
||||
proc closeWait*(mp: MultiPart) {.async.} =
|
||||
proc closeWait*(mp: MultiPart) {.async: (raises: []).} =
|
||||
## Close and release MultiPart's ``mp`` stream and resources.
|
||||
case mp.kind
|
||||
of MultiPartSource.Stream:
|
||||
|
@ -287,7 +287,7 @@ proc closeWait*(mp: MultiPart) {.async.} =
|
|||
else:
|
||||
discard
|
||||
|
||||
proc closeWait*(mpr: MultiPartReaderRef) {.async.} =
|
||||
proc closeWait*(mpr: MultiPartReaderRef) {.async: (raises: []).} =
|
||||
## Close and release MultiPartReader's ``mpr`` stream and resources.
|
||||
case mpr.kind
|
||||
of MultiPartSource.Stream:
|
||||
|
@ -295,7 +295,7 @@ proc closeWait*(mpr: MultiPartReaderRef) {.async.} =
|
|||
else:
|
||||
discard
|
||||
|
||||
proc getBytes*(mp: MultiPart): seq[byte] {.raises: [].} =
|
||||
proc getBytes*(mp: MultiPart): seq[byte] =
|
||||
## Returns value for MultiPart ``mp`` as sequence of bytes.
|
||||
case mp.kind
|
||||
of MultiPartSource.Buffer:
|
||||
|
@ -304,7 +304,7 @@ proc getBytes*(mp: MultiPart): seq[byte] {.raises: [].} =
|
|||
doAssert(not(mp.stream.atEof()), "Value is not obtained yet")
|
||||
mp.buffer
|
||||
|
||||
proc getString*(mp: MultiPart): string {.raises: [].} =
|
||||
proc getString*(mp: MultiPart): string =
|
||||
## Returns value for MultiPart ``mp`` as string.
|
||||
case mp.kind
|
||||
of MultiPartSource.Buffer:
|
||||
|
@ -313,7 +313,7 @@ proc getString*(mp: MultiPart): string {.raises: [].} =
|
|||
doAssert(not(mp.stream.atEof()), "Value is not obtained yet")
|
||||
bytesToString(mp.buffer)
|
||||
|
||||
proc atEoM*(mpr: var MultiPartReader): bool {.raises: [].} =
|
||||
proc atEoM*(mpr: var MultiPartReader): bool =
|
||||
## Procedure returns ``true`` if MultiPartReader has reached the end of
|
||||
## multipart message.
|
||||
case mpr.kind
|
||||
|
@ -322,7 +322,7 @@ proc atEoM*(mpr: var MultiPartReader): bool {.raises: [].} =
|
|||
of MultiPartSource.Stream:
|
||||
mpr.stream.atEof()
|
||||
|
||||
proc atEoM*(mpr: MultiPartReaderRef): bool {.raises: [].} =
|
||||
proc atEoM*(mpr: MultiPartReaderRef): bool =
|
||||
## Procedure returns ``true`` if MultiPartReader has reached the end of
|
||||
## multipart message.
|
||||
case mpr.kind
|
||||
|
@ -331,8 +331,7 @@ proc atEoM*(mpr: MultiPartReaderRef): bool {.raises: [].} =
|
|||
of MultiPartSource.Stream:
|
||||
mpr.stream.atEof()
|
||||
|
||||
proc getPart*(mpr: var MultiPartReader): Result[MultiPart, string] {.
|
||||
raises: [].} =
|
||||
proc getPart*(mpr: var MultiPartReader): Result[MultiPart, string] =
|
||||
## Get multipart part from MultiPartReader instance.
|
||||
##
|
||||
## This procedure will work only for MultiPartReader with buffer source.
|
||||
|
@ -422,8 +421,7 @@ proc getPart*(mpr: var MultiPartReader): Result[MultiPart, string] {.
|
|||
else:
|
||||
err("Incorrect multipart form")
|
||||
|
||||
func isEmpty*(mp: MultiPart): bool {.
|
||||
raises: [].} =
|
||||
func isEmpty*(mp: MultiPart): bool =
|
||||
## Returns ``true`` is multipart ``mp`` is not initialized/filled yet.
|
||||
mp.counter == 0
|
||||
|
||||
|
@ -439,8 +437,7 @@ func validateBoundary[B: BChar](boundary: openArray[B]): HttpResult[void] =
|
|||
return err("Content-Type boundary alphabet incorrect")
|
||||
ok()
|
||||
|
||||
func getMultipartBoundary*(contentData: ContentTypeData): HttpResult[string] {.
|
||||
raises: [].} =
|
||||
func getMultipartBoundary*(contentData: ContentTypeData): HttpResult[string] =
|
||||
## Returns ``multipart/form-data`` boundary value from ``Content-Type``
|
||||
## header.
|
||||
##
|
||||
|
@ -480,8 +477,7 @@ proc quoteCheck(name: string): HttpResult[string] =
|
|||
ok(name)
|
||||
|
||||
proc init*[B: BChar](mpt: typedesc[MultiPartWriter],
|
||||
boundary: openArray[B]): MultiPartWriter {.
|
||||
raises: [].} =
|
||||
boundary: openArray[B]): MultiPartWriter =
|
||||
## Create new MultiPartWriter instance with `buffer` interface.
|
||||
##
|
||||
## ``boundary`` - is multipart boundary, this value must not be empty.
|
||||
|
@ -510,8 +506,7 @@ proc init*[B: BChar](mpt: typedesc[MultiPartWriter],
|
|||
|
||||
proc new*[B: BChar](mpt: typedesc[MultiPartWriterRef],
|
||||
stream: HttpBodyWriter,
|
||||
boundary: openArray[B]): MultiPartWriterRef {.
|
||||
raises: [].} =
|
||||
boundary: openArray[B]): MultiPartWriterRef =
|
||||
doAssert(validateBoundary(boundary).isOk())
|
||||
doAssert(not(isNil(stream)))
|
||||
|
||||
|
@ -576,7 +571,8 @@ proc prepareHeaders(partMark: openArray[byte], name: string, filename: string,
|
|||
buffer.add("\r\n")
|
||||
buffer
|
||||
|
||||
proc begin*(mpw: MultiPartWriterRef) {.async.} =
|
||||
proc begin*(mpw: MultiPartWriterRef) {.
|
||||
async: (raises: [CancelledError, HttpCriticalError]).} =
|
||||
## Starts multipart message form and write approprate markers to output
|
||||
## stream.
|
||||
doAssert(mpw.kind == MultiPartSource.Stream)
|
||||
|
@ -599,7 +595,8 @@ proc begin*(mpw: var MultiPartWriter) =
|
|||
mpw.state = MultiPartWriterState.MessageStarted
|
||||
|
||||
proc beginPart*(mpw: MultiPartWriterRef, name: string,
|
||||
filename: string, headers: HttpTable) {.async.} =
|
||||
filename: string, headers: HttpTable) {.
|
||||
async: (raises: [CancelledError, HttpCriticalError]).} =
|
||||
## Starts part of multipart message and write appropriate ``headers`` to the
|
||||
## output stream.
|
||||
##
|
||||
|
@ -634,7 +631,8 @@ proc beginPart*(mpw: var MultiPartWriter, name: string,
|
|||
mpw.buffer.add(buffer.toOpenArrayByte(0, len(buffer) - 1))
|
||||
mpw.state = MultiPartWriterState.PartStarted
|
||||
|
||||
proc write*(mpw: MultiPartWriterRef, pbytes: pointer, nbytes: int) {.async.} =
|
||||
proc write*(mpw: MultiPartWriterRef, pbytes: pointer, nbytes: int) {.
|
||||
async: (raises: [CancelledError, HttpCriticalError]).} =
|
||||
## Write part's data ``data`` to the output stream.
|
||||
doAssert(mpw.kind == MultiPartSource.Stream)
|
||||
doAssert(mpw.state == MultiPartWriterState.PartStarted)
|
||||
|
@ -645,7 +643,8 @@ proc write*(mpw: MultiPartWriterRef, pbytes: pointer, nbytes: int) {.async.} =
|
|||
mpw.state = MultiPartWriterState.MessageFailure
|
||||
raiseHttpCriticalError("Unable to write multipart data")
|
||||
|
||||
proc write*(mpw: MultiPartWriterRef, data: seq[byte]) {.async.} =
|
||||
proc write*(mpw: MultiPartWriterRef, data: seq[byte]) {.
|
||||
async: (raises: [CancelledError, HttpCriticalError]).} =
|
||||
## Write part's data ``data`` to the output stream.
|
||||
doAssert(mpw.kind == MultiPartSource.Stream)
|
||||
doAssert(mpw.state == MultiPartWriterState.PartStarted)
|
||||
|
@ -656,7 +655,8 @@ proc write*(mpw: MultiPartWriterRef, data: seq[byte]) {.async.} =
|
|||
mpw.state = MultiPartWriterState.MessageFailure
|
||||
raiseHttpCriticalError("Unable to write multipart data")
|
||||
|
||||
proc write*(mpw: MultiPartWriterRef, data: string) {.async.} =
|
||||
proc write*(mpw: MultiPartWriterRef, data: string) {.
|
||||
async: (raises: [CancelledError, HttpCriticalError]).} =
|
||||
## Write part's data ``data`` to the output stream.
|
||||
doAssert(mpw.kind == MultiPartSource.Stream)
|
||||
doAssert(mpw.state == MultiPartWriterState.PartStarted)
|
||||
|
@ -688,7 +688,8 @@ proc write*(mpw: var MultiPartWriter, data: openArray[char]) =
|
|||
doAssert(mpw.state == MultiPartWriterState.PartStarted)
|
||||
mpw.buffer.add(data.toOpenArrayByte(0, len(data) - 1))
|
||||
|
||||
proc finishPart*(mpw: MultiPartWriterRef) {.async.} =
|
||||
proc finishPart*(mpw: MultiPartWriterRef) {.
|
||||
async: (raises: [CancelledError, HttpCriticalError]).} =
|
||||
## Finish multipart's message part and send proper markers to output stream.
|
||||
doAssert(mpw.state == MultiPartWriterState.PartStarted)
|
||||
try:
|
||||
|
@ -707,7 +708,8 @@ proc finishPart*(mpw: var MultiPartWriter) =
|
|||
mpw.buffer.add(mpw.finishPartMark)
|
||||
mpw.state = MultiPartWriterState.PartFinished
|
||||
|
||||
proc finish*(mpw: MultiPartWriterRef) {.async.} =
|
||||
proc finish*(mpw: MultiPartWriterRef) {.
|
||||
async: (raises: [CancelledError, HttpCriticalError]).} =
|
||||
## Finish multipart's message form and send finishing markers to the output
|
||||
## stream.
|
||||
doAssert(mpw.kind == MultiPartSource.Stream)
|
||||
|
|
|
@ -6,6 +6,9 @@
|
|||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import httpserver
|
||||
import ../../asyncloop, ../../asyncsync
|
||||
import ../../streams/[asyncstream, tlsstream]
|
||||
|
@ -24,7 +27,7 @@ type
|
|||
|
||||
SecureHttpConnectionRef* = ref SecureHttpConnection
|
||||
|
||||
proc closeSecConnection(conn: HttpConnectionRef) {.async.} =
|
||||
proc closeSecConnection(conn: HttpConnectionRef) {.async: (raises: []).} =
|
||||
if conn.state == HttpState.Alive:
|
||||
conn.state = HttpState.Closing
|
||||
var pending: seq[Future[void]]
|
||||
|
@ -38,44 +41,44 @@ proc closeSecConnection(conn: HttpConnectionRef) {.async.} =
|
|||
untrackCounter(HttpServerSecureConnectionTrackerName)
|
||||
conn.state = HttpState.Closed
|
||||
|
||||
proc new*(ht: typedesc[SecureHttpConnectionRef], server: SecureHttpServerRef,
|
||||
transp: StreamTransport): SecureHttpConnectionRef =
|
||||
proc new(ht: typedesc[SecureHttpConnectionRef], server: SecureHttpServerRef,
|
||||
transp: StreamTransport): Result[SecureHttpConnectionRef, string] =
|
||||
var res = SecureHttpConnectionRef()
|
||||
HttpConnection(res[]).init(HttpServerRef(server), transp)
|
||||
let tlsStream =
|
||||
newTLSServerAsyncStream(res.mainReader, res.mainWriter,
|
||||
server.tlsPrivateKey,
|
||||
server.tlsCertificate,
|
||||
minVersion = TLSVersion.TLS12,
|
||||
flags = server.secureFlags)
|
||||
try:
|
||||
newTLSServerAsyncStream(res.mainReader, res.mainWriter,
|
||||
server.tlsPrivateKey,
|
||||
server.tlsCertificate,
|
||||
minVersion = TLSVersion.TLS12,
|
||||
flags = server.secureFlags)
|
||||
except TLSStreamError as exc:
|
||||
return err(exc.msg)
|
||||
res.tlsStream = tlsStream
|
||||
res.reader = AsyncStreamReader(tlsStream.reader)
|
||||
res.writer = AsyncStreamWriter(tlsStream.writer)
|
||||
res.closeCb = closeSecConnection
|
||||
trackCounter(HttpServerSecureConnectionTrackerName)
|
||||
res
|
||||
ok(res)
|
||||
|
||||
proc createSecConnection(server: HttpServerRef,
|
||||
transp: StreamTransport): Future[HttpConnectionRef] {.
|
||||
async.} =
|
||||
let secureServ = cast[SecureHttpServerRef](server)
|
||||
var sconn = SecureHttpConnectionRef.new(secureServ, transp)
|
||||
async: (raises: [CancelledError, HttpConnectionError]).} =
|
||||
let
|
||||
secureServ = cast[SecureHttpServerRef](server)
|
||||
sconn = SecureHttpConnectionRef.new(secureServ, transp).valueOr:
|
||||
raiseHttpConnectionError(error)
|
||||
|
||||
try:
|
||||
await handshake(sconn.tlsStream)
|
||||
return HttpConnectionRef(sconn)
|
||||
HttpConnectionRef(sconn)
|
||||
except CancelledError as exc:
|
||||
await HttpConnectionRef(sconn).closeWait()
|
||||
raise exc
|
||||
except TLSStreamError as exc:
|
||||
except AsyncStreamError as exc:
|
||||
await HttpConnectionRef(sconn).closeWait()
|
||||
let msg = "Unable to establish secure connection, reason [" &
|
||||
$exc.msg & "]"
|
||||
raiseHttpCriticalError(msg)
|
||||
except CatchableError as exc:
|
||||
await HttpConnectionRef(sconn).closeWait()
|
||||
let msg = "Unexpected error while trying to establish secure connection, " &
|
||||
"reason [" & $exc.msg & "]"
|
||||
raiseHttpCriticalError(msg)
|
||||
let msg = "Unable to establish secure connection, reason: " & $exc.msg
|
||||
raiseHttpConnectionError(msg)
|
||||
|
||||
proc new*(htype: typedesc[SecureHttpServerRef],
|
||||
address: TransportAddress,
|
||||
|
@ -94,7 +97,7 @@ proc new*(htype: typedesc[SecureHttpServerRef],
|
|||
maxHeadersSize: int = 8192,
|
||||
maxRequestBodySize: int = 1_048_576,
|
||||
dualstack = DualStackType.Auto
|
||||
): HttpResult[SecureHttpServerRef] {.raises: [].} =
|
||||
): HttpResult[SecureHttpServerRef] =
|
||||
|
||||
doAssert(not(isNil(tlsPrivateKey)), "TLS private key must not be nil!")
|
||||
doAssert(not(isNil(tlsCertificate)), "TLS certificate must not be nil!")
|
||||
|
@ -114,8 +117,6 @@ proc new*(htype: typedesc[SecureHttpServerRef],
|
|||
backlog = backlogSize, dualstack = dualstack)
|
||||
except TransportOsError as exc:
|
||||
return err(exc.msg)
|
||||
except CatchableError as exc:
|
||||
return err(exc.msg)
|
||||
|
||||
let res = SecureHttpServerRef(
|
||||
address: address,
|
||||
|
|
|
@ -923,7 +923,7 @@ template cancel*(future: FutureBase) {.
|
|||
cancelSoon(future, nil, nil, getSrcLocation())
|
||||
|
||||
proc cancelAndWait*(future: FutureBase, loc: ptr SrcLoc): Future[void] {.
|
||||
async: (raw: true, raises: [CancelledError]).} =
|
||||
async: (raw: true, raises: []).} =
|
||||
## Perform cancellation ``future`` return Future which will be completed when
|
||||
## ``future`` become finished (completed with value, failed or cancelled).
|
||||
##
|
||||
|
|
|
@ -74,7 +74,7 @@ type
|
|||
scontext: ptr SslServerContext
|
||||
stream*: TLSAsyncStream
|
||||
handshaked*: bool
|
||||
handshakeFut*: Future[void]
|
||||
handshakeFut*: Future[void].Raising([CancelledError, AsyncStreamError])
|
||||
|
||||
TLSStreamReader* = ref object of AsyncStreamReader
|
||||
case kind: TLSStreamKind
|
||||
|
@ -84,7 +84,7 @@ type
|
|||
scontext: ptr SslServerContext
|
||||
stream*: TLSAsyncStream
|
||||
handshaked*: bool
|
||||
handshakeFut*: Future[void]
|
||||
handshakeFut*: Future[void].Raising([CancelledError, AsyncStreamError])
|
||||
|
||||
TLSAsyncStream* = ref object of RootRef
|
||||
xwc*: X509NoanchorContext
|
||||
|
@ -94,7 +94,7 @@ type
|
|||
x509*: X509MinimalContext
|
||||
reader*: TLSStreamReader
|
||||
writer*: TLSStreamWriter
|
||||
mainLoop*: Future[void].Raising([CancelledError, AsyncStreamError])
|
||||
mainLoop*: Future[void].Raising([])
|
||||
trustAnchors: TrustAnchorStore
|
||||
|
||||
SomeTLSStreamType* = TLSStreamReader|TLSStreamWriter|TLSAsyncStream
|
||||
|
@ -264,19 +264,6 @@ template readAndReset(fut: untyped) =
|
|||
loopState = AsyncStreamState.Finished
|
||||
break
|
||||
|
||||
proc cancelAndWait*(a, b, c, d: Future[TLSResult]): Future[void] {.
|
||||
async: (raw: true, raises: [CancelledError]).} =
|
||||
var waiting: seq[FutureBase]
|
||||
if not(isNil(a)) and not(a.finished()):
|
||||
waiting.add(a.cancelAndWait())
|
||||
if not(isNil(b)) and not(b.finished()):
|
||||
waiting.add(b.cancelAndWait())
|
||||
if not(isNil(c)) and not(c.finished()):
|
||||
waiting.add(c.cancelAndWait())
|
||||
if not(isNil(d)) and not(d.finished()):
|
||||
waiting.add(d.cancelAndWait())
|
||||
allFutures(waiting)
|
||||
|
||||
proc dumpState*(state: cuint): string =
|
||||
var res = ""
|
||||
if (state and SSL_CLOSED) == SSL_CLOSED:
|
||||
|
@ -296,8 +283,7 @@ proc dumpState*(state: cuint): string =
|
|||
res.add("SSL_RECVAPP")
|
||||
"{" & res & "}"
|
||||
|
||||
proc tlsLoop*(stream: TLSAsyncStream) {.
|
||||
async: (raises: [CancelledError, AsyncStreamError]).} =
|
||||
proc tlsLoop*(stream: TLSAsyncStream) {.async: (raises: []).} =
|
||||
var
|
||||
sendRecFut, sendAppFut: Future[TLSResult].Raising([])
|
||||
recvRecFut, recvAppFut: Future[TLSResult].Raising([])
|
||||
|
@ -373,7 +359,17 @@ proc tlsLoop*(stream: TLSAsyncStream) {.
|
|||
break
|
||||
|
||||
# Cancelling and waiting all the pending operations
|
||||
await cancelAndWait(sendRecFut, sendAppFut, recvRecFut, recvAppFut)
|
||||
var waiting: seq[FutureBase]
|
||||
if not(isNil(sendRecFut)) and not(sendRecFut.finished()):
|
||||
waiting.add(sendRecFut.cancelAndWait())
|
||||
if not(isNil(sendAppFut)) and not(sendAppFut.finished()):
|
||||
waiting.add(sendAppFut.cancelAndWait())
|
||||
if not(isNil(recvRecFut)) and not(recvRecFut.finished()):
|
||||
waiting.add(recvRecFut.cancelAndWait())
|
||||
if not(isNil(recvAppFut)) and not(recvAppFut.finished()):
|
||||
waiting.add(recvAppFut.cancelAndWait())
|
||||
await noCancel(allFutures(waiting))
|
||||
|
||||
# Calculating error
|
||||
let error =
|
||||
case loopState
|
||||
|
@ -789,9 +785,11 @@ proc init*(tt: typedesc[TLSSessionCache], size: int = 4096): TLSSessionCache =
|
|||
sslSessionCacheLruInit(addr res.context, addr res.storage[0], rsize)
|
||||
res
|
||||
|
||||
proc handshake*(rws: SomeTLSStreamType): Future[void] =
|
||||
proc handshake*(rws: SomeTLSStreamType): Future[void] {.
|
||||
async: (raw: true, raises: [CancelledError, AsyncStreamError]).} =
|
||||
## Wait until initial TLS handshake will be successfully performed.
|
||||
var retFuture = newFuture[void]("tlsstream.handshake")
|
||||
let retFuture = Future[void].Raising([CancelledError, AsyncStreamError])
|
||||
.init("tlsstream.handshake")
|
||||
when rws is TLSStreamReader:
|
||||
if rws.handshaked:
|
||||
retFuture.complete()
|
||||
|
|
|
@ -1831,10 +1831,21 @@ proc close*(server: StreamServer) =
|
|||
server.sock.closeSocket(continuation)
|
||||
|
||||
proc closeWait*(server: StreamServer): Future[void] {.
|
||||
async: (raw: true, raises: [CancelledError]).} =
|
||||
async: (raw: true, raises: []).} =
|
||||
## Close server ``server`` and release all resources.
|
||||
let retFuture = newFuture[void](
|
||||
"stream.server.closeWait", {FutureFlag.OwnCancelSchedule})
|
||||
|
||||
proc continuation(udata: pointer) =
|
||||
retFuture.complete()
|
||||
|
||||
server.close()
|
||||
server.join()
|
||||
|
||||
if not(server.loopFuture.finished()):
|
||||
server.loopFuture.addCallback(continuation, cast[pointer](retFuture))
|
||||
else:
|
||||
retFuture.complete()
|
||||
retFuture
|
||||
|
||||
proc getBacklogSize(backlog: int): cint =
|
||||
doAssert(backlog >= 0 and backlog <= high(int32))
|
||||
|
|
Loading…
Reference in New Issue