Add drop() implementation (was missed) to httpserver and fix transport leak. (#158)
* Add drop() implementation (was missed). Add tests for drop(). Fix transport leak because of drop(). * Fix future leak.
This commit is contained in:
parent
25688cd0aa
commit
d49e0a9c47
|
@ -11,7 +11,8 @@ import stew/results, httputils
|
||||||
import ../../asyncloop, ../../asyncsync
|
import ../../asyncloop, ../../asyncsync
|
||||||
import ../../streams/[asyncstream, boundstream, chunkstream, tlsstream]
|
import ../../streams/[asyncstream, boundstream, chunkstream, tlsstream]
|
||||||
import httptable, httpcommon, multipart
|
import httptable, httpcommon, multipart
|
||||||
export httptable, httpcommon, multipart, tlsstream, asyncstream
|
export httptable, httpcommon, multipart, tlsstream, asyncstream, uri, tables,
|
||||||
|
options, results
|
||||||
|
|
||||||
type
|
type
|
||||||
HttpServerFlags* {.pure.} = enum
|
HttpServerFlags* {.pure.} = enum
|
||||||
|
@ -516,6 +517,7 @@ proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} =
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
# We could be cancelled only when we perform TLS handshake, connection
|
# We could be cancelled only when we perform TLS handshake, connection
|
||||||
server.connections.del(transp.getId())
|
server.connections.del(transp.getId())
|
||||||
|
await transp.closeWait()
|
||||||
return
|
return
|
||||||
except HttpCriticalError as exc:
|
except HttpCriticalError as exc:
|
||||||
let error = HttpProcessError.init(HTTPServerError.CriticalError, exc,
|
let error = HttpProcessError.init(HTTPServerError.CriticalError, exc,
|
||||||
|
@ -526,11 +528,10 @@ proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} =
|
||||||
if not(runLoop):
|
if not(runLoop):
|
||||||
try:
|
try:
|
||||||
# We still want to notify process callback about failure, but we ignore
|
# We still want to notify process callback about failure, but we ignore
|
||||||
# result and swallow all the exceptions.
|
# result.
|
||||||
discard await server.processCallback(connArg)
|
discard await server.processCallback(connArg)
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
server.connections.del(transp.getId())
|
runLoop = false
|
||||||
return
|
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
# There should be no exceptions, so we will raise `Defect`.
|
# There should be no exceptions, so we will raise `Defect`.
|
||||||
raiseHttpDefect("Unexpected exception catched [" & $exc.name & "]")
|
raiseHttpDefect("Unexpected exception catched [" & $exc.name & "]")
|
||||||
|
@ -688,8 +689,13 @@ proc stop*(server: HttpServerRef) {.async.} =
|
||||||
|
|
||||||
proc drop*(server: HttpServerRef) {.async.} =
|
proc drop*(server: HttpServerRef) {.async.} =
|
||||||
## Drop all pending HTTP connections.
|
## Drop all pending HTTP connections.
|
||||||
|
var pending: seq[Future[void]]
|
||||||
if server.state in {ServerStopped, ServerRunning}:
|
if server.state in {ServerStopped, ServerRunning}:
|
||||||
discard
|
for fut in server.connections.values():
|
||||||
|
if not(fut.finished()):
|
||||||
|
fut.cancel()
|
||||||
|
pending.add(fut)
|
||||||
|
await allFutures(pending)
|
||||||
|
|
||||||
proc closeWait*(server: HttpServerRef) {.async.} =
|
proc closeWait*(server: HttpServerRef) {.async.} =
|
||||||
## Stop HTTP server and drop all the pending connections.
|
## Stop HTTP server and drop all the pending connections.
|
||||||
|
|
|
@ -681,6 +681,57 @@ suite "HTTP server testing suite":
|
||||||
|
|
||||||
check waitFor(testHTTPS2(initTAddress("127.0.0.1:30080"))) == true
|
check waitFor(testHTTPS2(initTAddress("127.0.0.1:30080"))) == true
|
||||||
|
|
||||||
|
test "drop() connections test":
|
||||||
|
const ClientsCount = 10
|
||||||
|
|
||||||
|
proc testHTTPdrop(address: TransportAddress): Future[bool] {.async.} =
|
||||||
|
var eventWait = newAsyncEvent()
|
||||||
|
var eventContinue = newAsyncEvent()
|
||||||
|
var count = 0
|
||||||
|
|
||||||
|
proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
|
||||||
|
if r.isOk():
|
||||||
|
let request = r.get()
|
||||||
|
inc(count)
|
||||||
|
if count == ClientsCount:
|
||||||
|
eventWait.fire()
|
||||||
|
await eventContinue.wait()
|
||||||
|
return await request.respond(Http404, "", HttpTable.init())
|
||||||
|
else:
|
||||||
|
return dumbResponse()
|
||||||
|
|
||||||
|
let socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}
|
||||||
|
let res = HttpServerRef.new(address, process,
|
||||||
|
socketFlags = socketFlags,
|
||||||
|
maxConnections = 100)
|
||||||
|
if res.isErr():
|
||||||
|
return false
|
||||||
|
|
||||||
|
let server = res.get()
|
||||||
|
server.start()
|
||||||
|
|
||||||
|
var clients: seq[Future[string]]
|
||||||
|
let message = "GET / HTTP/1.0\r\nHost: https://127.0.0.1:80\r\n\r\n"
|
||||||
|
for i in 0 ..< ClientsCount:
|
||||||
|
var clientFut = httpClient(address, message)
|
||||||
|
if clientFut.finished():
|
||||||
|
return false
|
||||||
|
clients.add(clientFut)
|
||||||
|
# Waiting for all clients to connect to the server
|
||||||
|
await eventWait.wait()
|
||||||
|
# Dropping
|
||||||
|
await server.closeWait()
|
||||||
|
# We are firing second event to unblock client loops, but this loops
|
||||||
|
# must be already cancelled.
|
||||||
|
eventContinue.fire()
|
||||||
|
# Now all clients should be dropped
|
||||||
|
discard await allFutures(clients).withTimeout(1.seconds)
|
||||||
|
for item in clients:
|
||||||
|
if item.read() != "":
|
||||||
|
return false
|
||||||
|
return true
|
||||||
|
|
||||||
|
check waitFor(testHTTPdrop(initTAddress("127.0.0.1:30080"))) == true
|
||||||
|
|
||||||
test "Content-Type multipart boundary test":
|
test "Content-Type multipart boundary test":
|
||||||
const AllowedCharacters = {
|
const AllowedCharacters = {
|
||||||
|
|
Loading…
Reference in New Issue