Merge branch 'master' into feature/profiler-v4

This commit is contained in:
gmega 2024-01-31 11:20:38 -03:00
commit a68de9fe17
No known key found for this signature in database
GPG Key ID: FFD8DAF00660270F
29 changed files with 1127 additions and 409 deletions

View File

@ -1,13 +1,13 @@
mode = ScriptMode.Verbose mode = ScriptMode.Verbose
packageName = "chronos" packageName = "chronos"
version = "3.2.0" version = "4.0.0"
author = "Status Research & Development GmbH" author = "Status Research & Development GmbH"
description = "Networking framework with async/await support" description = "Networking framework with async/await support"
license = "MIT or Apache License 2.0" license = "MIT or Apache License 2.0"
skipDirs = @["tests"] skipDirs = @["tests"]
requires "nim >= 1.6.0", requires "nim >= 1.6.16",
"results", "results",
"stew", "stew",
"bearssl", "bearssl",

View File

@ -43,7 +43,7 @@ proc closeWait*(bstream: HttpBodyReader) {.async: (raises: []).} =
## Close and free resource allocated by body reader. ## Close and free resource allocated by body reader.
if bstream.bstate == HttpState.Alive: if bstream.bstate == HttpState.Alive:
bstream.bstate = HttpState.Closing bstream.bstate = HttpState.Closing
var res = newSeq[Future[void]]() var res = newSeq[Future[void].Raising([])]()
# We closing streams in reversed order because stream at position [0], uses # We closing streams in reversed order because stream at position [0], uses
# data from stream at position [1]. # data from stream at position [1].
for index in countdown((len(bstream.streams) - 1), 0): for index in countdown((len(bstream.streams) - 1), 0):
@ -68,7 +68,7 @@ proc closeWait*(bstream: HttpBodyWriter) {.async: (raises: []).} =
## Close and free all the resources allocated by body writer. ## Close and free all the resources allocated by body writer.
if bstream.bstate == HttpState.Alive: if bstream.bstate == HttpState.Alive:
bstream.bstate = HttpState.Closing bstream.bstate = HttpState.Closing
var res = newSeq[Future[void]]() var res = newSeq[Future[void].Raising([])]()
for index in countdown(len(bstream.streams) - 1, 0): for index in countdown(len(bstream.streams) - 1, 0):
res.add(bstream.streams[index].closeWait()) res.add(bstream.streams[index].closeWait())
await noCancel(allFutures(res)) await noCancel(allFutures(res))

View File

@ -294,7 +294,7 @@ proc new*(t: typedesc[HttpSessionRef],
if HttpClientFlag.Http11Pipeline in flags: if HttpClientFlag.Http11Pipeline in flags:
sessionWatcher(res) sessionWatcher(res)
else: else:
Future[void].Raising([]).init("session.watcher.placeholder") nil
res res
proc getTLSFlags(flags: HttpClientFlags): set[TLSFlags] = proc getTLSFlags(flags: HttpClientFlags): set[TLSFlags] =
@ -607,7 +607,7 @@ proc closeWait(conn: HttpClientConnectionRef) {.async: (raises: []).} =
conn.state = HttpClientConnectionState.Closing conn.state = HttpClientConnectionState.Closing
let pending = let pending =
block: block:
var res: seq[Future[void]] var res: seq[Future[void].Raising([])]
if not(isNil(conn.reader)) and not(conn.reader.closed()): if not(isNil(conn.reader)) and not(conn.reader.closed()):
res.add(conn.reader.closeWait()) res.add(conn.reader.closeWait())
if not(isNil(conn.writer)) and not(conn.writer.closed()): if not(isNil(conn.writer)) and not(conn.writer.closed()):
@ -847,14 +847,14 @@ proc sessionWatcher(session: HttpSessionRef) {.async: (raises: []).} =
break break
proc closeWait*(request: HttpClientRequestRef) {.async: (raises: []).} = proc closeWait*(request: HttpClientRequestRef) {.async: (raises: []).} =
var pending: seq[FutureBase] var pending: seq[Future[void].Raising([])]
if request.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}: if request.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}:
request.state = HttpReqRespState.Closing request.state = HttpReqRespState.Closing
if not(isNil(request.writer)): if not(isNil(request.writer)):
if not(request.writer.closed()): if not(request.writer.closed()):
pending.add(FutureBase(request.writer.closeWait())) pending.add(request.writer.closeWait())
request.writer = nil request.writer = nil
pending.add(FutureBase(request.releaseConnection())) pending.add(request.releaseConnection())
await noCancel(allFutures(pending)) await noCancel(allFutures(pending))
request.session = nil request.session = nil
request.error = nil request.error = nil
@ -862,14 +862,14 @@ proc closeWait*(request: HttpClientRequestRef) {.async: (raises: []).} =
untrackCounter(HttpClientRequestTrackerName) untrackCounter(HttpClientRequestTrackerName)
proc closeWait*(response: HttpClientResponseRef) {.async: (raises: []).} = proc closeWait*(response: HttpClientResponseRef) {.async: (raises: []).} =
var pending: seq[FutureBase] var pending: seq[Future[void].Raising([])]
if response.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}: if response.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}:
response.state = HttpReqRespState.Closing response.state = HttpReqRespState.Closing
if not(isNil(response.reader)): if not(isNil(response.reader)):
if not(response.reader.closed()): if not(response.reader.closed()):
pending.add(FutureBase(response.reader.closeWait())) pending.add(response.reader.closeWait())
response.reader = nil response.reader = nil
pending.add(FutureBase(response.releaseConnection())) pending.add(response.releaseConnection())
await noCancel(allFutures(pending)) await noCancel(allFutures(pending))
response.session = nil response.session = nil
response.error = nil response.error = nil

View File

@ -11,11 +11,14 @@
import std/[tables, uri, strutils] import std/[tables, uri, strutils]
import stew/[base10], httputils, results import stew/[base10], httputils, results
import ../../asyncloop, ../../asyncsync import ../../[asyncloop, asyncsync]
import ../../streams/[asyncstream, boundstream, chunkstream] import ../../streams/[asyncstream, boundstream, chunkstream]
import "."/[httptable, httpcommon, multipart] import "."/[httptable, httpcommon, multipart]
from ../../transports/common import TransportAddress, ServerFlags, `$`, `==`
export asyncloop, asyncsync, httptable, httpcommon, httputils, multipart, export asyncloop, asyncsync, httptable, httpcommon, httputils, multipart,
asyncstream, boundstream, chunkstream, uri, tables, results asyncstream, boundstream, chunkstream, uri, tables, results
export TransportAddress, ServerFlags, `$`, `==`
type type
HttpServerFlags* {.pure.} = enum HttpServerFlags* {.pure.} = enum
@ -107,6 +110,7 @@ type
maxRequestBodySize*: int maxRequestBodySize*: int
processCallback*: HttpProcessCallback2 processCallback*: HttpProcessCallback2
createConnCallback*: HttpConnectionCallback createConnCallback*: HttpConnectionCallback
middlewares: seq[HttpProcessCallback2]
HttpServerRef* = ref HttpServer HttpServerRef* = ref HttpServer
@ -158,6 +162,16 @@ type
HttpConnectionRef* = ref HttpConnection HttpConnectionRef* = ref HttpConnection
MiddlewareHandleCallback* = proc(
middleware: HttpServerMiddlewareRef, request: RequestFence,
handler: HttpProcessCallback2): Future[HttpResponseRef] {.
async: (raises: [CancelledError]).}
HttpServerMiddleware* = object of RootObj
handler*: MiddlewareHandleCallback
HttpServerMiddlewareRef* = ref HttpServerMiddleware
ByteChar* = string | seq[byte] ByteChar* = string | seq[byte]
proc init(htype: typedesc[HttpProcessError], error: HttpServerError, proc init(htype: typedesc[HttpProcessError], error: HttpServerError,
@ -175,6 +189,8 @@ proc init(htype: typedesc[HttpProcessError],
proc defaultResponse*(exc: ref CatchableError): HttpResponseRef proc defaultResponse*(exc: ref CatchableError): HttpResponseRef
proc defaultResponse*(msg: HttpMessage): HttpResponseRef
proc new(htype: typedesc[HttpConnectionHolderRef], server: HttpServerRef, proc new(htype: typedesc[HttpConnectionHolderRef], server: HttpServerRef,
transp: StreamTransport, transp: StreamTransport,
connectionId: string): HttpConnectionHolderRef = connectionId: string): HttpConnectionHolderRef =
@ -188,20 +204,54 @@ proc createConnection(server: HttpServerRef,
transp: StreamTransport): Future[HttpConnectionRef] {. transp: StreamTransport): Future[HttpConnectionRef] {.
async: (raises: [CancelledError, HttpConnectionError]).} async: (raises: [CancelledError, HttpConnectionError]).}
proc new*(htype: typedesc[HttpServerRef], proc prepareMiddlewares(
address: TransportAddress, requestProcessCallback: HttpProcessCallback2,
processCallback: HttpProcessCallback2, middlewares: openArray[HttpServerMiddlewareRef]
serverFlags: set[HttpServerFlags] = {}, ): seq[HttpProcessCallback2] =
socketFlags: set[ServerFlags] = {ReuseAddr}, var
serverUri = Uri(), handlers: seq[HttpProcessCallback2]
serverIdent = "", currentHandler = requestProcessCallback
maxConnections: int = -1,
bufferSize: int = 4096, if len(middlewares) == 0:
backlogSize: int = DefaultBacklogSize, return handlers
httpHeadersTimeout = 10.seconds,
maxHeadersSize: int = 8192, let mws = @middlewares
maxRequestBodySize: int = 1_048_576, handlers = newSeq[HttpProcessCallback2](len(mws))
dualstack = DualStackType.Auto): HttpResult[HttpServerRef] =
for index in countdown(len(mws) - 1, 0):
let processor =
block:
var res: HttpProcessCallback2
closureScope:
let
middleware = mws[index]
realHandler = currentHandler
res =
proc(request: RequestFence): Future[HttpResponseRef] {.
async: (raises: [CancelledError], raw: true).} =
middleware.handler(middleware, request, realHandler)
res
handlers[index] = processor
currentHandler = processor
handlers
proc new*(
htype: typedesc[HttpServerRef],
address: TransportAddress,
processCallback: HttpProcessCallback2,
serverFlags: set[HttpServerFlags] = {},
socketFlags: set[ServerFlags] = {ReuseAddr},
serverUri = Uri(),
serverIdent = "",
maxConnections: int = -1,
bufferSize: int = 4096,
backlogSize: int = DefaultBacklogSize,
httpHeadersTimeout = 10.seconds,
maxHeadersSize: int = 8192,
maxRequestBodySize: int = 1_048_576,
dualstack = DualStackType.Auto,
middlewares: openArray[HttpServerMiddlewareRef] = []
): HttpResult[HttpServerRef] =
let serverUri = let serverUri =
if len(serverUri.hostname) > 0: if len(serverUri.hostname) > 0:
@ -240,24 +290,28 @@ proc new*(htype: typedesc[HttpServerRef],
# else: # else:
# nil # nil
lifetime: newFuture[void]("http.server.lifetime"), lifetime: newFuture[void]("http.server.lifetime"),
connections: initOrderedTable[string, HttpConnectionHolderRef]() connections: initOrderedTable[string, HttpConnectionHolderRef](),
middlewares: prepareMiddlewares(processCallback, middlewares)
) )
ok(res) ok(res)
proc new*(htype: typedesc[HttpServerRef], proc new*(
address: TransportAddress, htype: typedesc[HttpServerRef],
processCallback: HttpProcessCallback, address: TransportAddress,
serverFlags: set[HttpServerFlags] = {}, processCallback: HttpProcessCallback,
socketFlags: set[ServerFlags] = {ReuseAddr}, serverFlags: set[HttpServerFlags] = {},
serverUri = Uri(), socketFlags: set[ServerFlags] = {ReuseAddr},
serverIdent = "", serverUri = Uri(),
maxConnections: int = -1, serverIdent = "",
bufferSize: int = 4096, maxConnections: int = -1,
backlogSize: int = DefaultBacklogSize, bufferSize: int = 4096,
httpHeadersTimeout = 10.seconds, backlogSize: int = DefaultBacklogSize,
maxHeadersSize: int = 8192, httpHeadersTimeout = 10.seconds,
maxRequestBodySize: int = 1_048_576, maxHeadersSize: int = 8192,
dualstack = DualStackType.Auto): HttpResult[HttpServerRef] {. maxRequestBodySize: int = 1_048_576,
dualstack = DualStackType.Auto,
middlewares: openArray[HttpServerMiddlewareRef] = []
): HttpResult[HttpServerRef] {.
deprecated: "Callback could raise only CancelledError, annotate with " & deprecated: "Callback could raise only CancelledError, annotate with " &
"{.async: (raises: [CancelledError]).}".} = "{.async: (raises: [CancelledError]).}".} =
@ -273,7 +327,7 @@ proc new*(htype: typedesc[HttpServerRef],
HttpServerRef.new(address, wrap, serverFlags, socketFlags, serverUri, HttpServerRef.new(address, wrap, serverFlags, socketFlags, serverUri,
serverIdent, maxConnections, bufferSize, backlogSize, serverIdent, maxConnections, bufferSize, backlogSize,
httpHeadersTimeout, maxHeadersSize, maxRequestBodySize, httpHeadersTimeout, maxHeadersSize, maxRequestBodySize,
dualstack) dualstack, middlewares)
proc getServerFlags(req: HttpRequestRef): set[HttpServerFlags] = proc getServerFlags(req: HttpRequestRef): set[HttpServerFlags] =
var defaultFlags: set[HttpServerFlags] = {} var defaultFlags: set[HttpServerFlags] = {}
@ -345,6 +399,18 @@ proc defaultResponse*(exc: ref CatchableError): HttpResponseRef =
else: else:
HttpResponseRef(state: HttpResponseState.ErrorCode, status: Http503) HttpResponseRef(state: HttpResponseState.ErrorCode, status: Http503)
proc defaultResponse*(msg: HttpMessage): HttpResponseRef =
HttpResponseRef(state: HttpResponseState.ErrorCode, status: msg.code)
proc defaultResponse*(err: HttpProcessError): HttpResponseRef =
HttpResponseRef(state: HttpResponseState.ErrorCode, status: err.code)
proc dropResponse*(): HttpResponseRef =
HttpResponseRef(state: HttpResponseState.Failed)
proc codeResponse*(status: HttpCode): HttpResponseRef =
HttpResponseRef(state: HttpResponseState.ErrorCode, status: status)
proc dumbResponse*(): HttpResponseRef {. proc dumbResponse*(): HttpResponseRef {.
deprecated: "Please use defaultResponse() instead".} = deprecated: "Please use defaultResponse() instead".} =
## Create an empty response to return when request processor got no request. ## Create an empty response to return when request processor got no request.
@ -362,29 +428,21 @@ proc hasBody*(request: HttpRequestRef): bool =
request.requestFlags * {HttpRequestFlags.BoundBody, request.requestFlags * {HttpRequestFlags.BoundBody,
HttpRequestFlags.UnboundBody} != {} HttpRequestFlags.UnboundBody} != {}
proc prepareRequest(conn: HttpConnectionRef, func new(t: typedesc[HttpRequestRef], conn: HttpConnectionRef): HttpRequestRef =
req: HttpRequestHeader): HttpResultMessage[HttpRequestRef] = HttpRequestRef(connection: conn, state: HttpState.Alive)
var request = HttpRequestRef(connection: conn, state: HttpState.Alive)
if req.version notin {HttpVersion10, HttpVersion11}: proc updateRequest*(request: HttpRequestRef, scheme: string, meth: HttpMethod,
return err(HttpMessage.init(Http505, "Unsupported HTTP protocol version")) version: HttpVersion, requestUri: string,
headers: HttpTable): HttpResultMessage[void] =
## Update HTTP request object using base request object with new properties.
request.scheme = # Store request version and call method.
if HttpServerFlags.Secure in conn.server.flags: request.scheme = scheme
"https" request.version = version
else: request.meth = meth
"http"
request.version = req.version
request.meth = req.meth
request.rawPath =
block:
let res = req.uri()
if len(res) == 0:
return err(HttpMessage.init(Http400, "Invalid request URI"))
res
# Processing request's URI
request.rawPath = requestUri
request.uri = request.uri =
if request.rawPath != "*": if request.rawPath != "*":
let uri = parseUri(request.rawPath) let uri = parseUri(request.rawPath)
@ -396,10 +454,11 @@ proc prepareRequest(conn: HttpConnectionRef,
uri.path = "*" uri.path = "*"
uri uri
# Conversion of request query string to HttpTable.
request.query = request.query =
block: block:
let queryFlags = let queryFlags =
if QueryCommaSeparatedArray in conn.server.flags: if QueryCommaSeparatedArray in request.connection.server.flags:
{QueryParamsFlag.CommaSeparatedArray} {QueryParamsFlag.CommaSeparatedArray}
else: else:
{} {}
@ -408,22 +467,8 @@ proc prepareRequest(conn: HttpConnectionRef,
table.add(key, value) table.add(key, value)
table table
request.headers = # Store request headers
block: request.headers = headers
var table = HttpTable.init()
# Retrieve headers and values
for key, value in req.headers():
table.add(key, value)
# Validating HTTP request headers
# Some of the headers must be present only once.
if table.count(ContentTypeHeader) > 1:
return err(HttpMessage.init(Http400, "Multiple Content-Type headers"))
if table.count(ContentLengthHeader) > 1:
return err(HttpMessage.init(Http400, "Multiple Content-Length headers"))
if table.count(TransferEncodingHeader) > 1:
return err(HttpMessage.init(Http400,
"Multuple Transfer-Encoding headers"))
table
# Preprocessing "Content-Encoding" header. # Preprocessing "Content-Encoding" header.
request.contentEncoding = request.contentEncoding =
@ -443,15 +488,17 @@ proc prepareRequest(conn: HttpConnectionRef,
# steps to reveal information about body. # steps to reveal information about body.
request.contentLength = request.contentLength =
if ContentLengthHeader in request.headers: if ContentLengthHeader in request.headers:
# Request headers has `Content-Length` header present.
let length = request.headers.getInt(ContentLengthHeader) let length = request.headers.getInt(ContentLengthHeader)
if length != 0: if length != 0:
if request.meth == MethodTrace: if request.meth == MethodTrace:
let msg = "TRACE requests could not have request body" let msg = "TRACE requests could not have request body"
return err(HttpMessage.init(Http400, msg)) return err(HttpMessage.init(Http400, msg))
# Because of coversion to `int` we should avoid unexpected OverflowError. # Because of coversion to `int` we should avoid unexpected
# OverflowError.
if length > uint64(high(int)): if length > uint64(high(int)):
return err(HttpMessage.init(Http413, "Unsupported content length")) return err(HttpMessage.init(Http413, "Unsupported content length"))
if length > uint64(conn.server.maxRequestBodySize): if length > uint64(request.connection.server.maxRequestBodySize):
return err(HttpMessage.init(Http413, "Content length exceeds limits")) return err(HttpMessage.init(Http413, "Content length exceeds limits"))
request.requestFlags.incl(HttpRequestFlags.BoundBody) request.requestFlags.incl(HttpRequestFlags.BoundBody)
int(length) int(length)
@ -459,6 +506,7 @@ proc prepareRequest(conn: HttpConnectionRef,
0 0
else: else:
if TransferEncodingFlags.Chunked in request.transferEncoding: if TransferEncodingFlags.Chunked in request.transferEncoding:
# Request headers has "Transfer-Encoding: chunked" header present.
if request.meth == MethodTrace: if request.meth == MethodTrace:
let msg = "TRACE requests could not have request body" let msg = "TRACE requests could not have request body"
return err(HttpMessage.init(Http400, msg)) return err(HttpMessage.init(Http400, msg))
@ -466,8 +514,9 @@ proc prepareRequest(conn: HttpConnectionRef,
0 0
if request.hasBody(): if request.hasBody():
# If request has body, we going to understand how its encoded. # If the request has a body, we will determine how it is encoded.
if ContentTypeHeader in request.headers: if ContentTypeHeader in request.headers:
# Request headers has "Content-Type" header present.
let contentType = let contentType =
getContentType(request.headers.getList(ContentTypeHeader)).valueOr: getContentType(request.headers.getList(ContentTypeHeader)).valueOr:
let msg = "Incorrect or missing Content-Type header" let msg = "Incorrect or missing Content-Type header"
@ -477,12 +526,67 @@ proc prepareRequest(conn: HttpConnectionRef,
elif contentType == MultipartContentType: elif contentType == MultipartContentType:
request.requestFlags.incl(HttpRequestFlags.MultipartForm) request.requestFlags.incl(HttpRequestFlags.MultipartForm)
request.contentTypeData = Opt.some(contentType) request.contentTypeData = Opt.some(contentType)
# If `Expect` header is present, we will handle expectation procedure.
if ExpectHeader in request.headers: if ExpectHeader in request.headers:
let expectHeader = request.headers.getString(ExpectHeader) let expectHeader = request.headers.getString(ExpectHeader)
if strip(expectHeader).toLowerAscii() == "100-continue": if strip(expectHeader).toLowerAscii() == "100-continue":
request.requestFlags.incl(HttpRequestFlags.ClientExpect) request.requestFlags.incl(HttpRequestFlags.ClientExpect)
ok()
proc updateRequest*(request: HttpRequestRef, meth: HttpMethod,
requestUri: string,
headers: HttpTable): HttpResultMessage[void] =
## Update HTTP request object using base request object with new properties.
updateRequest(request, request.scheme, meth, request.version, requestUri,
headers)
proc updateRequest*(request: HttpRequestRef, requestUri: string,
headers: HttpTable): HttpResultMessage[void] =
## Update HTTP request object using base request object with new properties.
updateRequest(request, request.scheme, request.meth, request.version,
requestUri, headers)
proc updateRequest*(request: HttpRequestRef,
requestUri: string): HttpResultMessage[void] =
## Update HTTP request object using base request object with new properties.
updateRequest(request, request.scheme, request.meth, request.version,
requestUri, request.headers)
proc updateRequest*(request: HttpRequestRef,
headers: HttpTable): HttpResultMessage[void] =
## Update HTTP request object using base request object with new properties.
updateRequest(request, request.scheme, request.meth, request.version,
request.rawPath, headers)
proc prepareRequest(conn: HttpConnectionRef,
req: HttpRequestHeader): HttpResultMessage[HttpRequestRef] =
let
request = HttpRequestRef.new(conn)
scheme =
if HttpServerFlags.Secure in conn.server.flags:
"https"
else:
"http"
headers =
block:
var table = HttpTable.init()
# Retrieve headers and values
for key, value in req.headers():
table.add(key, value)
# Validating HTTP request headers
# Some of the headers must be present only once.
if table.count(ContentTypeHeader) > 1:
return err(HttpMessage.init(Http400,
"Multiple Content-Type headers"))
if table.count(ContentLengthHeader) > 1:
return err(HttpMessage.init(Http400,
"Multiple Content-Length headers"))
if table.count(TransferEncodingHeader) > 1:
return err(HttpMessage.init(Http400,
"Multuple Transfer-Encoding headers"))
table
? updateRequest(request, scheme, req.meth, req.version, req.uri(), headers)
trackCounter(HttpServerRequestTrackerName) trackCounter(HttpServerRequestTrackerName)
ok(request) ok(request)
@ -736,16 +840,19 @@ proc sendDefaultResponse(
# Response was ignored, so we respond with not found. # Response was ignored, so we respond with not found.
await conn.sendErrorResponse(version, Http404, await conn.sendErrorResponse(version, Http404,
keepConnection.toBool()) keepConnection.toBool())
response.setResponseState(HttpResponseState.Finished)
keepConnection keepConnection
of HttpResponseState.Prepared: of HttpResponseState.Prepared:
# Response was prepared but not sent, so we can respond with some # Response was prepared but not sent, so we can respond with some
# error code # error code
await conn.sendErrorResponse(HttpVersion11, Http409, await conn.sendErrorResponse(HttpVersion11, Http409,
keepConnection.toBool()) keepConnection.toBool())
response.setResponseState(HttpResponseState.Finished)
keepConnection keepConnection
of HttpResponseState.ErrorCode: of HttpResponseState.ErrorCode:
# Response with error code # Response with error code
await conn.sendErrorResponse(version, response.status, false) await conn.sendErrorResponse(version, response.status, false)
response.setResponseState(HttpResponseState.Finished)
HttpProcessExitType.Immediate HttpProcessExitType.Immediate
of HttpResponseState.Sending, HttpResponseState.Failed, of HttpResponseState.Sending, HttpResponseState.Failed,
HttpResponseState.Cancelled: HttpResponseState.Cancelled:
@ -755,6 +862,7 @@ proc sendDefaultResponse(
# Response was ignored, so we respond with not found. # Response was ignored, so we respond with not found.
await conn.sendErrorResponse(version, Http404, await conn.sendErrorResponse(version, Http404,
keepConnection.toBool()) keepConnection.toBool())
response.setResponseState(HttpResponseState.Finished)
keepConnection keepConnection
of HttpResponseState.Finished: of HttpResponseState.Finished:
keepConnection keepConnection
@ -878,6 +986,25 @@ proc getRemoteAddress(connection: HttpConnectionRef): Opt[TransportAddress] =
if isNil(connection): return Opt.none(TransportAddress) if isNil(connection): return Opt.none(TransportAddress)
getRemoteAddress(connection.transp) getRemoteAddress(connection.transp)
proc getLocalAddress(transp: StreamTransport): Opt[TransportAddress] =
if isNil(transp): return Opt.none(TransportAddress)
try:
Opt.some(transp.localAddress())
except TransportOsError:
Opt.none(TransportAddress)
proc getLocalAddress(connection: HttpConnectionRef): Opt[TransportAddress] =
if isNil(connection): return Opt.none(TransportAddress)
getLocalAddress(connection.transp)
proc remote*(request: HttpRequestRef): Opt[TransportAddress] =
## Returns remote address of HTTP request's connection.
request.connection.getRemoteAddress()
proc local*(request: HttpRequestRef): Opt[TransportAddress] =
## Returns local address of HTTP request's connection.
request.connection.getLocalAddress()
proc getRequestFence*(server: HttpServerRef, proc getRequestFence*(server: HttpServerRef,
connection: HttpConnectionRef): Future[RequestFence] {. connection: HttpConnectionRef): Future[RequestFence] {.
async: (raises: []).} = async: (raises: []).} =
@ -920,6 +1047,14 @@ proc getConnectionFence*(server: HttpServerRef,
ConnectionFence.err(HttpProcessError.init( ConnectionFence.err(HttpProcessError.init(
HttpServerError.DisconnectError, exc, address, Http400)) HttpServerError.DisconnectError, exc, address, Http400))
proc invokeProcessCallback(server: HttpServerRef,
req: RequestFence): Future[HttpResponseRef] {.
async: (raw: true, raises: [CancelledError]).} =
if len(server.middlewares) > 0:
server.middlewares[0](req)
else:
server.processCallback(req)
proc processRequest(server: HttpServerRef, proc processRequest(server: HttpServerRef,
connection: HttpConnectionRef, connection: HttpConnectionRef,
connId: string): Future[HttpProcessExitType] {. connId: string): Future[HttpProcessExitType] {.
@ -941,7 +1076,7 @@ proc processRequest(server: HttpServerRef,
try: try:
let response = let response =
try: try:
await connection.server.processCallback(requestFence) await invokeProcessCallback(connection.server, requestFence)
except CancelledError: except CancelledError:
# Cancelled, exiting # Cancelled, exiting
return HttpProcessExitType.Immediate return HttpProcessExitType.Immediate
@ -962,7 +1097,7 @@ proc processLoop(holder: HttpConnectionHolderRef) {.async: (raises: []).} =
if res.isErr(): if res.isErr():
if res.error.kind != HttpServerError.InterruptError: if res.error.kind != HttpServerError.InterruptError:
discard await noCancel( discard await noCancel(
server.processCallback(RequestFence.err(res.error))) invokeProcessCallback(server, RequestFence.err(res.error)))
server.connections.del(connectionId) server.connections.del(connectionId)
return return
res.get() res.get()
@ -1160,31 +1295,6 @@ proc post*(req: HttpRequestRef): Future[HttpTable] {.
elif HttpRequestFlags.UnboundBody in req.requestFlags: elif HttpRequestFlags.UnboundBody in req.requestFlags:
raiseHttpProtocolError(Http400, "Unsupported request body") raiseHttpProtocolError(Http400, "Unsupported request body")
proc setHeader*(resp: HttpResponseRef, key, value: string) =
## Sets value of header ``key`` to ``value``.
doAssert(resp.getResponseState() == HttpResponseState.Empty)
resp.headersTable.set(key, value)
proc setHeaderDefault*(resp: HttpResponseRef, key, value: string) =
## Sets value of header ``key`` to ``value``, only if header ``key`` is not
## present in the headers table.
discard resp.headersTable.hasKeyOrPut(key, value)
proc addHeader*(resp: HttpResponseRef, key, value: string) =
## Adds value ``value`` to header's ``key`` value.
doAssert(resp.getResponseState() == HttpResponseState.Empty)
resp.headersTable.add(key, value)
proc getHeader*(resp: HttpResponseRef, key: string,
default: string = ""): string =
## Returns value of header with name ``name`` or ``default``, if header is
## not present in the table.
resp.headersTable.getString(key, default)
proc hasHeader*(resp: HttpResponseRef, key: string): bool =
## Returns ``true`` if header with name ``key`` present in the headers table.
key in resp.headersTable
template checkPending(t: untyped) = template checkPending(t: untyped) =
let currentState = t.getResponseState() let currentState = t.getResponseState()
doAssert(currentState == HttpResponseState.Empty, doAssert(currentState == HttpResponseState.Empty,
@ -1199,10 +1309,41 @@ template checkStreamResponseState(t: untyped) =
{HttpResponseState.Prepared, HttpResponseState.Sending}, {HttpResponseState.Prepared, HttpResponseState.Sending},
"Response is in the wrong state") "Response is in the wrong state")
template checkResponseCanBeModified(t: untyped) =
doAssert(t.getResponseState() in
{HttpResponseState.Empty, HttpResponseState.ErrorCode},
"Response could not be modified at this stage")
template checkPointerLength(t1, t2: untyped) = template checkPointerLength(t1, t2: untyped) =
doAssert(not(isNil(t1)), "pbytes must not be nil") doAssert(not(isNil(t1)), "pbytes must not be nil")
doAssert(t2 >= 0, "nbytes should be bigger or equal to zero") doAssert(t2 >= 0, "nbytes should be bigger or equal to zero")
proc setHeader*(resp: HttpResponseRef, key, value: string) =
## Sets value of header ``key`` to ``value``.
checkResponseCanBeModified(resp)
resp.headersTable.set(key, value)
proc setHeaderDefault*(resp: HttpResponseRef, key, value: string) =
## Sets value of header ``key`` to ``value``, only if header ``key`` is not
## present in the headers table.
checkResponseCanBeModified(resp)
discard resp.headersTable.hasKeyOrPut(key, value)
proc addHeader*(resp: HttpResponseRef, key, value: string) =
## Adds value ``value`` to header's ``key`` value.
checkResponseCanBeModified(resp)
resp.headersTable.add(key, value)
proc getHeader*(resp: HttpResponseRef, key: string,
default: string = ""): string =
## Returns value of header with name ``name`` or ``default``, if header is
## not present in the table.
resp.headersTable.getString(key, default)
proc hasHeader*(resp: HttpResponseRef, key: string): bool =
## Returns ``true`` if header with name ``key`` present in the headers table.
key in resp.headersTable
func createHeaders(resp: HttpResponseRef): string = func createHeaders(resp: HttpResponseRef): string =
var answer = $(resp.version) & " " & $(resp.status) & "\r\n" var answer = $(resp.version) & " " & $(resp.status) & "\r\n"
for k, v in resp.headersTable.stringItems(): for k, v in resp.headersTable.stringItems():

View File

@ -231,8 +231,9 @@ proc closeProcessHandles(pipes: var AsyncProcessPipes,
lastError: OSErrorCode): OSErrorCode {.apforward.} lastError: OSErrorCode): OSErrorCode {.apforward.}
proc closeProcessStreams(pipes: AsyncProcessPipes, proc closeProcessStreams(pipes: AsyncProcessPipes,
options: set[AsyncProcessOption]): Future[void] {. options: set[AsyncProcessOption]): Future[void] {.
apforward.} async: (raises: []).}
proc closeWait(holder: AsyncStreamHolder): Future[void] {.apforward.} proc closeWait(holder: AsyncStreamHolder): Future[void] {.
async: (raises: []).}
template isOk(code: OSErrorCode): bool = template isOk(code: OSErrorCode): bool =
when defined(windows): when defined(windows):
@ -391,7 +392,8 @@ when defined(windows):
stdinHandle = ProcessStreamHandle(), stdinHandle = ProcessStreamHandle(),
stdoutHandle = ProcessStreamHandle(), stdoutHandle = ProcessStreamHandle(),
stderrHandle = ProcessStreamHandle(), stderrHandle = ProcessStreamHandle(),
): Future[AsyncProcessRef] {.async.} = ): Future[AsyncProcessRef] {.
async: (raises: [AsyncProcessError, CancelledError]).} =
var var
pipes = preparePipes(options, stdinHandle, stdoutHandle, pipes = preparePipes(options, stdinHandle, stdoutHandle,
stderrHandle).valueOr: stderrHandle).valueOr:
@ -517,14 +519,16 @@ when defined(windows):
ok(false) ok(false)
proc waitForExit*(p: AsyncProcessRef, proc waitForExit*(p: AsyncProcessRef,
timeout = InfiniteDuration): Future[int] {.async.} = timeout = InfiniteDuration): Future[int] {.
async: (raises: [AsyncProcessError, AsyncProcessTimeoutError,
CancelledError]).} =
if p.exitStatus.isSome(): if p.exitStatus.isSome():
return p.exitStatus.get() return p.exitStatus.get()
let wres = let wres =
try: try:
await waitForSingleObject(p.processHandle, timeout) await waitForSingleObject(p.processHandle, timeout)
except ValueError as exc: except AsyncError as exc:
raiseAsyncProcessError("Unable to wait for process handle", exc) raiseAsyncProcessError("Unable to wait for process handle", exc)
if wres == WaitableResult.Timeout: if wres == WaitableResult.Timeout:
@ -537,7 +541,8 @@ when defined(windows):
if exitCode >= 0: if exitCode >= 0:
p.exitStatus = Opt.some(exitCode) p.exitStatus = Opt.some(exitCode)
return exitCode
exitCode
proc peekExitCode(p: AsyncProcessRef): AsyncProcessResult[int] = proc peekExitCode(p: AsyncProcessRef): AsyncProcessResult[int] =
if p.exitStatus.isSome(): if p.exitStatus.isSome():
@ -787,7 +792,8 @@ else:
stdinHandle = ProcessStreamHandle(), stdinHandle = ProcessStreamHandle(),
stdoutHandle = ProcessStreamHandle(), stdoutHandle = ProcessStreamHandle(),
stderrHandle = ProcessStreamHandle(), stderrHandle = ProcessStreamHandle(),
): Future[AsyncProcessRef] {.async.} = ): Future[AsyncProcessRef] {.
async: (raises: [AsyncProcessError, CancelledError]).} =
var var
pid: Pid pid: Pid
pipes = preparePipes(options, stdinHandle, stdoutHandle, pipes = preparePipes(options, stdinHandle, stdoutHandle,
@ -887,7 +893,7 @@ else:
) )
trackCounter(AsyncProcessTrackerName) trackCounter(AsyncProcessTrackerName)
return process process
proc peekProcessExitCode(p: AsyncProcessRef, proc peekProcessExitCode(p: AsyncProcessRef,
reap = false): AsyncProcessResult[int] = reap = false): AsyncProcessResult[int] =
@ -948,7 +954,9 @@ else:
ok(false) ok(false)
proc waitForExit*(p: AsyncProcessRef, proc waitForExit*(p: AsyncProcessRef,
timeout = InfiniteDuration): Future[int] = timeout = InfiniteDuration): Future[int] {.
async: (raw: true, raises: [
AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} =
var var
retFuture = newFuture[int]("chronos.waitForExit()") retFuture = newFuture[int]("chronos.waitForExit()")
processHandle: ProcessHandle processHandle: ProcessHandle
@ -1050,7 +1058,7 @@ else:
# Process is still running, so we going to wait for SIGCHLD. # Process is still running, so we going to wait for SIGCHLD.
retFuture.cancelCallback = cancellation retFuture.cancelCallback = cancellation
return retFuture retFuture
proc peekExitCode(p: AsyncProcessRef): AsyncProcessResult[int] = proc peekExitCode(p: AsyncProcessRef): AsyncProcessResult[int] =
let res = ? p.peekProcessExitCode() let res = ? p.peekProcessExitCode()
@ -1155,7 +1163,7 @@ proc preparePipes(options: set[AsyncProcessOption],
stderrHandle: remoteStderr stderrHandle: remoteStderr
)) ))
proc closeWait(holder: AsyncStreamHolder) {.async.} = proc closeWait(holder: AsyncStreamHolder) {.async: (raises: []).} =
let (future, transp) = let (future, transp) =
case holder.kind case holder.kind
of StreamKind.None: of StreamKind.None:
@ -1182,10 +1190,11 @@ proc closeWait(holder: AsyncStreamHolder) {.async.} =
res res
if len(pending) > 0: if len(pending) > 0:
await allFutures(pending) await noCancel allFutures(pending)
proc closeProcessStreams(pipes: AsyncProcessPipes, proc closeProcessStreams(pipes: AsyncProcessPipes,
options: set[AsyncProcessOption]): Future[void] = options: set[AsyncProcessOption]): Future[void] {.
async: (raw: true, raises: []).} =
let pending = let pending =
block: block:
var res: seq[Future[void]] var res: seq[Future[void]]
@ -1196,10 +1205,12 @@ proc closeProcessStreams(pipes: AsyncProcessPipes,
if ProcessFlag.AutoStderr in pipes.flags: if ProcessFlag.AutoStderr in pipes.flags:
res.add(pipes.stderrHolder.closeWait()) res.add(pipes.stderrHolder.closeWait())
res res
allFutures(pending) noCancel allFutures(pending)
proc opAndWaitForExit(p: AsyncProcessRef, op: WaitOperation, proc opAndWaitForExit(p: AsyncProcessRef, op: WaitOperation,
timeout = InfiniteDuration): Future[int] {.async.} = timeout = InfiniteDuration): Future[int] {.
async: (raises: [
AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} =
let timerFut = let timerFut =
if timeout == InfiniteDuration: if timeout == InfiniteDuration:
newFuture[void]("chronos.killAndwaitForExit") newFuture[void]("chronos.killAndwaitForExit")
@ -1223,7 +1234,10 @@ proc opAndWaitForExit(p: AsyncProcessRef, op: WaitOperation,
return exitCode return exitCode
let waitFut = p.waitForExit().wait(100.milliseconds) let waitFut = p.waitForExit().wait(100.milliseconds)
discard await race(FutureBase(waitFut), FutureBase(timerFut)) try:
discard await race(FutureBase(waitFut), FutureBase(timerFut))
except ValueError:
raiseAssert "This should not be happened!"
if waitFut.finished() and not(waitFut.failed()): if waitFut.finished() and not(waitFut.failed()):
let res = p.peekExitCode() let res = p.peekExitCode()
@ -1237,25 +1251,28 @@ proc opAndWaitForExit(p: AsyncProcessRef, op: WaitOperation,
await waitFut.cancelAndWait() await waitFut.cancelAndWait()
raiseAsyncProcessTimeoutError() raiseAsyncProcessTimeoutError()
proc closeWait*(p: AsyncProcessRef) {.async.} = proc closeWait*(p: AsyncProcessRef) {.async: (raises: []).} =
# Here we ignore all possible errrors, because we do not want to raise # Here we ignore all possible errrors, because we do not want to raise
# exceptions. # exceptions.
discard closeProcessHandles(p.pipes, p.options, OSErrorCode(0)) discard closeProcessHandles(p.pipes, p.options, OSErrorCode(0))
await noCancel(p.pipes.closeProcessStreams(p.options)) await p.pipes.closeProcessStreams(p.options)
discard p.closeThreadAndProcessHandle() discard p.closeThreadAndProcessHandle()
untrackCounter(AsyncProcessTrackerName) untrackCounter(AsyncProcessTrackerName)
proc stdinStream*(p: AsyncProcessRef): AsyncStreamWriter = proc stdinStream*(p: AsyncProcessRef): AsyncStreamWriter =
## Returns STDIN async stream associated with process `p`.
doAssert(p.pipes.stdinHolder.kind == StreamKind.Writer, doAssert(p.pipes.stdinHolder.kind == StreamKind.Writer,
"StdinStreamWriter is not available") "StdinStreamWriter is not available")
p.pipes.stdinHolder.writer p.pipes.stdinHolder.writer
proc stdoutStream*(p: AsyncProcessRef): AsyncStreamReader = proc stdoutStream*(p: AsyncProcessRef): AsyncStreamReader =
## Returns STDOUT async stream associated with process `p`.
doAssert(p.pipes.stdoutHolder.kind == StreamKind.Reader, doAssert(p.pipes.stdoutHolder.kind == StreamKind.Reader,
"StdoutStreamReader is not available") "StdoutStreamReader is not available")
p.pipes.stdoutHolder.reader p.pipes.stdoutHolder.reader
proc stderrStream*(p: AsyncProcessRef): AsyncStreamReader = proc stderrStream*(p: AsyncProcessRef): AsyncStreamReader =
## Returns STDERR async stream associated with process `p`.
doAssert(p.pipes.stderrHolder.kind == StreamKind.Reader, doAssert(p.pipes.stderrHolder.kind == StreamKind.Reader,
"StderrStreamReader is not available") "StderrStreamReader is not available")
p.pipes.stderrHolder.reader p.pipes.stderrHolder.reader
@ -1263,7 +1280,9 @@ proc stderrStream*(p: AsyncProcessRef): AsyncStreamReader =
proc execCommand*(command: string, proc execCommand*(command: string,
options = {AsyncProcessOption.EvalCommand}, options = {AsyncProcessOption.EvalCommand},
timeout = InfiniteDuration timeout = InfiniteDuration
): Future[int] {.async.} = ): Future[int] {.
async: (raises: [
AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} =
let let
poptions = options + {AsyncProcessOption.EvalCommand} poptions = options + {AsyncProcessOption.EvalCommand}
process = await startProcess(command, options = poptions) process = await startProcess(command, options = poptions)
@ -1277,7 +1296,9 @@ proc execCommand*(command: string,
proc execCommandEx*(command: string, proc execCommandEx*(command: string,
options = {AsyncProcessOption.EvalCommand}, options = {AsyncProcessOption.EvalCommand},
timeout = InfiniteDuration timeout = InfiniteDuration
): Future[CommandExResponse] {.async.} = ): Future[CommandExResponse] {.
async: (raises: [
AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} =
let let
process = await startProcess(command, options = options, process = await startProcess(command, options = options,
stdoutHandle = AsyncProcess.Pipe, stdoutHandle = AsyncProcess.Pipe,
@ -1291,13 +1312,13 @@ proc execCommandEx*(command: string,
status = await process.waitForExit(timeout) status = await process.waitForExit(timeout)
output = output =
try: try:
string.fromBytes(outputReader.read()) string.fromBytes(await outputReader)
except AsyncStreamError as exc: except AsyncStreamError as exc:
raiseAsyncProcessError("Unable to read process' stdout channel", raiseAsyncProcessError("Unable to read process' stdout channel",
exc) exc)
error = error =
try: try:
string.fromBytes(errorReader.read()) string.fromBytes(await errorReader)
except AsyncStreamError as exc: except AsyncStreamError as exc:
raiseAsyncProcessError("Unable to read process' stderr channel", raiseAsyncProcessError("Unable to read process' stderr channel",
exc) exc)
@ -1308,13 +1329,15 @@ proc execCommandEx*(command: string,
res res
proc pid*(p: AsyncProcessRef): int = proc pid*(p: AsyncProcessRef): int =
## Returns process ``p`` identifier. ## Returns process ``p`` unique process identifier.
int(p.processId) int(p.processId)
template processId*(p: AsyncProcessRef): int = pid(p) template processId*(p: AsyncProcessRef): int = pid(p)
proc killAndWaitForExit*(p: AsyncProcessRef, proc killAndWaitForExit*(p: AsyncProcessRef,
timeout = InfiniteDuration): Future[int] = timeout = InfiniteDuration): Future[int] {.
async: (raw: true, raises: [
AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} =
## Perform continuous attempts to kill the ``p`` process for specified period ## Perform continuous attempts to kill the ``p`` process for specified period
## of time ``timeout``. ## of time ``timeout``.
## ##
@ -1330,7 +1353,9 @@ proc killAndWaitForExit*(p: AsyncProcessRef,
opAndWaitForExit(p, WaitOperation.Kill, timeout) opAndWaitForExit(p, WaitOperation.Kill, timeout)
proc terminateAndWaitForExit*(p: AsyncProcessRef, proc terminateAndWaitForExit*(p: AsyncProcessRef,
timeout = InfiniteDuration): Future[int] = timeout = InfiniteDuration): Future[int] {.
async: (raw: true, raises: [
AsyncProcessError, AsyncProcessTimeoutError, CancelledError]).} =
## Perform continuous attempts to terminate the ``p`` process for specified ## Perform continuous attempts to terminate the ``p`` process for specified
## period of time ``timeout``. ## period of time ``timeout``.
## ##

View File

@ -523,15 +523,13 @@ proc closeWait*(ab: AsyncEventQueue): Future[void] {.
{FutureFlag.OwnCancelSchedule}) {FutureFlag.OwnCancelSchedule})
proc continuation(udata: pointer) {.gcsafe.} = proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete() retFuture.complete()
proc cancellation(udata: pointer) {.gcsafe.} =
# We are not going to change the state of `retFuture` to cancelled, so we # Ignore cancellation requests - we'll complete the future soon enough
# will prevent the entire sequence of Futures from being cancelled. retFuture.cancelCallback = nil
discard
ab.close() ab.close()
# Schedule `continuation` to be called only after all the `reader` # Schedule `continuation` to be called only after all the `reader`
# notifications will be scheduled and processed. # notifications will be scheduled and processed.
retFuture.cancelCallback = cancellation
callSoon(continuation) callSoon(continuation)
retFuture retFuture

View File

@ -11,6 +11,15 @@
## `chronosDebug` can be defined to enable several debugging helpers that come ## `chronosDebug` can be defined to enable several debugging helpers that come
## with a runtime cost - it is recommeneded to not enable these in production ## with a runtime cost - it is recommeneded to not enable these in production
## code. ## code.
##
## In this file we also declare feature flags starting with `chronosHas...` -
## these constants are declared when a feature exists in a particular release -
## each flag is declared as an integer starting at 0 during experimental
## development, 1 when feature complete and higher numbers when significant
## functionality has been added. If a feature ends up being removed (or changed
## in a backwards-incompatible way), the feature flag will be removed or renamed
## also - you can use `when declared(chronosHasXxx): when chronosHasXxx >= N:`
## to require a particular version.
const const
chronosHandleException* {.booldefine.}: bool = false chronosHandleException* {.booldefine.}: bool = false
## Remap `Exception` to `AsyncExceptionError` for all `async` functions. ## Remap `Exception` to `AsyncExceptionError` for all `async` functions.
@ -84,6 +93,9 @@ const
"" ""
## OS polling engine type which is going to be used by chronos. ## OS polling engine type which is going to be used by chronos.
chronosHasRaises* = 0
## raises effect support via `async: (raises: [])`
when defined(chronosStrictException): when defined(chronosStrictException):
{.warning: "-d:chronosStrictException has been deprecated in favor of handleException".} {.warning: "-d:chronosStrictException has been deprecated in favor of handleException".}
# In chronos v3, this setting was used as the opposite of # In chronos v3, this setting was used as the opposite of

View File

@ -34,6 +34,19 @@ type
FutureFlag* {.pure.} = enum FutureFlag* {.pure.} = enum
OwnCancelSchedule OwnCancelSchedule
## When OwnCancelSchedule is set, the owner of the future is responsible
## for implementing cancellation in one of 3 ways:
##
## * ensure that cancellation requests never reach the future by means of
## not exposing it to user code, `await` and `tryCancel`
## * set `cancelCallback` to `nil` to stop cancellation propagation - this
## is appropriate when it is expected that the future will be completed
## in a regular way "soon"
## * set `cancelCallback` to a handler that implements cancellation in an
## operation-specific way
##
## If `cancelCallback` is not set and the future gets cancelled, a
## `Defect` will be raised.
FutureFlags* = set[FutureFlag] FutureFlags* = set[FutureFlag]
@ -111,6 +124,12 @@ proc internalInitFutureBase*(fut: FutureBase, loc: ptr SrcLoc,
fut.internalState = state fut.internalState = state
fut.internalLocation[LocationKind.Create] = loc fut.internalLocation[LocationKind.Create] = loc
fut.internalFlags = flags fut.internalFlags = flags
if FutureFlag.OwnCancelSchedule in flags:
# Owners must replace `cancelCallback` with `nil` if they want to ignore
# cancellations
fut.internalCancelcb = proc(_: pointer) =
raiseAssert "Cancellation request for non-cancellable future"
if state != FutureState.Pending: if state != FutureState.Pending:
fut.internalLocation[LocationKind.Finish] = loc fut.internalLocation[LocationKind.Finish] = loc

View File

@ -335,7 +335,8 @@ proc removeCallback*(future: FutureBase, cb: CallbackFunc,
proc removeCallback*(future: FutureBase, cb: CallbackFunc) = proc removeCallback*(future: FutureBase, cb: CallbackFunc) =
future.removeCallback(cb, cast[pointer](future)) future.removeCallback(cb, cast[pointer](future))
proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer) = proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer) {.
deprecated: "use addCallback/removeCallback/clearCallbacks to manage the callback list".} =
## Clears the list of callbacks and sets the callback proc to be called when ## Clears the list of callbacks and sets the callback proc to be called when
## the future completes. ## the future completes.
## ##
@ -346,11 +347,14 @@ proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer) =
future.clearCallbacks future.clearCallbacks
future.addCallback(cb, udata) future.addCallback(cb, udata)
proc `callback=`*(future: FutureBase, cb: CallbackFunc) = proc `callback=`*(future: FutureBase, cb: CallbackFunc) {.
deprecated: "use addCallback/removeCallback/clearCallbacks instead to manage the callback list".} =
## Sets the callback proc to be called when the future completes. ## Sets the callback proc to be called when the future completes.
## ##
## If future has already completed then ``cb`` will be called immediately. ## If future has already completed then ``cb`` will be called immediately.
{.push warning[Deprecated]: off.}
`callback=`(future, cb, cast[pointer](future)) `callback=`(future, cb, cast[pointer](future))
{.pop.}
proc `cancelCallback=`*(future: FutureBase, cb: CallbackFunc) = proc `cancelCallback=`*(future: FutureBase, cb: CallbackFunc) =
## Sets the callback procedure to be called when the future is cancelled. ## Sets the callback procedure to be called when the future is cancelled.
@ -491,14 +495,26 @@ when chronosStackTrace:
# newMsg.add "\n" & $entry # newMsg.add "\n" & $entry
error.msg = newMsg error.msg = newMsg
proc internalCheckComplete*(fut: FutureBase) {.raises: [CatchableError].} = proc deepLineInfo(n: NimNode, p: LineInfo) =
# For internal use only. Used in asyncmacro n.setLineInfo(p)
if not(isNil(fut.internalError)): for i in 0..<n.len:
when chronosStackTrace: deepLineInfo(n[i], p)
injectStacktrace(fut.internalError)
raise fut.internalError
macro internalCheckComplete*(fut: InternalRaisesFuture, raises: typed) = macro internalRaiseIfError*(fut: FutureBase, info: typed) =
# Check the error field of the given future and raise if it's set to non-nil.
# This is a macro so we can capture the line info from the original call and
# report the correct line number on exception effect violation
let
info = info.lineInfoObj()
res = quote do:
if not(isNil(`fut`.internalError)):
when chronosStackTrace:
injectStacktrace(`fut`.internalError)
raise `fut`.internalError
res.deepLineInfo(info)
res
macro internalRaiseIfError*(fut: InternalRaisesFuture, raises, info: typed) =
# For InternalRaisesFuture[void, (ValueError, OSError), will do: # For InternalRaisesFuture[void, (ValueError, OSError), will do:
# {.cast(raises: [ValueError, OSError]).}: # {.cast(raises: [ValueError, OSError]).}:
# if isNil(f.error): discard # if isNil(f.error): discard
@ -507,10 +523,9 @@ macro internalCheckComplete*(fut: InternalRaisesFuture, raises: typed) =
# we cannot `getTypeInst` on the `fut` - when aliases are involved, the # we cannot `getTypeInst` on the `fut` - when aliases are involved, the
# generics are lost - so instead, we pass the raises list explicitly # generics are lost - so instead, we pass the raises list explicitly
let types = getRaisesTypes(raises) let
types.copyLineInfo(raises) info = info.lineInfoObj()
for t in types: types = getRaisesTypes(raises)
t.copyLineInfo(raises)
if isNoRaises(types): if isNoRaises(types):
return quote do: return quote do:
@ -524,40 +539,43 @@ macro internalCheckComplete*(fut: InternalRaisesFuture, raises: typed) =
assert types[0].strVal == "tuple" assert types[0].strVal == "tuple"
let ifRaise = nnkIfExpr.newTree( let
nnkElifExpr.newTree( internalError = nnkDotExpr.newTree(fut, ident "internalError")
quote do: isNil(`fut`.internalError),
quote do: discard ifRaise = nnkIfExpr.newTree(
), nnkElifExpr.newTree(
nnkElseExpr.newTree( nnkCall.newTree(ident"isNil", internalError),
nnkRaiseStmt.newTree( nnkDiscardStmt.newTree(newEmptyNode())
nnkDotExpr.newTree(fut, ident "internalError") ),
nnkElseExpr.newTree(
nnkRaiseStmt.newTree(internalError)
) )
) )
)
nnkPragmaBlock.newTree( res = nnkPragmaBlock.newTree(
nnkPragma.newTree( nnkPragma.newTree(
nnkCast.newTree( nnkCast.newTree(
newEmptyNode(), newEmptyNode(),
nnkExprColonExpr.newTree( nnkExprColonExpr.newTree(
ident"raises", ident"raises",
block: block:
var res = nnkBracket.newTree() var res = nnkBracket.newTree()
for r in types[1..^1]: for r in types[1..^1]:
res.add(r) res.add(r)
res res
) )
),
), ),
), ifRaise
ifRaise )
) res.deepLineInfo(info)
res
proc readFinished[T: not void](fut: Future[T]): lent T {. proc readFinished[T: not void](fut: Future[T]): lent T {.
raises: [CatchableError].} = raises: [CatchableError].} =
# Read a future that is known to be finished, avoiding the extra exception # Read a future that is known to be finished, avoiding the extra exception
# effect. # effect.
internalCheckComplete(fut) internalRaiseIfError(fut, fut)
fut.internalValue fut.internalValue
proc read*[T: not void](fut: Future[T] ): lent T {.raises: [CatchableError].} = proc read*[T: not void](fut: Future[T] ): lent T {.raises: [CatchableError].} =
@ -582,7 +600,7 @@ proc read*(fut: Future[void]) {.raises: [CatchableError].} =
if not fut.finished(): if not fut.finished():
raiseFuturePendingError(fut) raiseFuturePendingError(fut)
internalCheckComplete(fut) internalRaiseIfError(fut, fut)
proc readError*(fut: FutureBase): ref CatchableError {.raises: [FutureError].} = proc readError*(fut: FutureBase): ref CatchableError {.raises: [FutureError].} =
## Retrieves the exception of the failed or cancelled `fut`. ## Retrieves the exception of the failed or cancelled `fut`.
@ -652,7 +670,7 @@ proc waitFor*(fut: Future[void]) {.raises: [CatchableError].} =
## Must not be called recursively (from inside `async` procedures). ## Must not be called recursively (from inside `async` procedures).
## ##
## See also `await`, `Future.read` ## See also `await`, `Future.read`
pollFor(fut).internalCheckComplete() pollFor(fut).internalRaiseIfError(fut)
proc asyncSpawn*(future: Future[void]) = proc asyncSpawn*(future: Future[void]) =
## Spawns a new concurrent async task. ## Spawns a new concurrent async task.
@ -1012,6 +1030,7 @@ proc cancelAndWait*(future: FutureBase, loc: ptr SrcLoc): Future[void] {.
if future.finished(): if future.finished():
retFuture.complete() retFuture.complete()
else: else:
retFuture.cancelCallback = nil
cancelSoon(future, continuation, cast[pointer](retFuture), loc) cancelSoon(future, continuation, cast[pointer](retFuture), loc)
retFuture retFuture
@ -1056,6 +1075,7 @@ proc noCancel*[F: SomeFuture](future: F): auto = # async: (raw: true, raises: as
if future.finished(): if future.finished():
completeFuture() completeFuture()
else: else:
retFuture.cancelCallback = nil
future.addCallback(continuation) future.addCallback(continuation)
retFuture retFuture
@ -1546,7 +1566,7 @@ when defined(windows):
proc waitForSingleObject*(handle: HANDLE, proc waitForSingleObject*(handle: HANDLE,
timeout: Duration): Future[WaitableResult] {. timeout: Duration): Future[WaitableResult] {.
raises: [].} = async: (raises: [AsyncError, CancelledError], raw: true).} =
## Waits until the specified object is in the signaled state or the ## Waits until the specified object is in the signaled state or the
## time-out interval elapses. WaitForSingleObject() for asynchronous world. ## time-out interval elapses. WaitForSingleObject() for asynchronous world.
let flags = WT_EXECUTEONLYONCE let flags = WT_EXECUTEONLYONCE
@ -1604,7 +1624,7 @@ when defined(windows):
{.pop.} # Automatically deduced raises from here onwards {.pop.} # Automatically deduced raises from here onwards
proc readFinished[T: not void; E](fut: InternalRaisesFuture[T, E]): lent T = proc readFinished[T: not void; E](fut: InternalRaisesFuture[T, E]): lent T =
internalCheckComplete(fut, E) internalRaiseIfError(fut, E, fut)
fut.internalValue fut.internalValue
proc read*[T: not void, E](fut: InternalRaisesFuture[T, E]): lent T = # {.raises: [E, FuturePendingError].} proc read*[T: not void, E](fut: InternalRaisesFuture[T, E]): lent T = # {.raises: [E, FuturePendingError].}
@ -1629,7 +1649,7 @@ proc read*[E](fut: InternalRaisesFuture[void, E]) = # {.raises: [E].}
if not fut.finished(): if not fut.finished():
raiseFuturePendingError(fut) raiseFuturePendingError(fut)
internalCheckComplete(fut, E) internalRaiseIfError(fut, E, fut)
proc waitFor*[T: not void; E](fut: InternalRaisesFuture[T, E]): lent T = # {.raises: [E]} proc waitFor*[T: not void; E](fut: InternalRaisesFuture[T, E]): lent T = # {.raises: [E]}
## Blocks the current thread of execution until `fut` has finished, returning ## Blocks the current thread of execution until `fut` has finished, returning
@ -1652,7 +1672,7 @@ proc waitFor*[E](fut: InternalRaisesFuture[void, E]) = # {.raises: [E]}
## Must not be called recursively (from inside `async` procedures). ## Must not be called recursively (from inside `async` procedures).
## ##
## See also `await`, `Future.read` ## See also `await`, `Future.read`
pollFor(fut).internalCheckComplete(E) pollFor(fut).internalRaiseIfError(E, fut)
proc `or`*[T, Y, E1, E2]( proc `or`*[T, Y, E1, E2](
fut1: InternalRaisesFuture[T, E1], fut1: InternalRaisesFuture[T, E1],

View File

@ -530,7 +530,7 @@ template await*[T](f: Future[T]): T =
# responsible for resuming execution once the yielded future is finished # responsible for resuming execution once the yielded future is finished
yield chronosInternalRetFuture.internalChild yield chronosInternalRetFuture.internalChild
# `child` released by `futureContinue` # `child` released by `futureContinue`
cast[type(f)](chronosInternalRetFuture.internalChild).internalCheckComplete() cast[type(f)](chronosInternalRetFuture.internalChild).internalRaiseIfError(f)
when T isnot void: when T isnot void:
cast[type(f)](chronosInternalRetFuture.internalChild).value() cast[type(f)](chronosInternalRetFuture.internalChild).value()
@ -553,7 +553,7 @@ template await*[T, E](fut: InternalRaisesFuture[T, E]): T =
yield chronosInternalRetFuture.internalChild yield chronosInternalRetFuture.internalChild
# `child` released by `futureContinue` # `child` released by `futureContinue`
cast[type(fut)]( cast[type(fut)](
chronosInternalRetFuture.internalChild).internalCheckComplete(E) chronosInternalRetFuture.internalChild).internalRaiseIfError(E, fut)
when T isnot void: when T isnot void:
cast[type(fut)](chronosInternalRetFuture.internalChild).value() cast[type(fut)](chronosInternalRetFuture.internalChild).value()

View File

@ -18,45 +18,6 @@ proc makeNoRaises*(): NimNode {.compileTime.} =
ident"void" ident"void"
macro Raising*[T](F: typedesc[Future[T]], E: varargs[typedesc]): untyped =
## Given a Future type instance, return a type storing `{.raises.}`
## information
##
## Note; this type may change in the future
E.expectKind(nnkBracket)
let raises = if E.len == 0:
makeNoRaises()
else:
nnkTupleConstr.newTree(E.mapIt(it))
nnkBracketExpr.newTree(
ident "InternalRaisesFuture",
nnkDotExpr.newTree(F, ident"T"),
raises
)
template init*[T, E](
F: type InternalRaisesFuture[T, E], fromProc: static[string] = ""): F =
## Creates a new pending future.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
let res = F()
internalInitFutureBase(res, getSrcLocation(fromProc), FutureState.Pending, {})
res
template init*[T, E](
F: type InternalRaisesFuture[T, E], fromProc: static[string] = "",
flags: static[FutureFlags]): F =
## Creates a new pending future.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
let res = F()
internalInitFutureBase(
res, getSrcLocation(fromProc), FutureState.Pending, flags)
res
proc dig(n: NimNode): NimNode {.compileTime.} = proc dig(n: NimNode): NimNode {.compileTime.} =
# Dig through the layers of type to find the raises list # Dig through the layers of type to find the raises list
if n.eqIdent("void"): if n.eqIdent("void"):
@ -87,6 +48,58 @@ proc members(tup: NimNode): seq[NimNode] {.compileTime.} =
for t in tup.members(): for t in tup.members():
result.add(t) result.add(t)
macro hasException(raises: typedesc, ident: static string): bool =
newLit(raises.members.anyIt(it.eqIdent(ident)))
macro Raising*[T](F: typedesc[Future[T]], E: varargs[typedesc]): untyped =
## Given a Future type instance, return a type storing `{.raises.}`
## information
##
## Note; this type may change in the future
E.expectKind(nnkBracket)
let raises = if E.len == 0:
makeNoRaises()
else:
nnkTupleConstr.newTree(E.mapIt(it))
nnkBracketExpr.newTree(
ident "InternalRaisesFuture",
nnkDotExpr.newTree(F, ident"T"),
raises
)
template init*[T, E](
F: type InternalRaisesFuture[T, E], fromProc: static[string] = ""): F =
## Creates a new pending future.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
when not hasException(type(E), "CancelledError"):
static:
raiseAssert "Manually created futures must either own cancellation schedule or raise CancelledError"
let res = F()
internalInitFutureBase(res, getSrcLocation(fromProc), FutureState.Pending, {})
res
template init*[T, E](
F: type InternalRaisesFuture[T, E], fromProc: static[string] = "",
flags: static[FutureFlags]): F =
## Creates a new pending future.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
let res = F()
when not hasException(type(E), "CancelledError"):
static:
doAssert FutureFlag.OwnCancelSchedule in flags,
"Manually created futures must either own cancellation schedule or raise CancelledError"
internalInitFutureBase(
res, getSrcLocation(fromProc), FutureState.Pending, flags)
res
proc containsSignature(members: openArray[NimNode], typ: NimNode): bool {.compileTime.} = proc containsSignature(members: openArray[NimNode], typ: NimNode): bool {.compileTime.} =
let typHash = signatureHash(typ) let typHash = signatureHash(typ)

View File

@ -77,7 +77,7 @@ type
udata: pointer udata: pointer
error*: ref AsyncStreamError error*: ref AsyncStreamError
bytesCount*: uint64 bytesCount*: uint64
future: Future[void] future: Future[void].Raising([])
AsyncStreamWriter* = ref object of RootRef AsyncStreamWriter* = ref object of RootRef
wsource*: AsyncStreamWriter wsource*: AsyncStreamWriter
@ -88,7 +88,7 @@ type
error*: ref AsyncStreamError error*: ref AsyncStreamError
udata: pointer udata: pointer
bytesCount*: uint64 bytesCount*: uint64
future: Future[void] future: Future[void].Raising([])
AsyncStream* = object of RootObj AsyncStream* = object of RootObj
reader*: AsyncStreamReader reader*: AsyncStreamReader
@ -897,44 +897,27 @@ proc close*(rw: AsyncStreamRW) =
rw.future.addCallback(continuation) rw.future.addCallback(continuation)
rw.future.cancelSoon() rw.future.cancelSoon()
proc closeWait*(rw: AsyncStreamRW): Future[void] {. proc closeWait*(rw: AsyncStreamRW): Future[void] {.async: (raises: []).} =
async: (raw: true, raises: []).} =
## Close and frees resources of stream ``rw``. ## Close and frees resources of stream ``rw``.
const FutureName = if not rw.closed():
when rw is AsyncStreamReader: rw.close()
"async.stream.reader.closeWait" await noCancel(rw.join())
else:
"async.stream.writer.closeWait"
let retFuture = Future[void].Raising([]).init(FutureName)
if rw.closed():
retFuture.complete()
return retFuture
proc continuation(udata: pointer) {.gcsafe, raises:[].} =
retFuture.complete()
rw.close()
if rw.future.finished():
retFuture.complete()
else:
rw.future.addCallback(continuation, cast[pointer](retFuture))
retFuture
proc startReader(rstream: AsyncStreamReader) = proc startReader(rstream: AsyncStreamReader) =
rstream.state = Running rstream.state = Running
if not isNil(rstream.readerLoop): if not isNil(rstream.readerLoop):
rstream.future = rstream.readerLoop(rstream) rstream.future = rstream.readerLoop(rstream)
else: else:
rstream.future = newFuture[void]("async.stream.empty.reader") rstream.future = Future[void].Raising([]).init(
"async.stream.empty.reader", {FutureFlag.OwnCancelSchedule})
proc startWriter(wstream: AsyncStreamWriter) = proc startWriter(wstream: AsyncStreamWriter) =
wstream.state = Running wstream.state = Running
if not isNil(wstream.writerLoop): if not isNil(wstream.writerLoop):
wstream.future = wstream.writerLoop(wstream) wstream.future = wstream.writerLoop(wstream)
else: else:
wstream.future = newFuture[void]("async.stream.empty.writer") wstream.future = Future[void].Raising([]).init(
"async.stream.empty.writer", {FutureFlag.OwnCancelSchedule})
proc init*(child, wsource: AsyncStreamWriter, loop: StreamWriterLoop, proc init*(child, wsource: AsyncStreamWriter, loop: StreamWriterLoop,
queueSize = AsyncStreamDefaultQueueSize) = queueSize = AsyncStreamDefaultQueueSize) =

View File

@ -272,7 +272,8 @@ proc waitSync*(signal: ThreadSignalPtr,
else: else:
return ok(true) return ok(true)
proc fire*(signal: ThreadSignalPtr): Future[void] = proc fire*(signal: ThreadSignalPtr): Future[void] {.
async: (raises: [AsyncError, CancelledError], raw: true).} =
## Set state of ``signal`` to signaled in asynchronous way. ## Set state of ``signal`` to signaled in asynchronous way.
var retFuture = newFuture[void]("asyncthreadsignal.fire") var retFuture = newFuture[void]("asyncthreadsignal.fire")
when defined(windows): when defined(windows):
@ -356,14 +357,17 @@ proc fire*(signal: ThreadSignalPtr): Future[void] =
retFuture retFuture
when defined(windows): when defined(windows):
proc wait*(signal: ThreadSignalPtr) {.async.} = proc wait*(signal: ThreadSignalPtr) {.
async: (raises: [AsyncError, CancelledError]).} =
let handle = signal[].event let handle = signal[].event
let res = await waitForSingleObject(handle, InfiniteDuration) let res = await waitForSingleObject(handle, InfiniteDuration)
# There should be no other response, because we use `InfiniteDuration`. # There should be no other response, because we use `InfiniteDuration`.
doAssert(res == WaitableResult.Ok) doAssert(res == WaitableResult.Ok)
else: else:
proc wait*(signal: ThreadSignalPtr): Future[void] = proc wait*(signal: ThreadSignalPtr): Future[void] {.
var retFuture = newFuture[void]("asyncthreadsignal.wait") async: (raises: [AsyncError, CancelledError], raw: true).} =
let retFuture = Future[void].Raising([AsyncError, CancelledError]).init(
"asyncthreadsignal.wait")
var data = 1'u64 var data = 1'u64
let eventFd = let eventFd =
when defined(linux): when defined(linux):

View File

@ -73,7 +73,7 @@ when defined(windows) or defined(nimdoc):
udata*: pointer # User-defined pointer udata*: pointer # User-defined pointer
flags*: set[ServerFlags] # Flags flags*: set[ServerFlags] # Flags
bufferSize*: int # Size of internal transports' buffer bufferSize*: int # Size of internal transports' buffer
loopFuture*: Future[void] # Server's main Future loopFuture*: Future[void].Raising([]) # Server's main Future
domain*: Domain # Current server domain (IPv4 or IPv6) domain*: Domain # Current server domain (IPv4 or IPv6)
apending*: bool apending*: bool
asock*: AsyncFD # Current AcceptEx() socket asock*: AsyncFD # Current AcceptEx() socket
@ -92,7 +92,7 @@ else:
udata*: pointer # User-defined pointer udata*: pointer # User-defined pointer
flags*: set[ServerFlags] # Flags flags*: set[ServerFlags] # Flags
bufferSize*: int # Size of internal transports' buffer bufferSize*: int # Size of internal transports' buffer
loopFuture*: Future[void] # Server's main Future loopFuture*: Future[void].Raising([]) # Server's main Future
errorCode*: OSErrorCode # Current error code errorCode*: OSErrorCode # Current error code
dualstack*: DualStackType # IPv4/IPv6 dualstack parameters dualstack*: DualStackType # IPv4/IPv6 dualstack parameters

View File

@ -13,6 +13,7 @@ import std/deques
when not(defined(windows)): import ".."/selectors2 when not(defined(windows)): import ".."/selectors2
import ".."/[asyncloop, config, osdefs, oserrno, osutils, handles] import ".."/[asyncloop, config, osdefs, oserrno, osutils, handles]
import "."/common import "."/common
import stew/ptrops
type type
VectorKind = enum VectorKind = enum
@ -44,7 +45,7 @@ type
remote: TransportAddress # Remote address remote: TransportAddress # Remote address
udata*: pointer # User-driven pointer udata*: pointer # User-driven pointer
function: DatagramCallback # Receive data callback function: DatagramCallback # Receive data callback
future: Future[void] # Transport's life future future: Future[void].Raising([]) # Transport's life future
raddr: Sockaddr_storage # Reader address storage raddr: Sockaddr_storage # Reader address storage
ralen: SockLen # Reader address length ralen: SockLen # Reader address length
waddr: Sockaddr_storage # Writer address storage waddr: Sockaddr_storage # Writer address storage
@ -119,7 +120,7 @@ when defined(windows):
## Initiation ## Initiation
transp.state.incl(WritePending) transp.state.incl(WritePending)
let fd = SocketHandle(transp.fd) let fd = SocketHandle(transp.fd)
var vector = transp.queue.popFirst() let vector = transp.queue.popFirst()
transp.setWriterWSABuffer(vector) transp.setWriterWSABuffer(vector)
let ret = let ret =
if vector.kind == WithAddress: if vector.kind == WithAddress:
@ -359,12 +360,13 @@ when defined(windows):
res.queue = initDeque[GramVector]() res.queue = initDeque[GramVector]()
res.udata = udata res.udata = udata
res.state = {ReadPaused, WritePaused} res.state = {ReadPaused, WritePaused}
res.future = newFuture[void]("datagram.transport") res.future = Future[void].Raising([]).init(
"datagram.transport", {FutureFlag.OwnCancelSchedule})
res.rovl.data = CompletionData(cb: readDatagramLoop, res.rovl.data = CompletionData(cb: readDatagramLoop,
udata: cast[pointer](res)) udata: cast[pointer](res))
res.wovl.data = CompletionData(cb: writeDatagramLoop, res.wovl.data = CompletionData(cb: writeDatagramLoop,
udata: cast[pointer](res)) udata: cast[pointer](res))
res.rwsabuf = WSABUF(buf: cast[cstring](addr res.buffer[0]), res.rwsabuf = WSABUF(buf: cast[cstring](baseAddr res.buffer),
len: ULONG(len(res.buffer))) len: ULONG(len(res.buffer)))
GC_ref(res) GC_ref(res)
# Start tracking transport # Start tracking transport
@ -391,7 +393,7 @@ else:
else: else:
while true: while true:
transp.ralen = SockLen(sizeof(Sockaddr_storage)) transp.ralen = SockLen(sizeof(Sockaddr_storage))
var res = osdefs.recvfrom(fd, addr transp.buffer[0], var res = osdefs.recvfrom(fd, baseAddr transp.buffer,
cint(len(transp.buffer)), cint(0), cint(len(transp.buffer)), cint(0),
cast[ptr SockAddr](addr transp.raddr), cast[ptr SockAddr](addr transp.raddr),
addr transp.ralen) addr transp.ralen)
@ -423,7 +425,7 @@ else:
transp.state.incl({WritePaused}) transp.state.incl({WritePaused})
else: else:
if len(transp.queue) > 0: if len(transp.queue) > 0:
var vector = transp.queue.popFirst() let vector = transp.queue.popFirst()
while true: while true:
if vector.kind == WithAddress: if vector.kind == WithAddress:
toSAddr(vector.address, transp.waddr, transp.walen) toSAddr(vector.address, transp.waddr, transp.walen)
@ -568,7 +570,8 @@ else:
res.queue = initDeque[GramVector]() res.queue = initDeque[GramVector]()
res.udata = udata res.udata = udata
res.state = {ReadPaused, WritePaused} res.state = {ReadPaused, WritePaused}
res.future = newFuture[void]("datagram.transport") res.future = Future[void].Raising([]).init(
"datagram.transport", {FutureFlag.OwnCancelSchedule})
GC_ref(res) GC_ref(res)
# Start tracking transport # Start tracking transport
trackCounter(DgramTransportTrackerName) trackCounter(DgramTransportTrackerName)
@ -824,7 +827,7 @@ proc newDatagramTransport6*[T](cbproc: UnsafeDatagramCallback,
proc join*(transp: DatagramTransport): Future[void] {. proc join*(transp: DatagramTransport): Future[void] {.
async: (raw: true, raises: [CancelledError]).} = async: (raw: true, raises: [CancelledError]).} =
## Wait until the transport ``transp`` will be closed. ## Wait until the transport ``transp`` will be closed.
var retFuture = newFuture[void]("datagram.transport.join") let retFuture = newFuture[void]("datagram.transport.join")
proc continuation(udata: pointer) {.gcsafe.} = proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete() retFuture.complete()
@ -840,43 +843,28 @@ proc join*(transp: DatagramTransport): Future[void] {.
return retFuture return retFuture
proc closed*(transp: DatagramTransport): bool {.inline.} =
## Returns ``true`` if transport in closed state.
{ReadClosed, WriteClosed} * transp.state != {}
proc closeWait*(transp: DatagramTransport): Future[void] {. proc closeWait*(transp: DatagramTransport): Future[void] {.
async: (raw: true, raises: []).} = async: (raises: []).} =
## Close transport ``transp`` and release all resources. ## Close transport ``transp`` and release all resources.
let retFuture = newFuture[void]( if not transp.closed():
"datagram.transport.closeWait", {FutureFlag.OwnCancelSchedule}) transp.close()
await noCancel(transp.join())
if {ReadClosed, WriteClosed} * transp.state != {}:
retFuture.complete()
return retFuture
proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete()
proc cancellation(udata: pointer) {.gcsafe.} =
# We are not going to change the state of `retFuture` to cancelled, so we
# will prevent the entire sequence of Futures from being cancelled.
discard
transp.close()
if transp.future.finished():
retFuture.complete()
else:
transp.future.addCallback(continuation, cast[pointer](retFuture))
retFuture.cancelCallback = cancellation
retFuture
proc send*(transp: DatagramTransport, pbytes: pointer, proc send*(transp: DatagramTransport, pbytes: pointer,
nbytes: int): Future[void] {. nbytes: int): Future[void] {.
async: (raw: true, raises: [TransportError, CancelledError]).} = async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport ## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport
## ``transp`` to remote destination address which was bounded on transport. ## ``transp`` to remote destination address which was bounded on transport.
var retFuture = newFuture[void]("datagram.transport.send(pointer)") let retFuture = newFuture[void]("datagram.transport.send(pointer)")
transp.checkClosed(retFuture) transp.checkClosed(retFuture)
if transp.remote.port == Port(0): if transp.remote.port == Port(0):
retFuture.fail(newException(TransportError, "Remote peer not set!")) retFuture.fail(newException(TransportError, "Remote peer not set!"))
return retFuture return retFuture
var vector = GramVector(kind: WithoutAddress, buf: pbytes, buflen: nbytes, let vector = GramVector(kind: WithoutAddress, buf: pbytes, buflen: nbytes,
writer: retFuture) writer: retFuture)
transp.queue.addLast(vector) transp.queue.addLast(vector)
if WritePaused in transp.state: if WritePaused in transp.state:
@ -890,14 +878,14 @@ proc send*(transp: DatagramTransport, msg: sink string,
async: (raw: true, raises: [TransportError, CancelledError]).} = async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send string ``msg`` using transport ``transp`` to remote destination ## Send string ``msg`` using transport ``transp`` to remote destination
## address which was bounded on transport. ## address which was bounded on transport.
var retFuture = newFuture[void]("datagram.transport.send(string)") let retFuture = newFuture[void]("datagram.transport.send(string)")
transp.checkClosed(retFuture) transp.checkClosed(retFuture)
let length = if msglen <= 0: len(msg) else: msglen let length = if msglen <= 0: len(msg) else: msglen
var localCopy = chronosMoveSink(msg) var localCopy = chronosMoveSink(msg)
retFuture.addCallback(proc(_: pointer) = reset(localCopy)) retFuture.addCallback(proc(_: pointer) = reset(localCopy))
let vector = GramVector(kind: WithoutAddress, buf: addr localCopy[0], let vector = GramVector(kind: WithoutAddress, buf: baseAddr localCopy,
buflen: length, buflen: length,
writer: retFuture) writer: retFuture)
@ -913,14 +901,14 @@ proc send*[T](transp: DatagramTransport, msg: sink seq[T],
async: (raw: true, raises: [TransportError, CancelledError]).} = async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send string ``msg`` using transport ``transp`` to remote destination ## Send string ``msg`` using transport ``transp`` to remote destination
## address which was bounded on transport. ## address which was bounded on transport.
var retFuture = newFuture[void]("datagram.transport.send(seq)") let retFuture = newFuture[void]("datagram.transport.send(seq)")
transp.checkClosed(retFuture) transp.checkClosed(retFuture)
let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T)) let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T))
var localCopy = chronosMoveSink(msg) var localCopy = chronosMoveSink(msg)
retFuture.addCallback(proc(_: pointer) = reset(localCopy)) retFuture.addCallback(proc(_: pointer) = reset(localCopy))
let vector = GramVector(kind: WithoutAddress, buf: addr localCopy[0], let vector = GramVector(kind: WithoutAddress, buf: baseAddr localCopy,
buflen: length, buflen: length,
writer: retFuture) writer: retFuture)
transp.queue.addLast(vector) transp.queue.addLast(vector)
@ -935,7 +923,7 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
async: (raw: true, raises: [TransportError, CancelledError]).} = async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport ## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport
## ``transp`` to remote destination address ``remote``. ## ``transp`` to remote destination address ``remote``.
var retFuture = newFuture[void]("datagram.transport.sendTo(pointer)") let retFuture = newFuture[void]("datagram.transport.sendTo(pointer)")
transp.checkClosed(retFuture) transp.checkClosed(retFuture)
let vector = GramVector(kind: WithAddress, buf: pbytes, buflen: nbytes, let vector = GramVector(kind: WithAddress, buf: pbytes, buflen: nbytes,
writer: retFuture, address: remote) writer: retFuture, address: remote)
@ -951,14 +939,14 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
async: (raw: true, raises: [TransportError, CancelledError]).} = async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send string ``msg`` using transport ``transp`` to remote destination ## Send string ``msg`` using transport ``transp`` to remote destination
## address ``remote``. ## address ``remote``.
var retFuture = newFuture[void]("datagram.transport.sendTo(string)") let retFuture = newFuture[void]("datagram.transport.sendTo(string)")
transp.checkClosed(retFuture) transp.checkClosed(retFuture)
let length = if msglen <= 0: len(msg) else: msglen let length = if msglen <= 0: len(msg) else: msglen
var localCopy = chronosMoveSink(msg) var localCopy = chronosMoveSink(msg)
retFuture.addCallback(proc(_: pointer) = reset(localCopy)) retFuture.addCallback(proc(_: pointer) = reset(localCopy))
let vector = GramVector(kind: WithAddress, buf: addr localCopy[0], let vector = GramVector(kind: WithAddress, buf: baseAddr localCopy,
buflen: length, buflen: length,
writer: retFuture, writer: retFuture,
address: remote) address: remote)
@ -974,15 +962,15 @@ proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress,
async: (raw: true, raises: [TransportError, CancelledError]).} = async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send sequence ``msg`` using transport ``transp`` to remote destination ## Send sequence ``msg`` using transport ``transp`` to remote destination
## address ``remote``. ## address ``remote``.
var retFuture = newFuture[void]("datagram.transport.sendTo(seq)") let retFuture = newFuture[void]("datagram.transport.sendTo(seq)")
transp.checkClosed(retFuture) transp.checkClosed(retFuture)
let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T)) let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T))
var localCopy = chronosMoveSink(msg) var localCopy = chronosMoveSink(msg)
retFuture.addCallback(proc(_: pointer) = reset(localCopy)) retFuture.addCallback(proc(_: pointer) = reset(localCopy))
let vector = GramVector(kind: WithAddress, buf: addr localCopy[0], let vector = GramVector(kind: WithAddress, buf: baseAddr localCopy,
buflen: length, buflen: length,
writer: cast[Future[void]](retFuture), writer: retFuture,
address: remote) address: remote)
transp.queue.addLast(vector) transp.queue.addLast(vector)
if WritePaused in transp.state: if WritePaused in transp.state:
@ -1006,7 +994,6 @@ proc peekMessage*(transp: DatagramTransport, msg: var seq[byte],
proc getMessage*(transp: DatagramTransport): seq[byte] {. proc getMessage*(transp: DatagramTransport): seq[byte] {.
raises: [TransportError].} = raises: [TransportError].} =
## Copy data from internal message buffer and return result. ## Copy data from internal message buffer and return result.
var default: seq[byte]
if ReadError in transp.state: if ReadError in transp.state:
transp.state.excl(ReadError) transp.state.excl(ReadError)
raise transp.getError() raise transp.getError()
@ -1015,12 +1002,8 @@ proc getMessage*(transp: DatagramTransport): seq[byte] {.
copyMem(addr res[0], addr transp.buffer[0], transp.buflen) copyMem(addr res[0], addr transp.buffer[0], transp.buflen)
res res
else: else:
default default(seq[byte])
proc getUserData*[T](transp: DatagramTransport): T {.inline.} = proc getUserData*[T](transp: DatagramTransport): T {.inline.} =
## Obtain user data stored in ``transp`` object. ## Obtain user data stored in ``transp`` object.
cast[T](transp.udata) cast[T](transp.udata)
proc closed*(transp: DatagramTransport): bool {.inline.} =
## Returns ``true`` if transport in closed state.
{ReadClosed, WriteClosed} * transp.state != {}

View File

@ -76,7 +76,7 @@ when defined(windows):
offset: int # Reading buffer offset offset: int # Reading buffer offset
error: ref TransportError # Current error error: ref TransportError # Current error
queue: Deque[StreamVector] # Writer queue queue: Deque[StreamVector] # Writer queue
future: Future[void] # Stream life future future: Future[void].Raising([]) # Stream life future
# Windows specific part # Windows specific part
rwsabuf: WSABUF # Reader WSABUF rwsabuf: WSABUF # Reader WSABUF
wwsabuf: WSABUF # Writer WSABUF wwsabuf: WSABUF # Writer WSABUF
@ -103,7 +103,7 @@ else:
offset: int # Reading buffer offset offset: int # Reading buffer offset
error: ref TransportError # Current error error: ref TransportError # Current error
queue: Deque[StreamVector] # Writer queue queue: Deque[StreamVector] # Writer queue
future: Future[void] # Stream life future future: Future[void].Raising([]) # Stream life future
case kind*: TransportKind case kind*: TransportKind
of TransportKind.Socket: of TransportKind.Socket:
domain: Domain # Socket transport domain (IPv4/IPv6) domain: Domain # Socket transport domain (IPv4/IPv6)
@ -598,7 +598,8 @@ when defined(windows):
transp.buffer = newSeq[byte](bufsize) transp.buffer = newSeq[byte](bufsize)
transp.state = {ReadPaused, WritePaused} transp.state = {ReadPaused, WritePaused}
transp.queue = initDeque[StreamVector]() transp.queue = initDeque[StreamVector]()
transp.future = newFuture[void]("stream.socket.transport") transp.future = Future[void].Raising([]).init(
"stream.socket.transport", {FutureFlag.OwnCancelSchedule})
GC_ref(transp) GC_ref(transp)
transp transp
@ -619,7 +620,8 @@ when defined(windows):
transp.flags = flags transp.flags = flags
transp.state = {ReadPaused, WritePaused} transp.state = {ReadPaused, WritePaused}
transp.queue = initDeque[StreamVector]() transp.queue = initDeque[StreamVector]()
transp.future = newFuture[void]("stream.pipe.transport") transp.future = Future[void].Raising([]).init(
"stream.pipe.transport", {FutureFlag.OwnCancelSchedule})
GC_ref(transp) GC_ref(transp)
transp transp
@ -1457,7 +1459,8 @@ else:
transp.buffer = newSeq[byte](bufsize) transp.buffer = newSeq[byte](bufsize)
transp.state = {ReadPaused, WritePaused} transp.state = {ReadPaused, WritePaused}
transp.queue = initDeque[StreamVector]() transp.queue = initDeque[StreamVector]()
transp.future = newFuture[void]("socket.stream.transport") transp.future = Future[void].Raising([]).init(
"socket.stream.transport", {FutureFlag.OwnCancelSchedule})
GC_ref(transp) GC_ref(transp)
transp transp
@ -1473,7 +1476,8 @@ else:
transp.buffer = newSeq[byte](bufsize) transp.buffer = newSeq[byte](bufsize)
transp.state = {ReadPaused, WritePaused} transp.state = {ReadPaused, WritePaused}
transp.queue = initDeque[StreamVector]() transp.queue = initDeque[StreamVector]()
transp.future = newFuture[void]("pipe.stream.transport") transp.future = Future[void].Raising([]).init(
"pipe.stream.transport", {FutureFlag.OwnCancelSchedule})
GC_ref(transp) GC_ref(transp)
transp transp
@ -1806,6 +1810,9 @@ proc connect*(address: TransportAddress,
if TcpNoDelay in flags: mappedFlags.incl(SocketFlags.TcpNoDelay) if TcpNoDelay in flags: mappedFlags.incl(SocketFlags.TcpNoDelay)
connect(address, bufferSize, child, localAddress, mappedFlags, dualstack) connect(address, bufferSize, child, localAddress, mappedFlags, dualstack)
proc closed*(server: StreamServer): bool =
server.status == ServerStatus.Closed
proc close*(server: StreamServer) = proc close*(server: StreamServer) =
## Release ``server`` resources. ## Release ``server`` resources.
## ##
@ -1832,22 +1839,11 @@ proc close*(server: StreamServer) =
else: else:
server.sock.closeSocket(continuation) server.sock.closeSocket(continuation)
proc closeWait*(server: StreamServer): Future[void] {. proc closeWait*(server: StreamServer): Future[void] {.async: (raises: []).} =
async: (raw: true, raises: []).} =
## Close server ``server`` and release all resources. ## Close server ``server`` and release all resources.
let retFuture = newFuture[void]( if not server.closed():
"stream.server.closeWait", {FutureFlag.OwnCancelSchedule}) server.close()
await noCancel(server.join())
proc continuation(udata: pointer) =
retFuture.complete()
server.close()
if not(server.loopFuture.finished()):
server.loopFuture.addCallback(continuation, cast[pointer](retFuture))
else:
retFuture.complete()
retFuture
proc getBacklogSize(backlog: int): cint = proc getBacklogSize(backlog: int): cint =
doAssert(backlog >= 0 and backlog <= high(int32)) doAssert(backlog >= 0 and backlog <= high(int32))
@ -2058,7 +2054,9 @@ proc createStreamServer*(host: TransportAddress,
sres.init = init sres.init = init
sres.bufferSize = bufferSize sres.bufferSize = bufferSize
sres.status = Starting sres.status = Starting
sres.loopFuture = newFuture[void]("stream.transport.server") sres.loopFuture = asyncloop.init(
Future[void].Raising([]), "stream.transport.server",
{FutureFlag.OwnCancelSchedule})
sres.udata = udata sres.udata = udata
sres.dualstack = dualstack sres.dualstack = dualstack
if localAddress.family == AddressFamily.None: if localAddress.family == AddressFamily.None:
@ -2630,6 +2628,23 @@ proc join*(transp: StreamTransport): Future[void] {.
retFuture.complete() retFuture.complete()
return retFuture return retFuture
proc closed*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if transport in closed state.
({ReadClosed, WriteClosed} * transp.state != {})
proc finished*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if transport in finished (EOF) state.
({ReadEof, WriteEof} * transp.state != {})
proc failed*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if transport in error state.
({ReadError, WriteError} * transp.state != {})
proc running*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if transport is still pending.
({ReadClosed, ReadEof, ReadError,
WriteClosed, WriteEof, WriteError} * transp.state == {})
proc close*(transp: StreamTransport) = proc close*(transp: StreamTransport) =
## Closes and frees resources of transport ``transp``. ## Closes and frees resources of transport ``transp``.
## ##
@ -2672,31 +2687,11 @@ proc close*(transp: StreamTransport) =
elif transp.kind == TransportKind.Socket: elif transp.kind == TransportKind.Socket:
closeSocket(transp.fd, continuation) closeSocket(transp.fd, continuation)
proc closeWait*(transp: StreamTransport): Future[void] {. proc closeWait*(transp: StreamTransport): Future[void] {.async: (raises: []).} =
async: (raw: true, raises: []).} =
## Close and frees resources of transport ``transp``. ## Close and frees resources of transport ``transp``.
let retFuture = newFuture[void]( if not transp.closed():
"stream.transport.closeWait", {FutureFlag.OwnCancelSchedule}) transp.close()
await noCancel(transp.join())
if {ReadClosed, WriteClosed} * transp.state != {}:
retFuture.complete()
return retFuture
proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete()
proc cancellation(udata: pointer) {.gcsafe.} =
# We are not going to change the state of `retFuture` to cancelled, so we
# will prevent the entire sequence of Futures from being cancelled.
discard
transp.close()
if transp.future.finished():
retFuture.complete()
else:
transp.future.addCallback(continuation, cast[pointer](retFuture))
retFuture.cancelCallback = cancellation
retFuture
proc shutdownWait*(transp: StreamTransport): Future[void] {. proc shutdownWait*(transp: StreamTransport): Future[void] {.
async: (raw: true, raises: [TransportError, CancelledError]).} = async: (raw: true, raises: [TransportError, CancelledError]).} =
@ -2756,23 +2751,6 @@ proc shutdownWait*(transp: StreamTransport): Future[void] {.
callSoon(continuation, nil) callSoon(continuation, nil)
retFuture retFuture
proc closed*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if transport in closed state.
({ReadClosed, WriteClosed} * transp.state != {})
proc finished*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if transport in finished (EOF) state.
({ReadEof, WriteEof} * transp.state != {})
proc failed*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if transport in error state.
({ReadError, WriteError} * transp.state != {})
proc running*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if transport is still pending.
({ReadClosed, ReadEof, ReadError,
WriteClosed, WriteEof, WriteError} * transp.state == {})
proc fromPipe2*(fd: AsyncFD, child: StreamTransport = nil, proc fromPipe2*(fd: AsyncFD, child: StreamTransport = nil,
bufferSize = DefaultStreamBufferSize bufferSize = DefaultStreamBufferSize
): Result[StreamTransport, OSErrorCode] = ): Result[StreamTransport, OSErrorCode] =

View File

@ -26,6 +26,7 @@ template checkLeaks*(name: string): untyped =
", closed = " & $ counter.closed ", closed = " & $ counter.closed
check counter.opened == counter.closed check counter.opened == counter.closed
template checkLeaks*(): untyped = proc checkLeaks*() =
for key in getThreadDispatcher().trackerCounterKeys(): for key in getThreadDispatcher().trackerCounterKeys():
checkLeaks(key) checkLeaks(key)
GC_fullCollect()

View File

@ -0,0 +1,130 @@
import chronos/apps/http/httpserver
{.push raises: [].}
proc firstMiddlewareHandler(
middleware: HttpServerMiddlewareRef,
reqfence: RequestFence,
nextHandler: HttpProcessCallback2
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
if reqfence.isErr():
# Ignore request errors
return await nextHandler(reqfence)
let request = reqfence.get()
var headers = request.headers
if request.uri.path.startsWith("/path/to/hidden/resources"):
headers.add("X-Filter", "drop")
elif request.uri.path.startsWith("/path/to/blocked/resources"):
headers.add("X-Filter", "block")
else:
headers.add("X-Filter", "pass")
# Updating request by adding new HTTP header `X-Filter`.
let res = request.updateRequest(headers)
if res.isErr():
# We use default error handler in case of error which will respond with
# proper HTTP status code error.
return defaultResponse(res.error)
# Calling next handler.
await nextHandler(reqfence)
proc secondMiddlewareHandler(
middleware: HttpServerMiddlewareRef,
reqfence: RequestFence,
nextHandler: HttpProcessCallback2
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
if reqfence.isErr():
# Ignore request errors
return await nextHandler(reqfence)
let
request = reqfence.get()
filtered = request.headers.getString("X-Filter", "pass")
if filtered == "drop":
# Force HTTP server to drop connection with remote peer.
dropResponse()
elif filtered == "block":
# Force HTTP server to respond with HTTP `404 Not Found` error code.
codeResponse(Http404)
else:
# Calling next handler.
await nextHandler(reqfence)
proc thirdMiddlewareHandler(
middleware: HttpServerMiddlewareRef,
reqfence: RequestFence,
nextHandler: HttpProcessCallback2
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
if reqfence.isErr():
# Ignore request errors
return await nextHandler(reqfence)
let request = reqfence.get()
echo "QUERY = [", request.rawPath, "]"
echo request.headers
try:
if request.uri.path == "/path/to/plugin/resources/page1":
await request.respond(Http200, "PLUGIN PAGE1")
elif request.uri.path == "/path/to/plugin/resources/page2":
await request.respond(Http200, "PLUGIN PAGE2")
else:
# Calling next handler.
await nextHandler(reqfence)
except HttpWriteError as exc:
# We use default error handler if we unable to send response.
defaultResponse(exc)
proc mainHandler(
reqfence: RequestFence
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
if reqfence.isErr():
return defaultResponse()
let request = reqfence.get()
try:
if request.uri.path == "/path/to/original/page1":
await request.respond(Http200, "ORIGINAL PAGE1")
elif request.uri.path == "/path/to/original/page2":
await request.respond(Http200, "ORIGINAL PAGE2")
else:
# Force HTTP server to respond with `404 Not Found` status code.
codeResponse(Http404)
except HttpWriteError as exc:
defaultResponse(exc)
proc middlewareExample() {.async: (raises: []).} =
let
middlewares = [
HttpServerMiddlewareRef(handler: firstMiddlewareHandler),
HttpServerMiddlewareRef(handler: secondMiddlewareHandler),
HttpServerMiddlewareRef(handler: thirdMiddlewareHandler)
]
socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}
boundAddress =
if isAvailable(AddressFamily.IPv6):
AnyAddress6
else:
AnyAddress
res = HttpServerRef.new(boundAddress, mainHandler,
socketFlags = socketFlags,
middlewares = middlewares)
doAssert(res.isOk(), "Unable to start HTTP server")
let server = res.get()
server.start()
let address = server.instance.localAddress()
echo "HTTP server running on ", address
try:
await server.join()
except CancelledError:
discard
finally:
await server.stop()
await server.closeWait()
when isMainModule:
waitFor(middlewareExample())

View File

@ -8,6 +8,7 @@
- [Errors and exceptions](./error_handling.md) - [Errors and exceptions](./error_handling.md)
- [Tips, tricks and best practices](./tips.md) - [Tips, tricks and best practices](./tips.md)
- [Porting code to `chronos`](./porting.md) - [Porting code to `chronos`](./porting.md)
- [HTTP server middleware](./http_server_middleware.md)
# Developer guide # Developer guide

View File

@ -16,3 +16,4 @@ Examples are available in the [`docs/examples/`](https://github.com/status-im/ni
* [httpget](https://github.com/status-im/nim-chronos/tree/master/docs/examples/httpget.nim) - Downloading a web page using the http client * [httpget](https://github.com/status-im/nim-chronos/tree/master/docs/examples/httpget.nim) - Downloading a web page using the http client
* [twogets](https://github.com/status-im/nim-chronos/tree/master/docs/examples/twogets.nim) - Download two pages concurrently * [twogets](https://github.com/status-im/nim-chronos/tree/master/docs/examples/twogets.nim) - Download two pages concurrently
* [middleware](https://github.com/status-im/nim-chronos/tree/master/docs/examples/middleware.nim) - Deploy multiple HTTP server middlewares

View File

@ -0,0 +1,102 @@
## HTTP server middleware
Chronos provides a powerful mechanism for customizing HTTP request handlers via
middlewares.
A middleware is a coroutine that can modify, block or filter an HTTP request.
A single HTTP server can support an unlimited number of middlewares, but keep in mind that, in the worst case, each request may pass through every middleware; therefore a very large number of middlewares can have a significant impact on HTTP server performance.
Order of middlewares is also important: right after HTTP server has received request, it will be sent to the first middleware in list, and each middleware will be responsible for passing control to other middlewares. Therefore, when building a list, it would be a good idea to place the request handlers at the end of the list, while keeping the middleware that could block or modify the request at the beginning of the list.
A middleware can also modify the HTTP request, and these changes will be visible to all subsequent handlers (other middlewares as well as the original request handler). This can be done using the following helpers:
```nim
proc updateRequest*(request: HttpRequestRef, scheme: string, meth: HttpMethod,
version: HttpVersion, requestUri: string,
headers: HttpTable): HttpResultMessage[void]
proc updateRequest*(request: HttpRequestRef, meth: HttpMethod,
requestUri: string,
headers: HttpTable): HttpResultMessage[void]
proc updateRequest*(request: HttpRequestRef, requestUri: string,
headers: HttpTable): HttpResultMessage[void]
proc updateRequest*(request: HttpRequestRef,
requestUri: string): HttpResultMessage[void]
proc updateRequest*(request: HttpRequestRef,
headers: HttpTable): HttpResultMessage[void]
```
As you can see all the HTTP request parameters could be modified: request method, version, request path and request headers.
Middleware could also use helpers to obtain more information about remote and local addresses of request's connection (this could be helpful when you need to do some IP address filtering).
```nim
proc remote*(request: HttpRequestRef): Opt[TransportAddress]
## Returns remote address of HTTP request's connection.
proc local*(request: HttpRequestRef): Opt[TransportAddress] =
## Returns local address of HTTP request's connection.
```
Every middleware is the coroutine which looks like this:
```nim
proc middlewareHandler(
middleware: HttpServerMiddlewareRef,
reqfence: RequestFence,
nextHandler: HttpProcessCallback2
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
```
Where `middleware` argument is the object which could hold some specific values, `reqfence` is HTTP request which is enclosed with HTTP server error information and `nextHandler` is reference to next request handler, it could be either middleware handler or the original request processing callback handler.
```nim
await nextHandler(reqfence)
```
You should await the response from `nextHandler(reqfence)`. Usually you should call the next handler when you don't want to handle the request or you don't know how to handle it, for example:
```nim
proc middlewareHandler(
middleware: HttpServerMiddlewareRef,
reqfence: RequestFence,
nextHandler: HttpProcessCallback2
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
if reqfence.isErr():
# We don't know how, or do not want, to handle failed requests, so we call the next handler.
return await nextHandler(reqfence)
let request = reqfence.get()
if request.uri.path == "/path/we/able/to/respond":
try:
# Sending some response.
await request.respond(Http200, "TEST")
except HttpWriteError as exc:
# We could also return default response for exception or other types of error.
defaultResponse(exc)
elif request.uri.path == "/path/for/rewrite":
# We are going to modify the request object; the next handler will receive it with a different request path.
let res = request.updateRequest("/path/to/new/location")
if res.isErr():
return defaultResponse(res.error)
await nextHandler(reqfence)
elif request.uri.path == "/restricted/path":
if request.remote().isNone():
# We can't obtain remote address, so we force HTTP server to respond with `401 Unauthorized` status code.
return codeResponse(Http401)
if $(request.remote().get()).startsWith("127.0.0.1"):
# Remote peer's address starts with "127.0.0.1", sending proper response.
await request.respond(Http200, "AUTHORIZED")
else:
# Force HTTP server to respond with `403 Forbidden` status code.
codeResponse(Http403)
elif request.uri.path == "/blackhole":
# Force HTTP server to drop connection with remote peer.
dropResponse()
else:
# All other requests should be handled by somebody else.
await nextHandler(reqfence)
```

View File

@ -84,6 +84,9 @@ proc createBigMessage(message: string, size: int): seq[byte] =
res res
suite "AsyncStream test suite": suite "AsyncStream test suite":
teardown:
checkLeaks()
test "AsyncStream(StreamTransport) readExactly() test": test "AsyncStream(StreamTransport) readExactly() test":
proc testReadExactly(): Future[bool] {.async.} = proc testReadExactly(): Future[bool] {.async.} =
proc serveClient(server: StreamServer, proc serveClient(server: StreamServer,
@ -256,9 +259,6 @@ suite "AsyncStream test suite":
result = true result = true
check waitFor(testConsume()) == true check waitFor(testConsume()) == true
test "AsyncStream(StreamTransport) leaks test":
checkLeaks()
test "AsyncStream(AsyncStream) readExactly() test": test "AsyncStream(AsyncStream) readExactly() test":
proc testReadExactly2(): Future[bool] {.async.} = proc testReadExactly2(): Future[bool] {.async.} =
proc serveClient(server: StreamServer, proc serveClient(server: StreamServer,
@ -581,10 +581,10 @@ suite "AsyncStream test suite":
check waitFor(testWriteEof()) == true check waitFor(testWriteEof()) == true
test "AsyncStream(AsyncStream) leaks test": suite "ChunkedStream test suite":
teardown:
checkLeaks() checkLeaks()
suite "ChunkedStream test suite":
test "ChunkedStream test vectors": test "ChunkedStream test vectors":
const ChunkedVectors = [ const ChunkedVectors = [
["4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n", ["4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n",
@ -890,10 +890,10 @@ suite "ChunkedStream test suite":
check waitFor(testSmallChunk(262400, 4096, 61)) == true check waitFor(testSmallChunk(262400, 4096, 61)) == true
check waitFor(testSmallChunk(767309, 4457, 173)) == true check waitFor(testSmallChunk(767309, 4457, 173)) == true
test "ChunkedStream leaks test": suite "TLSStream test suite":
teardown:
checkLeaks() checkLeaks()
suite "TLSStream test suite":
const HttpHeadersMark = @[byte(0x0D), byte(0x0A), byte(0x0D), byte(0x0A)] const HttpHeadersMark = @[byte(0x0D), byte(0x0A), byte(0x0D), byte(0x0A)]
test "Simple HTTPS connection": test "Simple HTTPS connection":
proc headerClient(address: TransportAddress, proc headerClient(address: TransportAddress,
@ -1023,10 +1023,9 @@ suite "TLSStream test suite":
let res = waitFor checkTrustAnchors("Some message") let res = waitFor checkTrustAnchors("Some message")
check res == "Some message\r\n" check res == "Some message\r\n"
test "TLSStream leaks test":
checkLeaks()
suite "BoundedStream test suite": suite "BoundedStream test suite":
teardown:
checkLeaks()
type type
BoundarySizeTest = enum BoundarySizeTest = enum
@ -1402,6 +1401,3 @@ suite "BoundedStream test suite":
return (writer1Res and writer2Res and readerRes) return (writer1Res and writer2Res and readerRes)
check waitFor(checkEmptyStreams()) == true check waitFor(checkEmptyStreams()) == true
test "BoundedStream leaks test":
checkLeaks()

View File

@ -13,6 +13,9 @@ import ".."/chronos
{.used.} {.used.}
suite "Datagram Transport test suite": suite "Datagram Transport test suite":
teardown:
checkLeaks()
const const
TestsCount = 2000 TestsCount = 2000
ClientsCount = 20 ClientsCount = 20
@ -727,5 +730,3 @@ suite "Datagram Transport test suite":
DualStackType.Auto, initTAddress("[::1]:0"))) == true DualStackType.Auto, initTAddress("[::1]:0"))) == true
else: else:
skip() skip()
test "Transports leak test":
checkLeaks()

View File

@ -74,6 +74,8 @@ N8r5CwGcIX/XPC3lKazzbZ8baA==
""" """
suite "HTTP client testing suite": suite "HTTP client testing suite":
teardown:
checkLeaks()
type type
TestResponseTuple = tuple[status: int, data: string, count: int] TestResponseTuple = tuple[status: int, data: string, count: int]
@ -1516,6 +1518,3 @@ suite "HTTP client testing suite":
res.isErr() and res.error == HttpAddressErrorType.NameLookupFailed res.isErr() and res.error == HttpAddressErrorType.NameLookupFailed
res.error.isRecoverableError() res.error.isRecoverableError()
not(res.error.isCriticalError()) not(res.error.isCriticalError())
test "Leaks test":
checkLeaks()

View File

@ -14,13 +14,23 @@ import stew/base10
{.used.} {.used.}
suite "HTTP server testing suite": suite "HTTP server testing suite":
teardown:
checkLeaks()
type type
TooBigTest = enum TooBigTest = enum
GetBodyTest, ConsumeBodyTest, PostUrlTest, PostMultipartTest GetBodyTest, ConsumeBodyTest, PostUrlTest, PostMultipartTest
TestHttpResponse = object TestHttpResponse = object
status: int
headers: HttpTable headers: HttpTable
data: string data: string
FirstMiddlewareRef = ref object of HttpServerMiddlewareRef
someInteger: int
SecondMiddlewareRef = ref object of HttpServerMiddlewareRef
someString: string
proc httpClient(address: TransportAddress, proc httpClient(address: TransportAddress,
data: string): Future[string] {.async.} = data: string): Future[string] {.async.} =
var transp: StreamTransport var transp: StreamTransport
@ -50,7 +60,7 @@ suite "HTTP server testing suite":
zeroMem(addr buffer[0], len(buffer)) zeroMem(addr buffer[0], len(buffer))
await transp.readExactly(addr buffer[0], length) await transp.readExactly(addr buffer[0], length)
let data = bytesToString(buffer.toOpenArray(0, length - 1)) let data = bytesToString(buffer.toOpenArray(0, length - 1))
let headers = let (status, headers) =
block: block:
let resp = parseResponse(hdata, false) let resp = parseResponse(hdata, false)
if resp.failed(): if resp.failed():
@ -58,8 +68,38 @@ suite "HTTP server testing suite":
var res = HttpTable.init() var res = HttpTable.init()
for key, value in resp.headers(hdata): for key, value in resp.headers(hdata):
res.add(key, value) res.add(key, value)
res (resp.code, res)
return TestHttpResponse(headers: headers, data: data) TestHttpResponse(status: status, headers: headers, data: data)
# Minimal raw-TCP HTTP client used by the middleware tests: writes
# ``data`` verbatim as the request, reads the response headers up to the
# CRLFCRLF separator, then reads the body until EOF (i.e. it relies on
# the server closing the connection to delimit the body).
# Raises ValueError when the request cannot be fully written or the
# response headers cannot be parsed.
proc httpClient3(address: TransportAddress,
data: string): Future[TestHttpResponse] {.async.} =
var
transp: StreamTransport
buffer = newSeq[byte](4096)
# Header/body separator: "\r\n\r\n".
sep = @[0x0D'u8, 0x0A'u8, 0x0D'u8, 0x0A'u8]
try:
transp = await connect(address)
if len(data) > 0:
let wres = await transp.write(data)
if wres != len(data):
raise newException(ValueError, "Unable to write full request")
# Read everything up to (and including) the empty line that
# terminates the response header section.
let hres = await transp.readUntil(addr buffer[0], len(buffer), sep)
var hdata = @buffer
hdata.setLen(hres)
# Remaining bytes until EOF form the response body.
var rres = bytesToString(await transp.read())
let (status, headers) =
block:
let resp = parseResponse(hdata, false)
if resp.failed():
raise newException(ValueError, "Unable to decode response headers")
var res = HttpTable.init()
for key, value in resp.headers(hdata):
res.add(key, value)
(resp.code, res)
TestHttpResponse(status: status, headers: headers, data: rres)
finally:
# Close the transport even when the request/response exchange failed.
if not(isNil(transp)):
await closeWait(transp)
proc testTooBigBodyChunked(operation: TooBigTest): Future[bool] {.async.} = proc testTooBigBodyChunked(operation: TooBigTest): Future[bool] {.async.} =
var serverRes = false var serverRes = false
@ -1490,5 +1530,258 @@ suite "HTTP server testing suite":
await server.stop() await server.stop()
await server.closeWait() await server.closeWait()
test "Leaks test": asyncTest "HTTP middleware request filtering test":
checkLeaks() proc init(t: typedesc[FirstMiddlewareRef],
data: int): HttpServerMiddlewareRef =
proc shandler(
middleware: HttpServerMiddlewareRef,
reqfence: RequestFence,
nextHandler: HttpProcessCallback2
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
let mw = FirstMiddlewareRef(middleware)
if reqfence.isErr():
# Our handler is not supposed to handle request errors, so we
# call next handler in sequence which could process errors.
return await nextHandler(reqfence)
let request = reqfence.get()
if request.uri.path == "/first":
# This is request we are waiting for, so we going to process it.
try:
await request.respond(Http200, $mw.someInteger)
except HttpWriteError as exc:
defaultResponse(exc)
else:
# We know nothing about request's URI, so we pass this request to the
# next handler which could process such request.
await nextHandler(reqfence)
HttpServerMiddlewareRef(
FirstMiddlewareRef(someInteger: data, handler: shandler))
proc init(t: typedesc[SecondMiddlewareRef],
data: string): HttpServerMiddlewareRef =
proc shandler(
middleware: HttpServerMiddlewareRef,
reqfence: RequestFence,
nextHandler: HttpProcessCallback2
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
let mw = SecondMiddlewareRef(middleware)
if reqfence.isErr():
# Our handler is not supposed to handle request errors, so we
# call next handler in sequence which could process errors.
return await nextHandler(reqfence)
let request = reqfence.get()
if request.uri.path == "/second":
# This is request we are waiting for, so we going to process it.
try:
await request.respond(Http200, mw.someString)
except HttpWriteError as exc:
defaultResponse(exc)
else:
# We know nothing about request's URI, so we pass this request to the
# next handler which could process such request.
await nextHandler(reqfence)
HttpServerMiddlewareRef(
SecondMiddlewareRef(someString: data, handler: shandler))
proc process(r: RequestFence): Future[HttpResponseRef] {.
async: (raises: [CancelledError]).} =
if r.isOk():
let request = r.get()
if request.uri.path == "/test":
try:
await request.respond(Http200, "ORIGIN")
except HttpWriteError as exc:
defaultResponse(exc)
else:
defaultResponse()
else:
defaultResponse()
let
middlewares = [FirstMiddlewareRef.init(655370),
SecondMiddlewareRef.init("SECOND")]
socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}
res = HttpServerRef.new(initTAddress("127.0.0.1:0"), process,
socketFlags = socketFlags,
middlewares = middlewares)
check res.isOk()
let server = res.get()
server.start()
let
address = server.instance.localAddress()
req1 = "GET /test HTTP/1.1\r\n\r\n"
req2 = "GET /first HTTP/1.1\r\n\r\n"
req3 = "GET /second HTTP/1.1\r\n\r\n"
req4 = "GET /noway HTTP/1.1\r\n\r\n"
resp1 = await httpClient3(address, req1)
resp2 = await httpClient3(address, req2)
resp3 = await httpClient3(address, req3)
resp4 = await httpClient3(address, req4)
check:
resp1.status == 200
resp1.data == "ORIGIN"
resp2.status == 200
resp2.data == "655370"
resp3.status == 200
resp3.data == "SECOND"
resp4.status == 404
await server.stop()
await server.closeWait()
# Verifies that a middleware can rewrite the request URI and headers via
# `updateRequest`, and that the rewritten request is what the final
# request handler observes for every incoming path.
asyncTest "HTTP middleware request modification test":
# Builds a middleware that prefixes the request path with
# "/modified/<someInteger>" and injects an "X-Modified" header.
proc init(t: typedesc[FirstMiddlewareRef],
data: int): HttpServerMiddlewareRef =
proc shandler(
middleware: HttpServerMiddlewareRef,
reqfence: RequestFence,
nextHandler: HttpProcessCallback2
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
let mw = FirstMiddlewareRef(middleware)
if reqfence.isErr():
# Our handler is not supposed to handle request errors, so we
# call the next handler in sequence, which could process errors.
return await nextHandler(reqfence)
let
request = reqfence.get()
modifiedUri = "/modified/" & $mw.someInteger & request.rawPath
var modifiedHeaders = request.headers
modifiedHeaders.add("X-Modified", "test-value")
let res = request.updateRequest(modifiedUri, modifiedHeaders)
if res.isErr():
return defaultResponse(res.error)
# We send the modified request on to the next handler.
await nextHandler(reqfence)
HttpServerMiddlewareRef(
FirstMiddlewareRef(someInteger: data, handler: shandler))
# Final handler: echoes the (possibly rewritten) path and the value of
# the "x-modified" header so the assertions below can verify the rewrite.
proc process(r: RequestFence): Future[HttpResponseRef] {.
async: (raises: [CancelledError]).} =
if r.isOk():
let request = r.get()
try:
await request.respond(Http200, request.rawPath & ":" &
request.headers.getString("x-modified"))
except HttpWriteError as exc:
defaultResponse(exc)
else:
defaultResponse()
let
middlewares = [FirstMiddlewareRef.init(655370)]
socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}
# Port 0: let the OS pick a free port; the real address is queried
# below via `localAddress()`.
res = HttpServerRef.new(initTAddress("127.0.0.1:0"), process,
socketFlags = socketFlags,
middlewares = middlewares)
check res.isOk()
let server = res.get()
server.start()
let
address = server.instance.localAddress()
req1 = "GET /test HTTP/1.1\r\n\r\n"
req2 = "GET /first HTTP/1.1\r\n\r\n"
req3 = "GET /second HTTP/1.1\r\n\r\n"
req4 = "GET /noway HTTP/1.1\r\n\r\n"
resp1 = await httpClient3(address, req1)
resp2 = await httpClient3(address, req2)
resp3 = await httpClient3(address, req3)
resp4 = await httpClient3(address, req4)
# Every request, regardless of path, must have been rewritten by the
# middleware before reaching `process`.
check:
resp1.status == 200
resp1.data == "/modified/655370/test:test-value"
resp2.status == 200
resp2.data == "/modified/655370/first:test-value"
resp3.status == 200
resp3.data == "/modified/655370/second:test-value"
resp4.status == 200
resp4.data == "/modified/655370/noway:test-value"
await server.stop()
await server.closeWait()
# Verifies that a middleware can block requests without invoking the next
# handler: either by dropping the connection (`dropResponse`) or by
# forcing an error status code (`codeResponse`).
asyncTest "HTTP middleware request blocking test":
proc init(t: typedesc[FirstMiddlewareRef],
data: int): HttpServerMiddlewareRef =
proc shandler(
middleware: HttpServerMiddlewareRef,
reqfence: RequestFence,
nextHandler: HttpProcessCallback2
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
if reqfence.isErr():
# Our handler is not supposed to handle request errors, so we
# call the next handler in sequence, which could process errors.
return await nextHandler(reqfence)
let request = reqfence.get()
if request.uri.path == "/first":
# Blocking request by disconnecting the remote peer.
dropResponse()
elif request.uri.path == "/second":
# Blocking request by sending an HTTP error message with 401 code.
codeResponse(Http401)
else:
# Allow all other requests to be processed by the next handler.
await nextHandler(reqfence)
HttpServerMiddlewareRef(
FirstMiddlewareRef(someInteger: data, handler: shandler))
# Final handler: responds 200/"ORIGIN" to whatever the middleware lets
# through.
proc process(r: RequestFence): Future[HttpResponseRef] {.
async: (raises: [CancelledError]).} =
if r.isOk():
let request = r.get()
try:
await request.respond(Http200, "ORIGIN")
except HttpWriteError as exc:
defaultResponse(exc)
else:
defaultResponse()
let
middlewares = [FirstMiddlewareRef.init(655370)]
socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}
res = HttpServerRef.new(initTAddress("127.0.0.1:0"), process,
socketFlags = socketFlags,
middlewares = middlewares)
check res.isOk()
let server = res.get()
server.start()
let
address = server.instance.localAddress()
req1 = "GET /test HTTP/1.1\r\n\r\n"
req2 = "GET /first HTTP/1.1\r\n\r\n"
req3 = "GET /second HTTP/1.1\r\n\r\n"
resp1 = await httpClient3(address, req1)
resp3 = await httpClient3(address, req3)
check:
resp1.status == 200
resp1.data == "ORIGIN"
resp3.status == 401
# "/first" must make the server drop the connection; the client
# observes this as an incomplete transport read.
let checked =
try:
let res {.used.} = await httpClient3(address, req2)
false
except TransportIncompleteError:
true
check:
checked == true
await server.stop()
await server.closeWait()

View File

@ -555,3 +555,27 @@ suite "Exceptions tracking":
await raiseException() await raiseException()
waitFor(callCatchAll()) waitFor(callCatchAll())
# Checks that the results `?` operator composes with `await` inside
# `{.async: (raises: []).}` procedures: errors propagate through `?`
# instead of exceptions.
test "Results compatibility":
proc returnOk(): Future[Result[int, string]] {.async: (raises: []).} =
ok(42)
proc returnErr(): Future[Result[int, string]] {.async: (raises: []).} =
err("failed")
proc testit(): Future[Result[void, string]] {.async: (raises: []).} =
let
v = await returnOk()
check:
v.isOk() and v.value() == 42
let
# `?` unwraps the ok value; on error it would return early.
vok = ?v
check:
vok == 42
# The error from `returnErr` propagates out of `testit` via `?`.
discard ?await returnErr()
check:
waitFor(testit()).error() == "failed"

View File

@ -16,6 +16,9 @@ when defined(posix):
when defined(nimHasUsed): {.used.} when defined(nimHasUsed): {.used.}
suite "Asynchronous process management test suite": suite "Asynchronous process management test suite":
teardown:
checkLeaks()
const OutputTests = const OutputTests =
when defined(windows): when defined(windows):
[ [
@ -463,6 +466,3 @@ suite "Asynchronous process management test suite":
skip() skip()
else: else:
check getCurrentFD() == markFD check getCurrentFD() == markFD
test "Leaks test":
checkLeaks()

View File

@ -75,6 +75,8 @@ N8r5CwGcIX/XPC3lKazzbZ8baA==
suite "Secure HTTP server testing suite": suite "Secure HTTP server testing suite":
teardown:
checkLeaks()
proc httpsClient(address: TransportAddress, proc httpsClient(address: TransportAddress,
data: string, flags = {NoVerifyHost, NoVerifyServerName} data: string, flags = {NoVerifyHost, NoVerifyServerName}
@ -184,6 +186,3 @@ suite "Secure HTTP server testing suite":
return serverRes and data == "EXCEPTION" return serverRes and data == "EXCEPTION"
check waitFor(testHTTPS2(initTAddress("127.0.0.1:30080"))) == true check waitFor(testHTTPS2(initTAddress("127.0.0.1:30080"))) == true
test "Leaks test":
checkLeaks()

View File

@ -16,6 +16,9 @@ when defined(windows):
importc: "_get_osfhandle", header:"<io.h>".} importc: "_get_osfhandle", header:"<io.h>".}
suite "Stream Transport test suite": suite "Stream Transport test suite":
teardown:
checkLeaks()
const const
ConstantMessage = "SOMEDATA" ConstantMessage = "SOMEDATA"
BigMessagePattern = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" BigMessagePattern = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
@ -1555,12 +1558,6 @@ suite "Stream Transport test suite":
check waitFor(testAccept(addresses[i])) == true check waitFor(testAccept(addresses[i])) == true
test prefixes[i] & "close() while in accept() waiting test": test prefixes[i] & "close() while in accept() waiting test":
check waitFor(testAcceptClose(addresses[i])) == true check waitFor(testAcceptClose(addresses[i])) == true
test prefixes[i] & "Intermediate transports leak test #1":
checkLeaks()
when defined(windows):
skip()
else:
checkLeaks(StreamTransportTrackerName)
test prefixes[i] & "accept() too many file descriptors test": test prefixes[i] & "accept() too many file descriptors test":
when defined(windows): when defined(windows):
skip() skip()
@ -1671,8 +1668,6 @@ suite "Stream Transport test suite":
DualStackType.Disabled, initTAddress("[::1]:0"))) == true DualStackType.Disabled, initTAddress("[::1]:0"))) == true
else: else:
skip() skip()
test "Leaks test":
checkLeaks()
test "File descriptors leak test": test "File descriptors leak test":
when defined(windows): when defined(windows):
# Windows handle numbers depends on many conditions, so we can't use # Windows handle numbers depends on many conditions, so we can't use