mirror of
https://github.com/status-im/nim-chronos.git
synced 2025-01-18 23:31:13 +00:00
Expect header handling and response preparations.
This commit is contained in:
parent
74b0f85fc7
commit
0b03f8ec50
@ -18,8 +18,8 @@ when defined(useChroniclesLogging):
|
|||||||
import chronicles
|
import chronicles
|
||||||
|
|
||||||
type
|
type
|
||||||
HttpServerFlags* = enum
|
HttpServerFlags* {.pure.} = enum
|
||||||
Secure
|
Secure, NoExpectHandler
|
||||||
|
|
||||||
HttpStatus* = enum
|
HttpStatus* = enum
|
||||||
DropConnection, KeepConnection
|
DropConnection, KeepConnection
|
||||||
@ -35,7 +35,11 @@ type
|
|||||||
RequestFence*[T] = Result[T, HttpProcessError]
|
RequestFence*[T] = Result[T, HttpProcessError]
|
||||||
|
|
||||||
HttpRequestFlags* {.pure.} = enum
|
HttpRequestFlags* {.pure.} = enum
|
||||||
BoundBody, UnboundBody, MultipartForm, UrlencodedForm
|
BoundBody, UnboundBody, MultipartForm, UrlencodedForm,
|
||||||
|
ClientExpect
|
||||||
|
|
||||||
|
HttpResponseFlags* {.pure.} = enum
|
||||||
|
Prepared, DataSent, DataSending, KeepAlive
|
||||||
|
|
||||||
HttpProcessCallback* =
|
HttpProcessCallback* =
|
||||||
proc(req: RequestFence[HttpRequest]): Future[HttpStatus] {.gcsafe.}
|
proc(req: RequestFence[HttpRequest]): Future[HttpStatus] {.gcsafe.}
|
||||||
@ -75,12 +79,14 @@ type
|
|||||||
connection*: HttpConnection
|
connection*: HttpConnection
|
||||||
mainReader*: AsyncStreamReader
|
mainReader*: AsyncStreamReader
|
||||||
mainWriter*: AsyncStreamWriter
|
mainWriter*: AsyncStreamWriter
|
||||||
|
response*: Option[HttpResponse]
|
||||||
|
|
||||||
HttpResponse* = object
|
HttpResponse* = ref object of RootRef
|
||||||
code*: HttpCode
|
status*: HttpCode
|
||||||
version*: HttpVersion
|
version*: HttpVersion
|
||||||
headersTable: HttpTable
|
headersTable: HttpTable
|
||||||
body*: seq[byte]
|
body*: seq[byte]
|
||||||
|
responseFlags*: set[HttpResponseFlags]
|
||||||
connection*: HttpConnection
|
connection*: HttpConnection
|
||||||
mainWriter: AsyncStreamWriter
|
mainWriter: AsyncStreamWriter
|
||||||
|
|
||||||
@ -94,8 +100,9 @@ proc init(htype: typedesc[HttpProcessError], error: HTTPServerError,
|
|||||||
remote: TransportAddress): HttpProcessError =
|
remote: TransportAddress): HttpProcessError =
|
||||||
HttpProcessError(error: error, exc: exc, remote: remote)
|
HttpProcessError(error: error, exc: exc, remote: remote)
|
||||||
|
|
||||||
proc init*(htype: typedesc[HttpResponse], req: HttpRequest): HttpResponse =
|
proc new*(htype: typedesc[HttpResponse], req: HttpRequest): HttpResponse =
|
||||||
HttpResponse(
|
HttpResponse(
|
||||||
|
status: Http200,
|
||||||
version: req.version,
|
version: req.version,
|
||||||
headersTable: HttpTable.init(),
|
headersTable: HttpTable.init(),
|
||||||
connection: req.connection,
|
connection: req.connection,
|
||||||
@ -105,7 +112,8 @@ proc init*(htype: typedesc[HttpResponse], req: HttpRequest): HttpResponse =
|
|||||||
proc new*(htype: typedesc[HttpServer],
|
proc new*(htype: typedesc[HttpServer],
|
||||||
address: TransportAddress,
|
address: TransportAddress,
|
||||||
processCallback: HttpProcessCallback,
|
processCallback: HttpProcessCallback,
|
||||||
flags: set[HttpServerFlags] = {},
|
serverFlags: set[HttpServerFlags] = {},
|
||||||
|
socketFlags: set[ServerFlags] = {ReuseAddr},
|
||||||
serverUri = Uri(),
|
serverUri = Uri(),
|
||||||
maxConnections: int = -1,
|
maxConnections: int = -1,
|
||||||
bufferSize: int = 4096,
|
bufferSize: int = 4096,
|
||||||
@ -114,26 +122,28 @@ proc new*(htype: typedesc[HttpServer],
|
|||||||
httpBodyTimeout = 30.seconds,
|
httpBodyTimeout = 30.seconds,
|
||||||
maxHeadersSize: int = 8192,
|
maxHeadersSize: int = 8192,
|
||||||
maxRequestBodySize: int = 1_048_576): HttpResult[HttpServer] =
|
maxRequestBodySize: int = 1_048_576): HttpResult[HttpServer] =
|
||||||
|
|
||||||
var res = HttpServer(
|
var res = HttpServer(
|
||||||
maxConnections: maxConnections,
|
maxConnections: maxConnections,
|
||||||
headersTimeout: httpHeadersTimeout,
|
headersTimeout: httpHeadersTimeout,
|
||||||
bodyTimeout: httpBodyTimeout,
|
bodyTimeout: httpBodyTimeout,
|
||||||
maxHeadersSize: maxHeadersSize,
|
maxHeadersSize: maxHeadersSize,
|
||||||
maxRequestBodySize: maxRequestBodySize,
|
maxRequestBodySize: maxRequestBodySize,
|
||||||
processCallback: processCallback
|
processCallback: processCallback,
|
||||||
|
flags: serverFlags
|
||||||
)
|
)
|
||||||
|
|
||||||
res.baseUri =
|
res.baseUri =
|
||||||
if len(serverUri.hostname) > 0 and isAbsolute(serverUri):
|
if len(serverUri.hostname) > 0 and isAbsolute(serverUri):
|
||||||
serverUri
|
serverUri
|
||||||
else:
|
else:
|
||||||
if HttpServerFlags.Secure in flags:
|
if HttpServerFlags.Secure in serverFlags:
|
||||||
parseUri("https://" & $address & "/")
|
parseUri("https://" & $address & "/")
|
||||||
else:
|
else:
|
||||||
parseUri("http://" & $address & "/")
|
parseUri("http://" & $address & "/")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
res.instance = createStreamServer(address, flags = {ReuseAddr},
|
res.instance = createStreamServer(address, flags = socketFlags,
|
||||||
bufferSize = bufferSize,
|
bufferSize = bufferSize,
|
||||||
backlog = backlogSize)
|
backlog = backlogSize)
|
||||||
# if maxConnections > 0:
|
# if maxConnections > 0:
|
||||||
@ -157,7 +167,7 @@ proc hasBody*(request: HttpRequest): bool =
|
|||||||
|
|
||||||
proc prepareRequest(conn: HttpConnection,
|
proc prepareRequest(conn: HttpConnection,
|
||||||
req: HttpRequestHeader): HttpResultCode[HttpRequest] =
|
req: HttpRequestHeader): HttpResultCode[HttpRequest] =
|
||||||
var request = HttpRequest()
|
var request = HttpRequest(connection: conn)
|
||||||
|
|
||||||
if req.version notin {HttpVersion10, HttpVersion11}:
|
if req.version notin {HttpVersion10, HttpVersion11}:
|
||||||
return err(Http505)
|
return err(Http505)
|
||||||
@ -255,16 +265,29 @@ proc prepareRequest(conn: HttpConnection,
|
|||||||
let contentType = request.headersTable.getString("content-type")
|
let contentType = request.headersTable.getString("content-type")
|
||||||
let tmp = strip(contentType).toLowerAscii()
|
let tmp = strip(contentType).toLowerAscii()
|
||||||
if tmp.startsWith(UrlEncodedType):
|
if tmp.startsWith(UrlEncodedType):
|
||||||
request.requestFlags.incl(UrlencodedForm)
|
request.requestFlags.incl(HttpRequestFlags.UrlencodedForm)
|
||||||
elif tmp.startsWith(MultipartType):
|
elif tmp.startsWith(MultipartType):
|
||||||
request.requestFlags.incl(MultipartForm)
|
request.requestFlags.incl(HttpRequestFlags.MultipartForm)
|
||||||
|
|
||||||
|
if "expect" in request.headersTable:
|
||||||
|
let expectHeader = request.headersTable.getString("expect")
|
||||||
|
if strip(expectHeader).toLowerAscii() == "100-continue":
|
||||||
|
request.requestFlags.incl(HttpRequestFlags.ClientExpect)
|
||||||
|
|
||||||
request.mainReader = newAsyncStreamReader(conn.transp)
|
request.mainReader = newAsyncStreamReader(conn.transp)
|
||||||
request.mainWriter = newAsyncStreamWriter(conn.transp)
|
request.mainWriter = newAsyncStreamWriter(conn.transp)
|
||||||
ok(request)
|
ok(request)
|
||||||
|
|
||||||
|
proc clear*(request: HttpRequest) {.async.} =
|
||||||
|
await allFutures(
|
||||||
|
request.mainReader.closeWait(),
|
||||||
|
request.mainWriter.closeWait(),
|
||||||
|
)
|
||||||
|
|
||||||
proc getBodyStream*(request: HttpRequest): HttpResult[AsyncStreamReader] =
|
proc getBodyStream*(request: HttpRequest): HttpResult[AsyncStreamReader] =
|
||||||
## Returns stream's reader instance which can be used to read request's body.
|
## Returns stream's reader instance which can be used to read request's body.
|
||||||
|
##
|
||||||
|
## Please be sure to handle ``Expect`` header properly.
|
||||||
if HttpRequestFlags.BoundBody in request.requestFlags:
|
if HttpRequestFlags.BoundBody in request.requestFlags:
|
||||||
ok(newBoundedStreamReader(request.mainReader, request.contentLength))
|
ok(newBoundedStreamReader(request.mainReader, request.contentLength))
|
||||||
elif HttpRequestFlags.UnboundBody in request.requestFlags:
|
elif HttpRequestFlags.UnboundBody in request.requestFlags:
|
||||||
@ -272,6 +295,20 @@ proc getBodyStream*(request: HttpRequest): HttpResult[AsyncStreamReader] =
|
|||||||
else:
|
else:
|
||||||
err("Request do not have body available")
|
err("Request do not have body available")
|
||||||
|
|
||||||
|
proc handleExpect*(request: HttpRequest) {.async.} =
|
||||||
|
## Handle expectation for ``Expect`` header.
|
||||||
|
## https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Expect
|
||||||
|
if HttpServerFlags.NoExpectHandler notin request.connection.server.flags:
|
||||||
|
if HttpRequestFlags.ClientExpect in request.requestFlags:
|
||||||
|
if request.version == HttpVersion11:
|
||||||
|
try:
|
||||||
|
let message = $request.version & " " & $Http100 & "\r\n\r\n"
|
||||||
|
await request.mainWriter.write(message)
|
||||||
|
except CancelledError as exc:
|
||||||
|
raise exc
|
||||||
|
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||||
|
raise newHttpCriticalError("Unable to send `100-continue` response")
|
||||||
|
|
||||||
proc getBody*(request: HttpRequest): Future[seq[byte]] {.async.} =
|
proc getBody*(request: HttpRequest): Future[seq[byte]] {.async.} =
|
||||||
## Obtain request's body as sequence of bytes.
|
## Obtain request's body as sequence of bytes.
|
||||||
let res = request.getBodyStream()
|
let res = request.getBodyStream()
|
||||||
@ -279,9 +316,10 @@ proc getBody*(request: HttpRequest): Future[seq[byte]] {.async.} =
|
|||||||
return @[]
|
return @[]
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
|
await request.handleExpect()
|
||||||
return await read(res.get())
|
return await read(res.get())
|
||||||
except AsyncStreamError:
|
except AsyncStreamError:
|
||||||
raise newHttpCriticalError("Read Error")
|
raise newHttpCriticalError("Unable to read request's body")
|
||||||
|
|
||||||
proc consumeBody*(request: HttpRequest): Future[void] {.async.} =
|
proc consumeBody*(request: HttpRequest): Future[void] {.async.} =
|
||||||
## Consume/discard request's body.
|
## Consume/discard request's body.
|
||||||
@ -291,10 +329,11 @@ proc consumeBody*(request: HttpRequest): Future[void] {.async.} =
|
|||||||
else:
|
else:
|
||||||
let reader = res.get()
|
let reader = res.get()
|
||||||
try:
|
try:
|
||||||
|
await request.handleExpect()
|
||||||
discard await reader.consume()
|
discard await reader.consume()
|
||||||
return
|
return
|
||||||
except AsyncStreamError:
|
except AsyncStreamError:
|
||||||
raise newHttpCriticalError("Read Error")
|
raise newHttpCriticalError("Unable to consume request's body")
|
||||||
|
|
||||||
proc sendErrorResponse(conn: HttpConnection, version: HttpVersion,
|
proc sendErrorResponse(conn: HttpConnection, version: HttpVersion,
|
||||||
code: HttpCode, keepAlive = true,
|
code: HttpCode, keepAlive = true,
|
||||||
@ -441,6 +480,9 @@ proc processLoop(server: HttpServer, transp: StreamTransport) {.async.} =
|
|||||||
# sent to client, so we going to send HTTP503 error.
|
# sent to client, so we going to send HTTP503 error.
|
||||||
discard await conn.sendErrorResponse(HttpVersion11, Http503, true)
|
discard await conn.sendErrorResponse(HttpVersion11, Http503, true)
|
||||||
|
|
||||||
|
## Perform cleanup of request instance
|
||||||
|
await arg.get().clear()
|
||||||
|
|
||||||
if not(keepConn):
|
if not(keepConn):
|
||||||
break
|
break
|
||||||
|
|
||||||
@ -542,6 +584,7 @@ proc post*(req: HttpRequest): Future[HttpTable] {.async.} =
|
|||||||
|
|
||||||
if UrlencodedForm in req.requestFlags:
|
if UrlencodedForm in req.requestFlags:
|
||||||
var table = HttpTable.init()
|
var table = HttpTable.init()
|
||||||
|
# getBody() will handle `Expect`.
|
||||||
var body = await req.getBody()
|
var body = await req.getBody()
|
||||||
## TODO (cheatfate) double copy here.
|
## TODO (cheatfate) double copy here.
|
||||||
var strbody = newString(len(body))
|
var strbody = newString(len(body))
|
||||||
@ -558,6 +601,8 @@ proc post*(req: HttpRequest): Future[HttpTable] {.async.} =
|
|||||||
let bres = getMultipartBoundary(req.headersTable.getList("content-type"))
|
let bres = getMultipartBoundary(req.headersTable.getList("content-type"))
|
||||||
if bres.isErr():
|
if bres.isErr():
|
||||||
raise newHttpCriticalError(bres.error)
|
raise newHttpCriticalError(bres.error)
|
||||||
|
# We must handle `Expect`.
|
||||||
|
await req.handleExpect()
|
||||||
var reader = req.getBodyStream()
|
var reader = req.getBodyStream()
|
||||||
if reader.isErr():
|
if reader.isErr():
|
||||||
raise newHttpCriticalError(reader.error)
|
raise newHttpCriticalError(reader.error)
|
||||||
@ -578,13 +623,51 @@ proc post*(req: HttpRequest): Future[HttpTable] {.async.} =
|
|||||||
req.postTable = some(table)
|
req.postTable = some(table)
|
||||||
return table
|
return table
|
||||||
else:
|
else:
|
||||||
if BoundBody in req.requestFlags:
|
if HttpRequestFlags.BoundBody in req.requestFlags:
|
||||||
if req.contentLength != 0:
|
if req.contentLength != 0:
|
||||||
raise newHttpCriticalError("Unsupported request body")
|
raise newHttpCriticalError("Unsupported request body")
|
||||||
return HttpTable.init()
|
return HttpTable.init()
|
||||||
elif UnboundBody in req.requestFlags:
|
elif HttpRequestFlags.UnboundBody in req.requestFlags:
|
||||||
raise newHttpCriticalError("Unsupported request body")
|
raise newHttpCriticalError("Unsupported request body")
|
||||||
|
|
||||||
|
proc `keepalive=`*(resp: HttpResponse, value: bool) =
|
||||||
|
if value:
|
||||||
|
resp.responseFlags.incl(KeepAlive)
|
||||||
|
else:
|
||||||
|
resp.responseFlags.excl(KeepAlive)
|
||||||
|
|
||||||
|
proc keepalive*(resp: HttpResponse): bool =
|
||||||
|
KeepAlive in resp.responseFlags
|
||||||
|
|
||||||
|
proc setHeader*(resp: HttpResponse, key, value: string) =
|
||||||
|
resp.httpTable.set(key, value)
|
||||||
|
|
||||||
|
proc addHeader*(resp: HttpResponse, key, value: string) =
|
||||||
|
resp.httpTable.add(key, value)
|
||||||
|
|
||||||
|
proc getHeader*(resp: HttpResponse, key: string): string =
|
||||||
|
resp.httpTable.getString(key)
|
||||||
|
|
||||||
|
proc getHeaderOrDefault*(resp: HttpResponse, key: string,
|
||||||
|
default: string = ""): string =
|
||||||
|
|
||||||
|
|
||||||
|
proc sendBody*(resp: HttpResponse, pbytes: pointer, nbytes: int) {.async.} =
|
||||||
|
resp.headersTable
|
||||||
|
|
||||||
|
proc sendBody*(resp: HttpResponse, data: string) {.async.}
|
||||||
|
var answer = resp.version & " " & resp.status & "\r\n"
|
||||||
|
answer.add("")
|
||||||
|
proc prepare*(resp: HttpResponse) {.async.} =
|
||||||
|
discard
|
||||||
|
|
||||||
|
proc sendChunk*(resp: HttpResponse) {.async.} =
|
||||||
|
discard
|
||||||
|
|
||||||
|
proc finish*(resp: HttpResponse) {.async.} =
|
||||||
|
discard
|
||||||
|
|
||||||
|
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
proc processCallback(req: RequestFence[HttpRequest]): Future[HttpStatus] {.
|
proc processCallback(req: RequestFence[HttpRequest]): Future[HttpStatus] {.
|
||||||
async.} =
|
async.} =
|
||||||
|
@ -60,16 +60,26 @@ proc add*(ht: var HttpTables, key: string, value: string) =
|
|||||||
proc add*(ht: var HttpTables, key: string, value: SomeInteger) =
|
proc add*(ht: var HttpTables, key: string, value: SomeInteger) =
|
||||||
ht.add(key, $value)
|
ht.add(key, $value)
|
||||||
|
|
||||||
|
proc set*(ht: var HttpTables, key: string, value: string) =
|
||||||
|
let lowkey = key.toLowerAscii()
|
||||||
|
ht.table[lowkey] = @[value]
|
||||||
|
|
||||||
proc contains*(ht: var HttpTables, key: string): bool =
|
proc contains*(ht: var HttpTables, key: string): bool =
|
||||||
ht.table.contains(key.toLowerAscii())
|
ht.table.contains(key.toLowerAscii())
|
||||||
|
|
||||||
proc getList*(ht: HttpTables, key: string): seq[string] =
|
proc getList*(ht: HttpTables, key: string,
|
||||||
var default: seq[string]
|
default: openarray[string] = []): seq[string] =
|
||||||
ht.table.getOrDefault(key.toLowerAscii(), default)
|
var defseq = @default
|
||||||
|
ht.table.getOrDefault(key.toLowerAscii(), defseq)
|
||||||
|
|
||||||
proc getString*(ht: HttpTables, key: string): string =
|
proc getString*(ht: HttpTables, key: string,
|
||||||
var default: seq[string]
|
default: string = ""): string =
|
||||||
ht.table.getOrDefault(key.toLowerAscii(), default).join(",")
|
var defseq = newSeq[string]()
|
||||||
|
let res = ht.table.getOrDefault(key.toLowerAscii(), defseq)
|
||||||
|
if len(res) == 0:
|
||||||
|
return default
|
||||||
|
else:
|
||||||
|
res.join(",")
|
||||||
|
|
||||||
proc count*(ht: HttpTables, key: string): int =
|
proc count*(ht: HttpTables, key: string): int =
|
||||||
var default: seq[string]
|
var default: seq[string]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user