General fixes.
This commit is contained in:
parent
0b03f8ec50
commit
a310a5620a
|
@ -11,7 +11,7 @@ import stew/results, httputils
|
|||
import ../../asyncloop, ../../asyncsync
|
||||
import ../../streams/[asyncstream, boundstream, chunkstream]
|
||||
import httptable, httpcommon, multipart
|
||||
export httpcommon, multipart
|
||||
export httptable, httpcommon, multipart
|
||||
|
||||
when defined(useChroniclesLogging):
|
||||
echo "Importing chronicles"
|
||||
|
@ -21,12 +21,12 @@ type
|
|||
HttpServerFlags* {.pure.} = enum
|
||||
Secure, NoExpectHandler
|
||||
|
||||
HttpStatus* = enum
|
||||
DropConnection, KeepConnection
|
||||
|
||||
HTTPServerError* {.pure.} = enum
|
||||
HttpServerError* {.pure.} = enum
|
||||
TimeoutError, CatchableError, RecoverableError, CriticalError
|
||||
|
||||
HttpServerState* {.pure.} = enum
|
||||
ServerRunning, ServerStopped, ServerClosed
|
||||
|
||||
HttpProcessError* = object
|
||||
error*: HTTPServerError
|
||||
exc*: ref CatchableError
|
||||
|
@ -39,12 +39,15 @@ type
|
|||
ClientExpect
|
||||
|
||||
HttpResponseFlags* {.pure.} = enum
|
||||
Prepared, DataSent, DataSending, KeepAlive
|
||||
KeepAlive, Chunked
|
||||
|
||||
HttpResponseState* {.pure.} = enum
|
||||
Empty, Prepared, Sending, Finished, Failed, Cancelled, Dumb
|
||||
|
||||
HttpProcessCallback* =
|
||||
proc(req: RequestFence[HttpRequest]): Future[HttpStatus] {.gcsafe.}
|
||||
proc(req: RequestFence[HttpRequestRef]): Future[HttpResponseRef] {.gcsafe.}
|
||||
|
||||
HttpServer* = ref object of RootRef
|
||||
HttpServer* = object of RootObj
|
||||
instance*: StreamServer
|
||||
# semaphore*: AsyncSemaphore
|
||||
maxConnections*: int
|
||||
|
@ -59,10 +62,9 @@ type
|
|||
maxRequestBodySize: int
|
||||
processCallback: HttpProcessCallback
|
||||
|
||||
HttpServerState* = enum
|
||||
ServerRunning, ServerStopped, ServerClosed
|
||||
HttpServerRef* = ref HttpServer
|
||||
|
||||
HttpRequest* = ref object of RootRef
|
||||
HttpRequest* = object of RootObj
|
||||
headersTable: HttpTable
|
||||
queryTable: HttpTable
|
||||
postTable: Option[HttpTable]
|
||||
|
@ -76,40 +78,38 @@ type
|
|||
transferEncoding*: set[TransferEncodingFlags]
|
||||
requestFlags*: set[HttpRequestFlags]
|
||||
contentLength: int
|
||||
connection*: HttpConnection
|
||||
mainReader*: AsyncStreamReader
|
||||
mainWriter*: AsyncStreamWriter
|
||||
response*: Option[HttpResponse]
|
||||
connection*: HttpConnectionRef
|
||||
response*: Option[HttpResponseRef]
|
||||
|
||||
HttpResponse* = ref object of RootRef
|
||||
HttpRequestRef* = ref HttpRequest
|
||||
|
||||
HttpResponse* = object of RootObj
|
||||
status*: HttpCode
|
||||
version*: HttpVersion
|
||||
headersTable: HttpTable
|
||||
body*: seq[byte]
|
||||
responseFlags*: set[HttpResponseFlags]
|
||||
connection*: HttpConnection
|
||||
mainWriter: AsyncStreamWriter
|
||||
body: seq[byte]
|
||||
flags: set[HttpResponseFlags]
|
||||
state: HttpResponseState
|
||||
connection*: HttpConnectionRef
|
||||
chunkedWriter: AsyncStreamWriter
|
||||
|
||||
HttpConnection* = ref object of RootRef
|
||||
server*: HttpServer
|
||||
HttpResponseRef* = ref HttpResponse
|
||||
|
||||
HttpConnection* = object of RootObj
|
||||
server*: HttpServerRef
|
||||
transp: StreamTransport
|
||||
mainReader*: AsyncStreamReader
|
||||
mainWriter*: AsyncStreamWriter
|
||||
buffer: seq[byte]
|
||||
|
||||
HttpConnectionRef* = ref HttpConnection
|
||||
|
||||
proc init(htype: typedesc[HttpProcessError], error: HTTPServerError,
|
||||
exc: ref CatchableError,
|
||||
remote: TransportAddress): HttpProcessError =
|
||||
HttpProcessError(error: error, exc: exc, remote: remote)
|
||||
|
||||
proc new*(htype: typedesc[HttpResponse], req: HttpRequest): HttpResponse =
|
||||
HttpResponse(
|
||||
status: Http200,
|
||||
version: req.version,
|
||||
headersTable: HttpTable.init(),
|
||||
connection: req.connection,
|
||||
mainWriter: req.mainWriter
|
||||
)
|
||||
|
||||
proc new*(htype: typedesc[HttpServer],
|
||||
proc new*(htype: typedesc[HttpServerRef],
|
||||
address: TransportAddress,
|
||||
processCallback: HttpProcessCallback,
|
||||
serverFlags: set[HttpServerFlags] = {},
|
||||
|
@ -121,9 +121,9 @@ proc new*(htype: typedesc[HttpServer],
|
|||
httpHeadersTimeout = 10.seconds,
|
||||
httpBodyTimeout = 30.seconds,
|
||||
maxHeadersSize: int = 8192,
|
||||
maxRequestBodySize: int = 1_048_576): HttpResult[HttpServer] =
|
||||
maxRequestBodySize: int = 1_048_576): HttpResult[HttpServerRef] =
|
||||
|
||||
var res = HttpServer(
|
||||
var res = HttpServerRef(
|
||||
maxConnections: maxConnections,
|
||||
headersTimeout: httpHeadersTimeout,
|
||||
bodyTimeout: httpBodyTimeout,
|
||||
|
@ -156,18 +156,37 @@ proc new*(htype: typedesc[HttpServer],
|
|||
except CatchableError as exc:
|
||||
return err(exc.msg)
|
||||
|
||||
proc getResponse*(req: HttpRequestRef): HttpResponseRef =
|
||||
if req.response.isNone():
|
||||
var resp = HttpResponseRef(
|
||||
status: Http200,
|
||||
state: HttpResponseState.Empty,
|
||||
version: req.version,
|
||||
headersTable: HttpTable.init(),
|
||||
connection: req.connection,
|
||||
flags: if req.version == HttpVersion11: {KeepAlive} else: {}
|
||||
)
|
||||
req.response = some(resp)
|
||||
resp
|
||||
else:
|
||||
req.response.get()
|
||||
|
||||
proc dumbResponse*(): HttpResponseRef =
|
||||
## Create an empty response to return when request processor got no request.
|
||||
HttpResponseRef(state: HttpResponseState.Dumb, version: HttpVersion11)
|
||||
|
||||
proc getId(transp: StreamTransport): string {.inline.} =
|
||||
## Returns string unique transport's identifier as string.
|
||||
$transp.remoteAddress() & "_" & $transp.localAddress()
|
||||
|
||||
proc hasBody*(request: HttpRequest): bool =
|
||||
proc hasBody*(request: HttpRequestRef): bool =
|
||||
## Returns ``true`` if request has body.
|
||||
request.requestFlags * {HttpRequestFlags.BoundBody,
|
||||
HttpRequestFlags.UnboundBody} != {}
|
||||
|
||||
proc prepareRequest(conn: HttpConnection,
|
||||
req: HttpRequestHeader): HttpResultCode[HttpRequest] =
|
||||
var request = HttpRequest(connection: conn)
|
||||
proc prepareRequest(conn: HttpConnectionRef,
|
||||
req: HttpRequestHeader): HttpResultCode[HttpRequestRef] =
|
||||
var request = HttpRequestRef(connection: conn)
|
||||
|
||||
if req.version notin {HttpVersion10, HttpVersion11}:
|
||||
return err(Http505)
|
||||
|
@ -274,28 +293,24 @@ proc prepareRequest(conn: HttpConnection,
|
|||
if strip(expectHeader).toLowerAscii() == "100-continue":
|
||||
request.requestFlags.incl(HttpRequestFlags.ClientExpect)
|
||||
|
||||
request.mainReader = newAsyncStreamReader(conn.transp)
|
||||
request.mainWriter = newAsyncStreamWriter(conn.transp)
|
||||
ok(request)
|
||||
|
||||
proc clear*(request: HttpRequest) {.async.} =
|
||||
await allFutures(
|
||||
request.mainReader.closeWait(),
|
||||
request.mainWriter.closeWait(),
|
||||
)
|
||||
|
||||
proc getBodyStream*(request: HttpRequest): HttpResult[AsyncStreamReader] =
|
||||
proc getBodyStream*(request: HttpRequestRef): HttpResult[AsyncStreamReader] =
|
||||
## Returns stream's reader instance which can be used to read request's body.
|
||||
##
|
||||
## Please be sure to handle ``Expect`` header properly.
|
||||
##
|
||||
## Streams which was obtained using this procedure must be closed to avoid
|
||||
## leaks.
|
||||
if HttpRequestFlags.BoundBody in request.requestFlags:
|
||||
ok(newBoundedStreamReader(request.mainReader, request.contentLength))
|
||||
ok(newBoundedStreamReader(request.connection.mainReader,
|
||||
request.contentLength))
|
||||
elif HttpRequestFlags.UnboundBody in request.requestFlags:
|
||||
ok(newChunkedStreamReader(request.mainReader))
|
||||
ok(newChunkedStreamReader(request.connection.mainReader))
|
||||
else:
|
||||
err("Request do not have body available")
|
||||
|
||||
proc handleExpect*(request: HttpRequest) {.async.} =
|
||||
proc handleExpect*(request: HttpRequestRef) {.async.} =
|
||||
## Handle expectation for ``Expect`` header.
|
||||
## https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Expect
|
||||
if HttpServerFlags.NoExpectHandler notin request.connection.server.flags:
|
||||
|
@ -303,13 +318,13 @@ proc handleExpect*(request: HttpRequest) {.async.} =
|
|||
if request.version == HttpVersion11:
|
||||
try:
|
||||
let message = $request.version & " " & $Http100 & "\r\n\r\n"
|
||||
await request.mainWriter.write(message)
|
||||
await request.connection.mainWriter.write(message)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
raise newHttpCriticalError("Unable to send `100-continue` response")
|
||||
|
||||
proc getBody*(request: HttpRequest): Future[seq[byte]] {.async.} =
|
||||
proc getBody*(request: HttpRequestRef): Future[seq[byte]] {.async.} =
|
||||
## Obtain request's body as sequence of bytes.
|
||||
let res = request.getBodyStream()
|
||||
if res.isErr():
|
||||
|
@ -320,8 +335,10 @@ proc getBody*(request: HttpRequest): Future[seq[byte]] {.async.} =
|
|||
return await read(res.get())
|
||||
except AsyncStreamError:
|
||||
raise newHttpCriticalError("Unable to read request's body")
|
||||
finally:
|
||||
await closeWait(res.get())
|
||||
|
||||
proc consumeBody*(request: HttpRequest): Future[void] {.async.} =
|
||||
proc consumeBody*(request: HttpRequestRef): Future[void] {.async.} =
|
||||
## Consume/discard request's body.
|
||||
let res = request.getBodyStream()
|
||||
if res.isErr():
|
||||
|
@ -331,11 +348,12 @@ proc consumeBody*(request: HttpRequest): Future[void] {.async.} =
|
|||
try:
|
||||
await request.handleExpect()
|
||||
discard await reader.consume()
|
||||
return
|
||||
except AsyncStreamError:
|
||||
raise newHttpCriticalError("Unable to consume request's body")
|
||||
finally:
|
||||
await closeWait(res.get())
|
||||
|
||||
proc sendErrorResponse(conn: HttpConnection, version: HttpVersion,
|
||||
proc sendErrorResponse(conn: HttpConnectionRef, version: HttpVersion,
|
||||
code: HttpCode, keepAlive = true,
|
||||
datatype = "text/text",
|
||||
databody = ""): Future[bool] {.async.} =
|
||||
|
@ -352,22 +370,16 @@ proc sendErrorResponse(conn: HttpConnection, version: HttpVersion,
|
|||
if len(databody) > 0:
|
||||
answer.add(databody)
|
||||
try:
|
||||
let res {.used.} = await conn.transp.write(answer)
|
||||
await conn.mainWriter.write(answer)
|
||||
return true
|
||||
except CancelledError:
|
||||
return false
|
||||
except TransportOsError:
|
||||
except AsyncStreamWriteError:
|
||||
return false
|
||||
except CatchableError:
|
||||
except AsyncStreamIncompleteError:
|
||||
return false
|
||||
|
||||
proc sendErrorResponse*(request: HttpRequest, code: HttpCode, keepAlive = true,
|
||||
datatype = "text/text",
|
||||
databody = ""): Future[bool] =
|
||||
sendErrorResponse(request.connection, request.version, code, keepAlive,
|
||||
datatype, databody)
|
||||
|
||||
proc getRequest*(conn: HttpConnection): Future[HttpRequest] {.async.} =
|
||||
proc getRequest*(conn: HttpConnectionRef): Future[HttpRequestRef] {.async.} =
|
||||
try:
|
||||
conn.buffer.setLen(conn.server.maxHeadersSize)
|
||||
let res = await conn.transp.readUntil(addr conn.buffer[0], len(conn.buffer),
|
||||
|
@ -392,39 +404,56 @@ proc getRequest*(conn: HttpConnection): Future[HttpRequest] {.async.} =
|
|||
discard await conn.sendErrorResponse(HttpVersion11, Http413, false)
|
||||
raise newHttpCriticalError("Maximum size of request headers reached")
|
||||
|
||||
proc processLoop(server: HttpServer, transp: StreamTransport) {.async.} =
|
||||
var conn = HttpConnection(
|
||||
transp: transp, buffer: newSeq[byte](server.maxHeadersSize),
|
||||
server: server
|
||||
proc new(ht: typedesc[HttpConnectionRef], server: HttpServerRef,
|
||||
transp: StreamTransport): HttpConnectionRef =
|
||||
HttpConnectionRef(
|
||||
transp: transp,
|
||||
server: server,
|
||||
buffer: newSeq[byte](server.maxHeadersSize),
|
||||
mainReader: newAsyncStreamReader(transp),
|
||||
mainWriter: newAsyncStreamWriter(transp)
|
||||
)
|
||||
|
||||
proc close(conn: HttpConnectionRef): Future[void] =
|
||||
allFutures(conn.mainReader.closeWait(), conn.mainWriter.closeWait(),
|
||||
conn.transp.closeWait())
|
||||
|
||||
proc close(req: HttpRequestRef) {.async.} =
|
||||
if req.response.isSome():
|
||||
let resp = req.response.get()
|
||||
if (HttpResponseFlags.Chunked in resp.flags) and
|
||||
not(isNil(resp.chunkedWriter)):
|
||||
await resp.chunkedWriter.closeWait()
|
||||
|
||||
proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} =
|
||||
var conn = HttpConnectionRef.new(server, transp)
|
||||
var breakLoop = false
|
||||
while true:
|
||||
var
|
||||
arg: RequestFence[HttpRequest]
|
||||
resp = HttpStatus.DropConnection
|
||||
arg: RequestFence[HttpRequestRef]
|
||||
resp: HttpResponseRef
|
||||
|
||||
try:
|
||||
let request = await conn.getRequest().wait(server.headersTimeout)
|
||||
arg = RequestFence[HttpRequest].ok(request)
|
||||
arg = RequestFence[HttpRequestRef].ok(request)
|
||||
except CancelledError:
|
||||
breakLoop = true
|
||||
except AsyncTimeoutError as exc:
|
||||
let error = HttpProcessError.init(HTTPServerError.TimeoutError, exc,
|
||||
transp.remoteAddress())
|
||||
arg = RequestFence[HttpRequest].err(error)
|
||||
arg = RequestFence[HttpRequestRef].err(error)
|
||||
except HttpRecoverableError as exc:
|
||||
let error = HttpProcessError.init(HTTPServerError.RecoverableError, exc,
|
||||
transp.remoteAddress())
|
||||
arg = RequestFence[HttpRequest].err(error)
|
||||
arg = RequestFence[HttpRequestRef].err(error)
|
||||
except HttpCriticalError as exc:
|
||||
let error = HttpProcessError.init(HTTPServerError.CriticalError, exc,
|
||||
transp.remoteAddress())
|
||||
arg = RequestFence[HttpRequest].err(error)
|
||||
arg = RequestFence[HttpRequestRef].err(error)
|
||||
except CatchableError as exc:
|
||||
let error = HttpProcessError.init(HTTPServerError.CatchableError, exc,
|
||||
transp.remoteAddress())
|
||||
arg = RequestFence[HttpRequest].err(error)
|
||||
arg = RequestFence[HttpRequestRef].err(error)
|
||||
|
||||
if breakLoop:
|
||||
break
|
||||
|
@ -437,61 +466,51 @@ proc processLoop(server: HttpServer, transp: StreamTransport) {.async.} =
|
|||
except CancelledError:
|
||||
breakLoop = true
|
||||
except CatchableError as exc:
|
||||
resp = DropConnection
|
||||
echo "Exception received from processor callback ", exc.name
|
||||
lastError = exc
|
||||
|
||||
if breakLoop:
|
||||
break
|
||||
|
||||
let keepConn =
|
||||
case resp
|
||||
of DropConnection:
|
||||
false
|
||||
of KeepConnection:
|
||||
false
|
||||
|
||||
if arg.isErr():
|
||||
case arg.error().error
|
||||
of HTTPServerError.TimeoutError:
|
||||
discard await conn.sendErrorResponse(HttpVersion11, Http408, keepConn)
|
||||
discard await conn.sendErrorResponse(HttpVersion11, Http408, false)
|
||||
of HTTPServerError.RecoverableError:
|
||||
discard await conn.sendErrorResponse(HttpVersion11, Http400, keepConn)
|
||||
discard await conn.sendErrorResponse(HttpVersion11, Http400, false)
|
||||
of HTTPServerError.CriticalError:
|
||||
discard await conn.sendErrorResponse(HttpVersion11, Http400, keepConn)
|
||||
discard await conn.sendErrorResponse(HttpVersion11, Http400, false)
|
||||
of HTTPServerError.CatchableError:
|
||||
discard await conn.sendErrorResponse(HttpVersion11, Http400, keepConn)
|
||||
if not(keepConn):
|
||||
break
|
||||
discard await conn.sendErrorResponse(HttpVersion11, Http400, false)
|
||||
break
|
||||
else:
|
||||
let request = arg.get()
|
||||
let keepConn = if request.version == HttpVersion11: true else: false
|
||||
if isNil(lastError):
|
||||
echo "lastError = nil"
|
||||
echo "mainWriter.bytesCount = ", arg.get().mainWriter.bytesCount
|
||||
|
||||
if arg.get().mainWriter.bytesCount == 0'u64:
|
||||
echo "Sending 404 keepConn = ", keepConn
|
||||
# Processor callback finished without an error, but response was not
|
||||
# sent to client, so we going to send HTTP404 error.
|
||||
case resp.state
|
||||
of HttpResponseState.Empty:
|
||||
# Response was ignored
|
||||
discard await conn.sendErrorResponse(HttpVersion11, Http404, keepConn)
|
||||
echo "bytesCount = ", arg.get().mainWriter.bytesCount
|
||||
of HttpResponseState.Prepared:
|
||||
# Response was prepared but not sent.
|
||||
discard await conn.sendErrorResponse(HttpVersion11, Http409, keepConn)
|
||||
else:
|
||||
# some data was already sent to the client.
|
||||
discard
|
||||
else:
|
||||
if arg.get().mainWriter.bytesCount == 0'u64:
|
||||
# Processor callback finished with an error, but response was not
|
||||
# sent to client, so we going to send HTTP503 error.
|
||||
discard await conn.sendErrorResponse(HttpVersion11, Http503, true)
|
||||
discard await conn.sendErrorResponse(HttpVersion11, Http503, true)
|
||||
|
||||
## Perform cleanup of request instance
|
||||
await arg.get().clear()
|
||||
# Closing and releasing all the request resources.
|
||||
await request.close()
|
||||
|
||||
if not(keepConn):
|
||||
break
|
||||
|
||||
await transp.closeWait()
|
||||
await conn.close()
|
||||
server.connections.del(transp.getId())
|
||||
# if server.maxConnections > 0:
|
||||
# server.semaphore.release()
|
||||
|
||||
proc acceptClientLoop(server: HttpServer) {.async.} =
|
||||
proc acceptClientLoop(server: HttpServerRef) {.async.} =
|
||||
var breakLoop = false
|
||||
while true:
|
||||
try:
|
||||
|
@ -518,7 +537,7 @@ proc acceptClientLoop(server: HttpServer) {.async.} =
|
|||
if breakLoop:
|
||||
break
|
||||
|
||||
proc state*(server: HttpServer): HttpServerState =
|
||||
proc state*(server: HttpServerRef): HttpServerState =
|
||||
## Returns current HTTP server's state.
|
||||
if server.lifetime.finished():
|
||||
ServerClosed
|
||||
|
@ -531,22 +550,22 @@ proc state*(server: HttpServer): HttpServerState =
|
|||
else:
|
||||
ServerRunning
|
||||
|
||||
proc start*(server: HttpServer) =
|
||||
proc start*(server: HttpServerRef) =
|
||||
## Starts HTTP server.
|
||||
if server.state == ServerStopped:
|
||||
server.acceptLoop = acceptClientLoop(server)
|
||||
|
||||
proc stop*(server: HttpServer) {.async.} =
|
||||
proc stop*(server: HttpServerRef) {.async.} =
|
||||
## Stop HTTP server from accepting new connections.
|
||||
if server.state == ServerRunning:
|
||||
await server.acceptLoop.cancelAndWait()
|
||||
|
||||
proc drop*(server: HttpServer) {.async.} =
|
||||
proc drop*(server: HttpServerRef) {.async.} =
|
||||
## Drop all pending HTTP connections.
|
||||
if server.state in {ServerStopped, ServerRunning}:
|
||||
discard
|
||||
|
||||
proc close*(server: HttpServer) {.async.} =
|
||||
proc close*(server: HttpServerRef) {.async.} =
|
||||
## Stop HTTP server and drop all the pending connections.
|
||||
if server.state != ServerClosed:
|
||||
await server.stop()
|
||||
|
@ -554,7 +573,7 @@ proc close*(server: HttpServer) {.async.} =
|
|||
await server.instance.closeWait()
|
||||
server.lifetime.complete()
|
||||
|
||||
proc join*(server: HttpServer): Future[void] =
|
||||
proc join*(server: HttpServerRef): Future[void] =
|
||||
## Wait until HTTP server will not be closed.
|
||||
var retFuture = newFuture[void]("http.server.join")
|
||||
|
||||
|
@ -574,7 +593,25 @@ proc join*(server: HttpServer): Future[void] =
|
|||
|
||||
retFuture
|
||||
|
||||
proc post*(req: HttpRequest): Future[HttpTable] {.async.} =
|
||||
proc getMultipartReader*(req: HttpRequestRef): HttpResult[MultiPartReaderRef] =
|
||||
## Create new MultiPartReader interface for specific request.
|
||||
if req.meth in PostMethods:
|
||||
if MultipartForm in req.requestFlags:
|
||||
let ctype = ? getContentType(req.headersTable.getList("content-type"))
|
||||
if ctype != "multipart/form-data":
|
||||
err("Content type is not supported")
|
||||
else:
|
||||
let boundary = ? getMultipartBoundary(
|
||||
req.headersTable.getList("content-type")
|
||||
)
|
||||
var stream = ? req.getBodyStream()
|
||||
ok(MultiPartReaderRef.new(stream, boundary))
|
||||
else:
|
||||
err("Request's data is not multipart encoded")
|
||||
else:
|
||||
err("Request's method do not supports multipart")
|
||||
|
||||
proc post*(req: HttpRequestRef): Future[HttpTable] {.async.} =
|
||||
## Return POST parameters
|
||||
if req.postTable.isSome():
|
||||
return req.postTable.get()
|
||||
|
@ -595,31 +632,42 @@ proc post*(req: HttpRequest): Future[HttpTable] {.async.} =
|
|||
req.postTable = some(table)
|
||||
return table
|
||||
elif MultipartForm in req.requestFlags:
|
||||
let cres = getContentType(req.headersTable.getList("content-type"))
|
||||
if cres.isErr():
|
||||
raise newHttpCriticalError(cres.error)
|
||||
let bres = getMultipartBoundary(req.headersTable.getList("content-type"))
|
||||
if bres.isErr():
|
||||
raise newHttpCriticalError(bres.error)
|
||||
# We must handle `Expect`.
|
||||
await req.handleExpect()
|
||||
var reader = req.getBodyStream()
|
||||
if reader.isErr():
|
||||
raise newHttpCriticalError(reader.error)
|
||||
var mpreader = MultiPartReaderRef.new(reader.get(), bres.get())
|
||||
var table = HttpTable.init()
|
||||
let res = getMultipartReader(req)
|
||||
if res.isErr():
|
||||
raise newHttpCriticalError("Unable to retrieve multipart form data")
|
||||
var mpreader = res.get()
|
||||
|
||||
# We must handle `Expect` first.
|
||||
try:
|
||||
await req.handleExpect()
|
||||
except HttpCriticalError as exc:
|
||||
await mpreader.close()
|
||||
raise exc
|
||||
|
||||
# Reading multipart/form-data parts.
|
||||
var runLoop = true
|
||||
var loopError: ref MultipartError
|
||||
while runLoop:
|
||||
try:
|
||||
let res = await mpreader.readPart()
|
||||
var value = await res.getBody()
|
||||
let part = await mpreader.readPart()
|
||||
var value = await part.getBody()
|
||||
## TODO (cheatfate) double copy here.
|
||||
var strvalue = newString(len(value))
|
||||
if len(value) > 0:
|
||||
copyMem(addr strvalue[0], addr value[0], len(value))
|
||||
table.add(res.name, strvalue)
|
||||
table.add(part.name, strvalue)
|
||||
await part.close()
|
||||
except MultiPartEoM:
|
||||
runLoop = false
|
||||
except MultipartError as exc:
|
||||
# We preserve error to avoid Nim's exception transformation bug.
|
||||
runLoop = false
|
||||
loopError = exc
|
||||
|
||||
await mpreader.close()
|
||||
if not(isNil(loopError)):
|
||||
raise loopError
|
||||
req.postTable = some(table)
|
||||
return table
|
||||
else:
|
||||
|
@ -630,58 +678,225 @@ proc post*(req: HttpRequest): Future[HttpTable] {.async.} =
|
|||
elif HttpRequestFlags.UnboundBody in req.requestFlags:
|
||||
raise newHttpCriticalError("Unsupported request body")
|
||||
|
||||
proc `keepalive=`*(resp: HttpResponse, value: bool) =
|
||||
proc `keepalive=`*(resp: HttpResponseRef, value: bool) =
|
||||
doAssert(resp.state == HttpResponseState.Empty)
|
||||
if value:
|
||||
resp.responseFlags.incl(KeepAlive)
|
||||
resp.flags.incl(KeepAlive)
|
||||
else:
|
||||
resp.responseFlags.excl(KeepAlive)
|
||||
resp.flags.excl(KeepAlive)
|
||||
|
||||
proc keepalive*(resp: HttpResponse): bool =
|
||||
KeepAlive in resp.responseFlags
|
||||
proc keepalive*(resp: HttpResponseRef): bool =
|
||||
KeepAlive in resp.flags
|
||||
|
||||
proc setHeader*(resp: HttpResponse, key, value: string) =
|
||||
resp.httpTable.set(key, value)
|
||||
proc setHeader*(resp: HttpResponseRef, key, value: string) =
|
||||
doAssert(resp.state == HttpResponseState.Empty)
|
||||
resp.headersTable.set(key, value)
|
||||
|
||||
proc addHeader*(resp: HttpResponse, key, value: string) =
|
||||
resp.httpTable.add(key, value)
|
||||
proc addHeader*(resp: HttpResponseRef, key, value: string) =
|
||||
doAssert(resp.state == HttpResponseState.Empty)
|
||||
resp.headersTable.add(key, value)
|
||||
|
||||
proc getHeader*(resp: HttpResponse, key: string): string =
|
||||
resp.httpTable.getString(key)
|
||||
proc getHeader*(resp: HttpResponseRef, key: string,
|
||||
default: string = ""): string =
|
||||
resp.headersTable.getString(key, default)
|
||||
|
||||
proc getHeaderOrDefault*(resp: HttpResponse, key: string,
|
||||
default: string = ""): string =
|
||||
proc hasHeader*(resp: HttpResponseRef, key: string): bool =
|
||||
key in resp.headersTable
|
||||
|
||||
template doHeaderDef(buf, resp, name, default) =
|
||||
buf.add(name)
|
||||
buf.add(": ")
|
||||
buf.add(resp.getHeader(name, default))
|
||||
buf.add("\r\n")
|
||||
|
||||
proc sendBody*(resp: HttpResponse, pbytes: pointer, nbytes: int) {.async.} =
|
||||
resp.headersTable
|
||||
template doHeaderVal(buf, name, value) =
|
||||
buf.add(name)
|
||||
buf.add(": ")
|
||||
buf.add(value)
|
||||
buf.add("\r\n")
|
||||
|
||||
proc sendBody*(resp: HttpResponse, data: string) {.async.}
|
||||
var answer = resp.version & " " & resp.status & "\r\n"
|
||||
answer.add("")
|
||||
proc prepare*(resp: HttpResponse) {.async.} =
|
||||
discard
|
||||
template checkPending(t: untyped) =
|
||||
if t.state != HttpResponseState.Empty:
|
||||
raise newHttpCriticalError("Response body was already sent")
|
||||
|
||||
proc sendChunk*(resp: HttpResponse) {.async.} =
|
||||
discard
|
||||
proc prepareLengthHeaders(resp: HttpResponseRef, length: int): string =
|
||||
var answer = $(resp.version) & " " & $(resp.status) & "\r\n"
|
||||
answer.doHeaderDef(resp, "Date", httpDate())
|
||||
answer.doHeaderDef(resp, "Content-Type", "text/html; charset=utf-8")
|
||||
if length > 0:
|
||||
answer.doHeaderVal("Content-Length", $(length))
|
||||
if "Connection" notin resp.headersTable:
|
||||
if KeepAlive in resp.flags:
|
||||
answer.doHeaderVal("Connection", "keep-alive")
|
||||
else:
|
||||
answer.doHeaderVal("Connection", "close")
|
||||
for k, v in resp.headersTable.stringItems():
|
||||
if k notin ["date", "content-type", "content-length"]:
|
||||
answer.doHeaderVal(normalizeHeaderName(k), v)
|
||||
answer.add("\r\n")
|
||||
answer
|
||||
|
||||
proc finish*(resp: HttpResponse) {.async.} =
|
||||
discard
|
||||
proc prepareChunkedHeaders(resp: HttpResponseRef): string =
|
||||
var answer = $(resp.version) & " " & $(resp.status) & "\r\n"
|
||||
answer.doHeaderDef(resp, "Date", httpDate())
|
||||
answer.doHeaderDef(resp, "Content-Type", "text/html; charset=utf-8")
|
||||
answer.doHeaderDef(resp, "Transfer-Encoding", "chunked")
|
||||
if "Connection" notin resp.headersTable:
|
||||
if KeepAlive in resp.flags:
|
||||
answer.doHeaderVal("Connection", "keep-alive")
|
||||
else:
|
||||
answer.doHeaderVal("Connection", "close")
|
||||
for k, v in resp.headersTable.stringItems():
|
||||
if k notin ["date", "content-type", "content-length", "transfer-encoding"]:
|
||||
answer.doHeaderVal(normalizeHeaderName(k), v)
|
||||
answer.add("\r\n")
|
||||
answer
|
||||
|
||||
proc sendBody*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} =
|
||||
## Send HTTP response at once by using bytes pointer ``pbytes`` and length
|
||||
## ``nbytes``.
|
||||
doAssert(not(isNil(pbytes)), "pbytes must not be nil")
|
||||
doAssert(nbytes >= 0, "nbytes should be bigger or equal to zero")
|
||||
checkPending(resp)
|
||||
let responseHeaders = resp.prepareLengthHeaders(nbytes)
|
||||
resp.state = HttpResponseState.Prepared
|
||||
try:
|
||||
resp.state = HttpResponseState.Sending
|
||||
await resp.connection.mainWriter.write(responseHeaders)
|
||||
if nbytes > 0:
|
||||
await resp.connection.mainWriter.write(pbytes, nbytes)
|
||||
resp.state = HttpResponseState.Finished
|
||||
except CancelledError as exc:
|
||||
resp.state = HttpResponseState.Cancelled
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
resp.state = HttpResponseState.Failed
|
||||
raise newHttpCriticalError("Unable to send response")
|
||||
|
||||
proc sendBody*[T: string|seq[byte]](resp: HttpResponseRef, data: T) {.async.} =
|
||||
## Send HTTP response at once by using data ``data``.
|
||||
checkPending(resp)
|
||||
let responseHeaders = resp.prepareLengthHeaders(len(data))
|
||||
resp.state = HttpResponseState.Prepared
|
||||
try:
|
||||
resp.state = HttpResponseState.Sending
|
||||
await resp.connection.mainWriter.write(responseHeaders)
|
||||
if len(data) > 0:
|
||||
await resp.connection.mainWriter.write(data)
|
||||
resp.state = HttpResponseState.Finished
|
||||
except CancelledError as exc:
|
||||
resp.state = HttpResponseState.Cancelled
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
resp.state = HttpResponseState.Failed
|
||||
raise newHttpCriticalError("Unable to send response")
|
||||
|
||||
proc sendError*(resp: HttpResponseRef, code: HttpCode, body = "") {.async.} =
|
||||
## Send HTTP error status response.
|
||||
checkPending(resp)
|
||||
resp.status = code
|
||||
let responseHeaders = resp.prepareLengthHeaders(len(body))
|
||||
resp.state = HttpResponseState.Prepared
|
||||
try:
|
||||
resp.state = HttpResponseState.Sending
|
||||
await resp.connection.mainWriter.write(responseHeaders)
|
||||
if len(body) > 0:
|
||||
await resp.connection.mainWriter.write(body)
|
||||
resp.state = HttpResponseState.Finished
|
||||
except CancelledError as exc:
|
||||
resp.state = HttpResponseState.Cancelled
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
resp.state = HttpResponseState.Failed
|
||||
raise newHttpCriticalError("Unable to send response")
|
||||
|
||||
proc prepare*(resp: HttpResponseRef) {.async.} =
|
||||
## Prepare for HTTP stream response.
|
||||
##
|
||||
## Such responses will be sent chunk by chunk using ``chunked`` encoding.
|
||||
resp.checkPending()
|
||||
let responseHeaders = resp.prepareChunkedHeaders()
|
||||
resp.state = HttpResponseState.Prepared
|
||||
try:
|
||||
resp.state = HttpResponseState.Sending
|
||||
await resp.connection.mainWriter.write(responseHeaders)
|
||||
resp.chunkedWriter = newChunkedStreamWriter(resp.connection.mainWriter)
|
||||
resp.flags.incl(HttpResponseFlags.Chunked)
|
||||
except CancelledError as exc:
|
||||
resp.state = HttpResponseState.Cancelled
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
resp.state = HttpResponseState.Failed
|
||||
raise newHttpCriticalError("Unable to send response")
|
||||
|
||||
proc sendChunk*(resp: HttpResponseRef, pbytes: pointer, nbytes: int) {.async.} =
|
||||
## Send single chunk of data pointed by ``pbytes`` and ``nbytes``.
|
||||
doAssert(not(isNil(pbytes)), "pbytes must not be nil")
|
||||
doAssert(nbytes >= 0, "nbytes should be bigger or equal to zero")
|
||||
if HttpResponseFlags.Chunked notin resp.flags:
|
||||
raise newHttpCriticalError("Response was not prepared")
|
||||
if resp.state notin {HttpResponseState.Prepared, HttpResponseState.Sending}:
|
||||
raise newHttpCriticalError("Response in incorrect state")
|
||||
try:
|
||||
resp.state = HttpResponseState.Sending
|
||||
await resp.chunkedWriter.write(pbytes, nbytes)
|
||||
resp.state = HttpResponseState.Sending
|
||||
except CancelledError as exc:
|
||||
resp.state = HttpResponseState.Cancelled
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
resp.state = HttpResponseState.Failed
|
||||
raise newHttpCriticalError("Unable to send response")
|
||||
|
||||
proc sendChunk*[T: string|seq[byte]](resp: HttpResponseRef,
|
||||
data: T) {.async.} =
|
||||
## Send single chunk of data ``data``.
|
||||
if HttpResponseFlags.Chunked notin resp.flags:
|
||||
raise newHttpCriticalError("Response was not prepared")
|
||||
if resp.state notin {HttpResponseState.Prepared, HttpResponseState.Sending}:
|
||||
raise newHttpCriticalError("Response in incorrect state")
|
||||
try:
|
||||
resp.state = HttpResponseState.Sending
|
||||
await resp.chunkedWriter.write(data)
|
||||
resp.state = HttpResponseState.Sending
|
||||
except CancelledError as exc:
|
||||
resp.state = HttpResponseState.Cancelled
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
resp.state = HttpResponseState.Failed
|
||||
raise newHttpCriticalError("Unable to send response")
|
||||
|
||||
proc finish*(resp: HttpResponseRef) {.async.} =
|
||||
## Sending last chunk of data, so it will indicate end of HTTP response.
|
||||
if HttpResponseFlags.Chunked notin resp.flags:
|
||||
raise newHttpCriticalError("Response was not prepared")
|
||||
if resp.state notin {HttpResponseState.Prepared, HttpResponseState.Sending}:
|
||||
raise newHttpCriticalError("Response in incorrect state")
|
||||
try:
|
||||
resp.state = HttpResponseState.Sending
|
||||
await resp.chunkedWriter.finish()
|
||||
resp.state = HttpResponseState.Finished
|
||||
except CancelledError as exc:
|
||||
resp.state = HttpResponseState.Cancelled
|
||||
raise exc
|
||||
except AsyncStreamWriteError, AsyncStreamIncompleteError:
|
||||
resp.state = HttpResponseState.Failed
|
||||
raise newHttpCriticalError("Unable to send response")
|
||||
|
||||
when isMainModule:
|
||||
proc processCallback(req: RequestFence[HttpRequest]): Future[HttpStatus] {.
|
||||
proc process(req: RequestFence[HttpRequestRef]): Future[HttpResponseRef] {.
|
||||
async.} =
|
||||
if req.isOk():
|
||||
let request = req.get()
|
||||
echo "Got ", request.meth, " request"
|
||||
let post = await request.post()
|
||||
echo "post = ", post
|
||||
let response = request.getResponse()
|
||||
await response.sendBody("Got [" & $request.meth & "] request")
|
||||
return response
|
||||
else:
|
||||
echo "Got FAILURE", req.error()
|
||||
echo "process callback"
|
||||
return dumbResponse()
|
||||
|
||||
let res = HttpServer.new(initTAddress("127.0.0.1:30080"), processCallback,
|
||||
maxConnections = 1)
|
||||
let res = HttpServerRef.new(initTAddress("127.0.0.1:30080"), process,
|
||||
maxConnections = 1)
|
||||
if res.isOk():
|
||||
let server = res.get()
|
||||
server.start()
|
||||
|
|
|
@ -105,6 +105,15 @@ proc init*(htt: typedesc[HttpTable]): HttpTable =
|
|||
proc new*(htt: typedesc[HttpTableRef]): HttpTableRef =
|
||||
HttpTableRef(table: initTable[string, seq[string]]())
|
||||
|
||||
iterator stringItems*(ht: HttpTables): tuple[key: string, value: string] =
|
||||
for k, v in ht.table.pairs():
|
||||
for item in v:
|
||||
yield (k, item)
|
||||
|
||||
iterator items*(ht: HttpTables): tuple[key: string, value: seq[string]] =
|
||||
for k, v in ht.table.pairs():
|
||||
yield (k, v)
|
||||
|
||||
proc normalizeHeaderName*(value: string): string =
|
||||
var res = value.toLowerAscii()
|
||||
var k = 0
|
||||
|
|
|
@ -15,13 +15,13 @@ import httptable, httpcommon
|
|||
export httptable, httpcommon, asyncstream
|
||||
|
||||
type
|
||||
MultiPartSource {.pure.} = enum
|
||||
MultiPartSource* {.pure.} = enum
|
||||
Stream, Buffer
|
||||
|
||||
MultiPartReader* = object
|
||||
case kind: MultiPartSource
|
||||
of MultiPartSource.Stream:
|
||||
stream: AsyncStreamReader
|
||||
stream*: AsyncStreamReader
|
||||
of MultiPartSource.Buffer:
|
||||
discard
|
||||
firstTime: bool
|
||||
|
@ -35,7 +35,7 @@ type
|
|||
MultiPart* = object
|
||||
case kind: MultiPartSource
|
||||
of MultiPartSource.Stream:
|
||||
stream*: BoundedStreamReader
|
||||
stream: BoundedStreamReader
|
||||
of MultiPartSource.Buffer:
|
||||
discard
|
||||
buffer: seq[byte]
|
||||
|
@ -76,10 +76,13 @@ proc parseUntil*(s, until: openarray[byte]): int =
|
|||
proc init*[A: BChar, B: BChar](mpt: typedesc[MultiPartReader],
|
||||
buffer: openarray[A],
|
||||
boundary: openarray[B]): MultiPartReader =
|
||||
# Boundary should not be empty.
|
||||
## Create new MultiPartReader instance with `buffer` interface.
|
||||
##
|
||||
## ``buffer`` - is buffer which will be used to read data.
|
||||
## ``boundary`` - is multipart boundary, this value must not be empty.
|
||||
doAssert(len(boundary) > 0)
|
||||
# Our internal boundary has format `<CR><LF><-><-><boundary>`, so we can reuse
|
||||
# different parts of this sequence for processing.
|
||||
# Our internal boundary has format `<CR><LF><-><-><boundary>`, so we can
|
||||
# reuse different parts of this sequence for processing.
|
||||
var fboundary = newSeq[byte](len(boundary) + 4)
|
||||
fboundary[0] = 0x0D'u8
|
||||
fboundary[1] = 0x0A'u8
|
||||
|
@ -94,13 +97,17 @@ proc init*[A: BChar, B: BChar](mpt: typedesc[MultiPartReader],
|
|||
buffer: buf, offset: 0, boundary: fboundary)
|
||||
|
||||
proc new*[B: BChar](mpt: typedesc[MultiPartReaderRef],
|
||||
stream: AsyncStreamReader,
|
||||
boundary: openarray[B],
|
||||
partHeadersMaxSize = 4096): MultiPartReaderRef =
|
||||
# Boundary should not be empty.
|
||||
stream: AsyncStreamReader,
|
||||
boundary: openarray[B],
|
||||
partHeadersMaxSize = 4096): MultiPartReaderRef =
|
||||
## Create new MultiPartReader instance with `stream` interface.
|
||||
##
|
||||
## ``stream`` is stream used to read data.
|
||||
## ``boundary`` is multipart boundary, this value must not be empty.
|
||||
## ``partHeadersMaxSize`` is maximum size of multipart's headers.
|
||||
doAssert(len(boundary) > 0)
|
||||
# Our internal boundary has format `<CR><LF><-><-><boundary>`, so we can reuse
|
||||
# different parts of this sequence for processing.
|
||||
# Our internal boundary has format `<CR><LF><-><-><boundary>`, so we can
|
||||
# reuse different parts of this sequence for processing.
|
||||
var fboundary = newSeq[byte](len(boundary) + 4)
|
||||
fboundary[0] = 0x0D'u8
|
||||
fboundary[1] = 0x0A'u8
|
||||
|
@ -111,7 +118,7 @@ proc new*[B: BChar](mpt: typedesc[MultiPartReaderRef],
|
|||
stream: stream, offset: 0, boundary: fboundary,
|
||||
buffer: newSeq[byte](partHeadersMaxSize))
|
||||
|
||||
func setPartNames*(part: var MultiPart): HttpResult[void] =
|
||||
func setPartNames(part: var MultiPart): HttpResult[void] =
|
||||
if part.headers.count("content-disposition") != 1:
|
||||
return err("Content-Disposition header is incorrect")
|
||||
var header = part.headers.getString("content-disposition")
|
||||
|
@ -192,28 +199,55 @@ proc readPart*(mpr: MultiPartReaderRef): Future[MultiPart] {.async.} =
|
|||
raise newMultipartReadError("Error reading multipart message")
|
||||
|
||||
proc getBody*(mp: MultiPart): Future[seq[byte]] {.async.} =
|
||||
## Get multipart's ``mp`` value as sequence of bytes.
|
||||
case mp.kind
|
||||
of MultiPartSource.Stream:
|
||||
try:
|
||||
let res = await mp.stream.read()
|
||||
return res
|
||||
except AsyncStreamError:
|
||||
raise newException(HttpCriticalError, "Could not read multipart body")
|
||||
raise newException(MultipartReadError, "Could not read multipart body")
|
||||
of MultiPartSource.Buffer:
|
||||
return mp.buffer
|
||||
|
||||
proc consumeBody*(mp: MultiPart) {.async.} =
|
||||
## Discard multipart's ``mp`` value.
|
||||
case mp.kind
|
||||
of MultiPartSource.Stream:
|
||||
try:
|
||||
await mp.stream.consume()
|
||||
except AsyncStreamError:
|
||||
raise newException(HttpCriticalError, "Could not consume multipart body")
|
||||
raise newException(MultipartReadError, "Could not consume multipart body")
|
||||
of MultiPartSource.Buffer:
|
||||
discard
|
||||
|
||||
proc getBodyStream*(mp: MultiPart): HttpResult[AsyncStreamReader] =
|
||||
## Get multipart's ``mp`` stream, which can be used to obtain value of the
|
||||
## part.
|
||||
case mp.kind
|
||||
of MultiPartSource.Stream:
|
||||
ok(mp.stream)
|
||||
else:
|
||||
err("Could not obtain stream from buffer-like part")
|
||||
|
||||
proc close*(mp: MultiPart) {.async.} =
|
||||
## Close and release MultiPart's ``mp`` stream and resources.
|
||||
case mp.kind
|
||||
of MultiPartSource.Stream:
|
||||
await closeWait(mp.stream)
|
||||
else:
|
||||
discard
|
||||
|
||||
proc close*(mpr: MultiPartReaderRef) {.async.} =
|
||||
## Close and release MultiPartReader's ``mpr`` stream and resources.
|
||||
case mpr.kind
|
||||
of MultiPartSource.Stream:
|
||||
await mpr.stream.closeWait()
|
||||
else:
|
||||
discard
|
||||
|
||||
proc getBytes*(mp: MultiPart): seq[byte] =
|
||||
## Returns MultiPart value as sequence of bytes.
|
||||
## Returns value for MultiPart ``mp`` as sequence of bytes.
|
||||
case mp.kind
|
||||
of MultiPartSource.Buffer:
|
||||
mp.buffer
|
||||
|
@ -222,7 +256,7 @@ proc getBytes*(mp: MultiPart): seq[byte] =
|
|||
mp.buffer
|
||||
|
||||
proc getString*(mp: MultiPart): string =
|
||||
## Returns MultiPart value as string.
|
||||
## Returns value for MultiPart ``mp`` as string.
|
||||
case mp.kind
|
||||
of MultiPartSource.Buffer:
|
||||
if len(mp.buffer) > 0:
|
||||
|
@ -240,7 +274,28 @@ proc getString*(mp: MultiPart): string =
|
|||
else:
|
||||
""
|
||||
|
||||
proc atEoM*(mpr: var MultiPartReader): bool =
|
||||
## Procedure returns ``true`` if MultiPartReader has reached the end of
|
||||
## multipart message.
|
||||
case mpr.kind
|
||||
of MultiPartSource.Buffer:
|
||||
mpr.offset >= len(mpr.buffer)
|
||||
of MultiPartSource.Stream:
|
||||
mpr.stream.atEof()
|
||||
|
||||
proc atEoM*(mpr: MultiPartReaderRef): bool =
|
||||
## Procedure returns ``true`` if MultiPartReader has reached the end of
|
||||
## multipart message.
|
||||
case mpr.kind
|
||||
of MultiPartSource.Buffer:
|
||||
mpr.offset >= len(mpr.buffer)
|
||||
of MultiPartSource.Stream:
|
||||
mpr.stream.atEof()
|
||||
|
||||
proc getPart*(mpr: var MultiPartReader): Result[MultiPart, string] =
|
||||
## Get multipart part from MultiPartReader instance.
|
||||
##
|
||||
## This procedure will work only for MultiPartReader with buffer source.
|
||||
doAssert(mpr.kind == MultiPartSource.Buffer)
|
||||
if mpr.offset >= len(mpr.buffer):
|
||||
return err("End of multipart form encountered")
|
||||
|
@ -320,6 +375,15 @@ proc getPart*(mpr: var MultiPartReader): Result[MultiPart, string] =
|
|||
err("Incorrect multipart form")
|
||||
|
||||
func getMultipartBoundary*(ch: openarray[string]): HttpResult[string] =
|
||||
## Returns ``multipart/form-data`` boundary value from ``Content-Type``
|
||||
## header.
|
||||
##
|
||||
## The procedure carries out all the necessary checks:
|
||||
## 1) There should be single `Content-Type` header value in headers.
|
||||
## 2) `Content-Type` must be ``multipart/form-data``.
|
||||
## 3) `boundary` value must be present
|
||||
## 4) `boundary` value must be less then 70 characters length and
|
||||
## all characters should be part of alphabet.
|
||||
if len(ch) > 1:
|
||||
err("Multiple Content-Type headers found")
|
||||
else:
|
||||
|
@ -347,12 +411,3 @@ func getMultipartBoundary*(ch: openarray[string]): HttpResult[string] =
|
|||
'\'' .. ')', '+' .. '/', ':', '=', '?', '_'}:
|
||||
return err("Content-Type boundary alphabat incorrect")
|
||||
ok(candidate)
|
||||
|
||||
when isMainModule:
|
||||
var buf = "--------------------------5e7d0dd0ed6eb849\r\nContent-Disposition: form-data; =\"key1\"\r\n\r\nvalue1\r\n--------------------------5e7d0dd0ed6eb849\r\nContent-Disposition: form-data; name=\"key2\"\r\n\r\nvalue2\r\n--------------------------5e7d0dd0ed6eb849--"
|
||||
var reader = MultiPartReader.init(buf, "------------------------5e7d0dd0ed6eb849")
|
||||
echo getPart(reader)
|
||||
echo "===="
|
||||
echo getPart(reader)
|
||||
echo "===="
|
||||
echo getPart(reader)
|
||||
|
|
|
@ -730,18 +730,18 @@ proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte],
|
|||
if isNil(wstream.wsource):
|
||||
var res: int
|
||||
try:
|
||||
res = await write(wstream.tsource, sbytes, msglen)
|
||||
res = await write(wstream.tsource, sbytes, length)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
raise newAsyncStreamWriteError(exc)
|
||||
if res != length:
|
||||
raise newAsyncStreamIncompleteError()
|
||||
wstream.bytesCount = wstream.bytesCount + uint64(msglen)
|
||||
wstream.bytesCount = wstream.bytesCount + uint64(length)
|
||||
else:
|
||||
if isNil(wstream.writerLoop):
|
||||
await write(wstream.wsource, sbytes, msglen)
|
||||
wstream.bytesCount = wstream.bytesCount + uint64(msglen)
|
||||
await write(wstream.wsource, sbytes, length)
|
||||
wstream.bytesCount = wstream.bytesCount + uint64(length)
|
||||
else:
|
||||
var item = WriteItem(kind: Sequence)
|
||||
if not isLiteral(sbytes):
|
||||
|
@ -778,18 +778,18 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string,
|
|||
if isNil(wstream.wsource):
|
||||
var res: int
|
||||
try:
|
||||
res = await write(wstream.tsource, sbytes, msglen)
|
||||
res = await write(wstream.tsource, sbytes, length)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
raise newAsyncStreamWriteError(exc)
|
||||
if res != length:
|
||||
raise newAsyncStreamIncompleteError()
|
||||
wstream.bytesCount = wstream.bytesCount + uint64(msglen)
|
||||
wstream.bytesCount = wstream.bytesCount + uint64(length)
|
||||
else:
|
||||
if isNil(wstream.writerLoop):
|
||||
await write(wstream.wsource, sbytes, msglen)
|
||||
wstream.bytesCount = wstream.bytesCount + uint64(msglen)
|
||||
await write(wstream.wsource, sbytes, length)
|
||||
wstream.bytesCount = wstream.bytesCount + uint64(length)
|
||||
else:
|
||||
var item = WriteItem(kind: String)
|
||||
if not isLiteral(sbytes):
|
||||
|
|
Loading…
Reference in New Issue