Unroll `defer`s and remove `break`s. (#440)
* Unpack `finally/defer` blocks and introduce explicit cleaning of objects. Add request query to debug information. * Unroll one more loop to avoid `break`. Add test for query debug string. * Fix cancellation behavior. * Address review comments.
This commit is contained in:
parent
466241aa95
commit
6c2ea67512
|
@ -29,6 +29,7 @@ type
|
||||||
handle*: SocketHandle
|
handle*: SocketHandle
|
||||||
connectionType*: ConnectionType
|
connectionType*: ConnectionType
|
||||||
connectionState*: ConnectionState
|
connectionState*: ConnectionState
|
||||||
|
query*: Opt[string]
|
||||||
remoteAddress*: Opt[TransportAddress]
|
remoteAddress*: Opt[TransportAddress]
|
||||||
localAddress*: Opt[TransportAddress]
|
localAddress*: Opt[TransportAddress]
|
||||||
acceptMoment*: Moment
|
acceptMoment*: Moment
|
||||||
|
@ -85,6 +86,12 @@ proc getConnectionState*(holder: HttpConnectionHolderRef): ConnectionState =
|
||||||
else:
|
else:
|
||||||
ConnectionState.Accepted
|
ConnectionState.Accepted
|
||||||
|
|
||||||
|
proc getQueryString*(holder: HttpConnectionHolderRef): Opt[string] =
|
||||||
|
if not(isNil(holder.connection)):
|
||||||
|
holder.connection.currentRawQuery
|
||||||
|
else:
|
||||||
|
Opt.none(string)
|
||||||
|
|
||||||
proc init*(t: typedesc[ServerConnectionInfo],
|
proc init*(t: typedesc[ServerConnectionInfo],
|
||||||
holder: HttpConnectionHolderRef): ServerConnectionInfo =
|
holder: HttpConnectionHolderRef): ServerConnectionInfo =
|
||||||
let
|
let
|
||||||
|
@ -98,6 +105,7 @@ proc init*(t: typedesc[ServerConnectionInfo],
|
||||||
Opt.some(holder.transp.remoteAddress())
|
Opt.some(holder.transp.remoteAddress())
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
Opt.none(TransportAddress)
|
Opt.none(TransportAddress)
|
||||||
|
queryString = holder.getQueryString()
|
||||||
|
|
||||||
ServerConnectionInfo(
|
ServerConnectionInfo(
|
||||||
handle: SocketHandle(holder.transp.fd),
|
handle: SocketHandle(holder.transp.fd),
|
||||||
|
@ -106,6 +114,7 @@ proc init*(t: typedesc[ServerConnectionInfo],
|
||||||
remoteAddress: remoteAddress,
|
remoteAddress: remoteAddress,
|
||||||
localAddress: localAddress,
|
localAddress: localAddress,
|
||||||
acceptMoment: holder.acceptMoment,
|
acceptMoment: holder.acceptMoment,
|
||||||
|
query: queryString,
|
||||||
createMoment:
|
createMoment:
|
||||||
if not(isNil(holder.connection)):
|
if not(isNil(holder.connection)):
|
||||||
Opt.some(holder.connection.createMoment)
|
Opt.some(holder.connection.createMoment)
|
||||||
|
|
|
@ -148,6 +148,7 @@ type
|
||||||
writer*: AsyncStreamWriter
|
writer*: AsyncStreamWriter
|
||||||
closeCb*: HttpCloseConnectionCallback
|
closeCb*: HttpCloseConnectionCallback
|
||||||
createMoment*: Moment
|
createMoment*: Moment
|
||||||
|
currentRawQuery*: Opt[string]
|
||||||
buffer: seq[byte]
|
buffer: seq[byte]
|
||||||
|
|
||||||
HttpConnectionRef* = ref HttpConnection
|
HttpConnectionRef* = ref HttpConnection
|
||||||
|
@ -813,6 +814,7 @@ proc closeUnsecureConnection(conn: HttpConnectionRef) {.async.} =
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
await allFutures(pending)
|
await allFutures(pending)
|
||||||
untrackCounter(HttpServerUnsecureConnectionTrackerName)
|
untrackCounter(HttpServerUnsecureConnectionTrackerName)
|
||||||
|
reset(conn[])
|
||||||
conn.state = HttpState.Closed
|
conn.state = HttpState.Closed
|
||||||
|
|
||||||
proc new(ht: typedesc[HttpConnectionRef], server: HttpServerRef,
|
proc new(ht: typedesc[HttpConnectionRef], server: HttpServerRef,
|
||||||
|
@ -844,7 +846,9 @@ proc closeWait*(req: HttpRequestRef) {.async.} =
|
||||||
await writer
|
await writer
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
await writer
|
await writer
|
||||||
|
reset(resp[])
|
||||||
untrackCounter(HttpServerRequestTrackerName)
|
untrackCounter(HttpServerRequestTrackerName)
|
||||||
|
reset(req[])
|
||||||
req.state = HttpState.Closed
|
req.state = HttpState.Closed
|
||||||
|
|
||||||
proc createConnection(server: HttpServerRef,
|
proc createConnection(server: HttpServerRef,
|
||||||
|
@ -931,6 +935,7 @@ proc getRequestFence*(server: HttpServerRef,
|
||||||
await connection.getRequest()
|
await connection.getRequest()
|
||||||
else:
|
else:
|
||||||
await connection.getRequest().wait(server.headersTimeout)
|
await connection.getRequest().wait(server.headersTimeout)
|
||||||
|
connection.currentRawQuery = Opt.some(res.rawPath)
|
||||||
RequestFence.ok(res)
|
RequestFence.ok(res)
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
RequestFence.err(HttpProcessError.init(HttpServerError.InterruptError))
|
RequestFence.err(HttpProcessError.init(HttpServerError.InterruptError))
|
||||||
|
@ -962,13 +967,17 @@ proc getConnectionFence*(server: HttpServerRef,
|
||||||
let res = await server.createConnCallback(server, transp)
|
let res = await server.createConnCallback(server, transp)
|
||||||
ConnectionFence.ok(res)
|
ConnectionFence.ok(res)
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
await transp.closeWait()
|
|
||||||
ConnectionFence.err(HttpProcessError.init(HttpServerError.InterruptError))
|
ConnectionFence.err(HttpProcessError.init(HttpServerError.InterruptError))
|
||||||
except HttpCriticalError as exc:
|
except HttpCriticalError as exc:
|
||||||
await transp.closeWait()
|
# On error `transp` will be closed by `createConnCallback()` call.
|
||||||
let address = transp.getRemoteAddress()
|
let address = Opt.none(TransportAddress)
|
||||||
ConnectionFence.err(HttpProcessError.init(
|
ConnectionFence.err(HttpProcessError.init(
|
||||||
HttpServerError.CriticalError, exc, address, exc.code))
|
HttpServerError.CriticalError, exc, address, exc.code))
|
||||||
|
except CatchableError as exc:
|
||||||
|
# On error `transp` will be closed by `createConnCallback()` call.
|
||||||
|
let address = Opt.none(TransportAddress)
|
||||||
|
ConnectionFence.err(HttpProcessError.init(
|
||||||
|
HttpServerError.CriticalError, exc, address, Http503))
|
||||||
|
|
||||||
proc processRequest(server: HttpServerRef,
|
proc processRequest(server: HttpServerRef,
|
||||||
connection: HttpConnectionRef,
|
connection: HttpConnectionRef,
|
||||||
|
@ -984,19 +993,23 @@ proc processRequest(server: HttpServerRef,
|
||||||
else:
|
else:
|
||||||
discard
|
discard
|
||||||
|
|
||||||
defer:
|
|
||||||
if requestFence.isOk():
|
|
||||||
await requestFence.get().closeWait()
|
|
||||||
|
|
||||||
let responseFence = await getResponseFence(connection, requestFence)
|
let responseFence = await getResponseFence(connection, requestFence)
|
||||||
if responseFence.isErr() and
|
if responseFence.isErr() and
|
||||||
(responseFence.error.kind == HttpServerError.InterruptError):
|
(responseFence.error.kind == HttpServerError.InterruptError):
|
||||||
|
if requestFence.isOk():
|
||||||
|
await requestFence.get().closeWait()
|
||||||
return HttpProcessExitType.Immediate
|
return HttpProcessExitType.Immediate
|
||||||
|
|
||||||
if responseFence.isErr():
|
let res =
|
||||||
await connection.sendErrorResponse(requestFence, responseFence.error)
|
if responseFence.isErr():
|
||||||
else:
|
await connection.sendErrorResponse(requestFence, responseFence.error)
|
||||||
await connection.sendDefaultResponse(requestFence, responseFence.get())
|
else:
|
||||||
|
await connection.sendDefaultResponse(requestFence, responseFence.get())
|
||||||
|
|
||||||
|
if requestFence.isOk():
|
||||||
|
await requestFence.get().closeWait()
|
||||||
|
|
||||||
|
res
|
||||||
|
|
||||||
proc processLoop(holder: HttpConnectionHolderRef) {.async.} =
|
proc processLoop(holder: HttpConnectionHolderRef) {.async.} =
|
||||||
let
|
let
|
||||||
|
@ -1016,23 +1029,27 @@ proc processLoop(holder: HttpConnectionHolderRef) {.async.} =
|
||||||
holder.connection = connection
|
holder.connection = connection
|
||||||
|
|
||||||
var runLoop = HttpProcessExitType.KeepAlive
|
var runLoop = HttpProcessExitType.KeepAlive
|
||||||
|
|
||||||
defer:
|
|
||||||
server.connections.del(connectionId)
|
|
||||||
case runLoop
|
|
||||||
of HttpProcessExitType.KeepAlive:
|
|
||||||
# This could happened only on CancelledError.
|
|
||||||
await connection.closeWait()
|
|
||||||
of HttpProcessExitType.Immediate:
|
|
||||||
await connection.closeWait()
|
|
||||||
of HttpProcessExitType.Graceful:
|
|
||||||
await connection.gracefulCloseWait()
|
|
||||||
|
|
||||||
while runLoop == HttpProcessExitType.KeepAlive:
|
while runLoop == HttpProcessExitType.KeepAlive:
|
||||||
runLoop = await server.processRequest(connection, connectionId)
|
runLoop =
|
||||||
|
try:
|
||||||
|
await server.processRequest(connection, connectionId)
|
||||||
|
except CancelledError:
|
||||||
|
HttpProcessExitType.Immediate
|
||||||
|
except CatchableError as exc:
|
||||||
|
raiseAssert "Unexpected error [" & $exc.name & "] happens: " & $exc.msg
|
||||||
|
|
||||||
|
server.connections.del(connectionId)
|
||||||
|
case runLoop
|
||||||
|
of HttpProcessExitType.KeepAlive:
|
||||||
|
await connection.closeWait()
|
||||||
|
of HttpProcessExitType.Immediate:
|
||||||
|
await connection.closeWait()
|
||||||
|
of HttpProcessExitType.Graceful:
|
||||||
|
await connection.gracefulCloseWait()
|
||||||
|
|
||||||
proc acceptClientLoop(server: HttpServerRef) {.async.} =
|
proc acceptClientLoop(server: HttpServerRef) {.async.} =
|
||||||
while true:
|
var runLoop = true
|
||||||
|
while runLoop:
|
||||||
try:
|
try:
|
||||||
# if server.maxConnections > 0:
|
# if server.maxConnections > 0:
|
||||||
# await server.semaphore.acquire()
|
# await server.semaphore.acquire()
|
||||||
|
@ -1042,27 +1059,18 @@ proc acceptClientLoop(server: HttpServerRef) {.async.} =
|
||||||
# We are unable to identify remote peer, it means that remote peer
|
# We are unable to identify remote peer, it means that remote peer
|
||||||
# disconnected before identification.
|
# disconnected before identification.
|
||||||
await transp.closeWait()
|
await transp.closeWait()
|
||||||
break
|
runLoop = false
|
||||||
else:
|
else:
|
||||||
let connId = resId.get()
|
let connId = resId.get()
|
||||||
let holder = HttpConnectionHolderRef.new(server, transp, resId.get())
|
let holder = HttpConnectionHolderRef.new(server, transp, resId.get())
|
||||||
server.connections[connId] = holder
|
server.connections[connId] = holder
|
||||||
holder.future = processLoop(holder)
|
holder.future = processLoop(holder)
|
||||||
except CancelledError:
|
except TransportTooManyError, TransportAbortedError:
|
||||||
# Server was stopped
|
# Non-critical error
|
||||||
break
|
|
||||||
except TransportOsError:
|
|
||||||
# This is some critical unrecoverable error.
|
|
||||||
break
|
|
||||||
except TransportTooManyError:
|
|
||||||
# Non critical error
|
|
||||||
discard
|
discard
|
||||||
except TransportAbortedError:
|
except CancelledError, TransportOsError, CatchableError:
|
||||||
# Non critical error
|
# Critical, cancellation or unexpected error
|
||||||
discard
|
runLoop = false
|
||||||
except CatchableError:
|
|
||||||
# Unexpected error
|
|
||||||
break
|
|
||||||
|
|
||||||
proc state*(server: HttpServerRef): HttpServerState {.raises: [].} =
|
proc state*(server: HttpServerRef): HttpServerState {.raises: [].} =
|
||||||
## Returns current HTTP server's state.
|
## Returns current HTTP server's state.
|
||||||
|
|
|
@ -197,3 +197,7 @@ proc toList*(ht: HttpTables, normKey = false): auto =
|
||||||
for key, value in ht.stringItems(normKey):
|
for key, value in ht.stringItems(normKey):
|
||||||
res.add((key, value))
|
res.add((key, value))
|
||||||
res
|
res
|
||||||
|
|
||||||
|
proc clear*(ht: var HttpTables) =
|
||||||
|
## Resets the HtppTable so that it is empty.
|
||||||
|
ht.table.clear()
|
||||||
|
|
|
@ -43,6 +43,7 @@ proc closeSecConnection(conn: HttpConnectionRef) {.async.} =
|
||||||
await allFutures(pending)
|
await allFutures(pending)
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
await allFutures(pending)
|
await allFutures(pending)
|
||||||
|
reset(cast[SecureHttpConnectionRef](conn)[])
|
||||||
untrackCounter(HttpServerSecureConnectionTrackerName)
|
untrackCounter(HttpServerSecureConnectionTrackerName)
|
||||||
conn.state = HttpState.Closed
|
conn.state = HttpState.Closed
|
||||||
|
|
||||||
|
@ -74,9 +75,16 @@ proc createSecConnection(server: HttpServerRef,
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
await HttpConnectionRef(sconn).closeWait()
|
await HttpConnectionRef(sconn).closeWait()
|
||||||
raise exc
|
raise exc
|
||||||
except TLSStreamError:
|
except TLSStreamError as exc:
|
||||||
await HttpConnectionRef(sconn).closeWait()
|
await HttpConnectionRef(sconn).closeWait()
|
||||||
raiseHttpCriticalError("Unable to establish secure connection")
|
let msg = "Unable to establish secure connection, reason [" &
|
||||||
|
$exc.msg & "]"
|
||||||
|
raiseHttpCriticalError(msg)
|
||||||
|
except CatchableError as exc:
|
||||||
|
await HttpConnectionRef(sconn).closeWait()
|
||||||
|
let msg = "Unexpected error while trying to establish secure connection, " &
|
||||||
|
"reason [" & $exc.msg & "]"
|
||||||
|
raiseHttpCriticalError(msg)
|
||||||
|
|
||||||
proc new*(htype: typedesc[SecureHttpServerRef],
|
proc new*(htype: typedesc[SecureHttpServerRef],
|
||||||
address: TransportAddress,
|
address: TransportAddress,
|
||||||
|
|
|
@ -7,9 +7,8 @@
|
||||||
# MIT license (LICENSE-MIT)
|
# MIT license (LICENSE-MIT)
|
||||||
import std/[strutils, algorithm]
|
import std/[strutils, algorithm]
|
||||||
import ".."/chronos/unittest2/asynctests,
|
import ".."/chronos/unittest2/asynctests,
|
||||||
".."/chronos, ".."/chronos/apps/http/httpserver,
|
".."/chronos,
|
||||||
".."/chronos/apps/http/httpcommon,
|
".."/chronos/apps/http/[httpserver, httpcommon, httpdebug]
|
||||||
".."/chronos/apps/http/httpdebug
|
|
||||||
import stew/base10
|
import stew/base10
|
||||||
|
|
||||||
{.used.}
|
{.used.}
|
||||||
|
@ -1357,7 +1356,7 @@ suite "HTTP server testing suite":
|
||||||
asyncTest "HTTP debug tests":
|
asyncTest "HTTP debug tests":
|
||||||
const
|
const
|
||||||
TestsCount = 10
|
TestsCount = 10
|
||||||
TestRequest = "GET / HTTP/1.1\r\nConnection: keep-alive\r\n\r\n"
|
TestRequest = "GET /httpdebug HTTP/1.1\r\nConnection: keep-alive\r\n\r\n"
|
||||||
|
|
||||||
proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
|
proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
|
||||||
if r.isOk():
|
if r.isOk():
|
||||||
|
@ -1417,6 +1416,7 @@ suite "HTTP server testing suite":
|
||||||
connection.localAddress.get() == transp.remoteAddress()
|
connection.localAddress.get() == transp.remoteAddress()
|
||||||
connection.connectionType == ConnectionType.NonSecure
|
connection.connectionType == ConnectionType.NonSecure
|
||||||
connection.connectionState == ConnectionState.Alive
|
connection.connectionState == ConnectionState.Alive
|
||||||
|
connection.query.get("") == "/httpdebug"
|
||||||
(currentTime - connection.createMoment.get()) != ZeroDuration
|
(currentTime - connection.createMoment.get()) != ZeroDuration
|
||||||
(currentTime - connection.acceptMoment) != ZeroDuration
|
(currentTime - connection.acceptMoment) != ZeroDuration
|
||||||
var pending: seq[Future[void]]
|
var pending: seq[Future[void]]
|
||||||
|
|
|
@ -7,7 +7,8 @@
|
||||||
# MIT license (LICENSE-MIT)
|
# MIT license (LICENSE-MIT)
|
||||||
import std/strutils
|
import std/strutils
|
||||||
import ".."/chronos/unittest2/asynctests
|
import ".."/chronos/unittest2/asynctests
|
||||||
import ".."/chronos, ".."/chronos/apps/http/shttpserver
|
import ".."/chronos,
|
||||||
|
".."/chronos/apps/http/shttpserver
|
||||||
import stew/base10
|
import stew/base10
|
||||||
|
|
||||||
{.used.}
|
{.used.}
|
||||||
|
|
Loading…
Reference in New Issue