mirror of
https://github.com/codex-storage/nim-websock.git
synced 2025-01-23 00:49:15 +00:00
Implement a HTTP server using httputils package
This commit is contained in:
parent
19a573e756
commit
8755d03265
167
src/ws.nim
167
src/ws.nim
@ -1,4 +1,11 @@
|
||||
import chronos, asynchttpserver, base64, nativesockets
|
||||
import chronos, chronicles, httputils
|
||||
|
||||
const
|
||||
MaxHttpHeadersSize = 8192 # maximum size of HTTP headers in octets
|
||||
MaxHttpRequestSize = 128 * 1024 # maximum size of HTTP body in octets
|
||||
HttpHeadersTimeout = 120.seconds # timeout for receiving headers (120 sec)
|
||||
HttpBodyTimeout = 12.seconds # timeout for receiving body (12 sec)
|
||||
HeadersMark = @[byte(0x0D), byte(0x0A), byte(0x0D), byte(0x0A)]
|
||||
|
||||
type HeaderVerificationError* {.pure.} = enum
|
||||
None
|
||||
@ -34,54 +41,140 @@ type
|
||||
Closed = 3 # The connection is closed or couldn't be opened.
|
||||
|
||||
WebSocket* = ref object
|
||||
tcpSocket*: AsyncSocket
|
||||
version*: int
|
||||
key*: string
|
||||
protocol*: string
|
||||
readyState*: ReadyState
|
||||
masked*: bool # send masked packets
|
||||
|
||||
HttpServer* = ref object
|
||||
server*: StreamServer
|
||||
callback: AsyncCallback
|
||||
maxBody: int
|
||||
|
||||
ReqStatus = enum
|
||||
Success, Error, ErrorFailure
|
||||
|
||||
WebSocketError* = object of IOError
|
||||
|
||||
proc handshake*(ws: WebSocket, headers: HttpHeaders): Future[error: HeaderVerificationError] {.async.} =
|
||||
ws.version = parseInt(headers["Sec-WebSocket-Version"])
|
||||
ws.key = headers["Sec-WebSocket-Key"].strip()
|
||||
if headers.hasKey("Sec-WebSocket-Protocol"):
|
||||
let wantProtocol = headers["Sec-WebSocket-Protocol"].strip()
|
||||
if ws.protocol != wantProtocol:
|
||||
return NoProtocolsSupported
|
||||
proc sendAnswer(transp: StreamTransport, version: HttpVersion, code: HttpCode,
|
||||
data: string = ""): Future[bool] {.async.} =
|
||||
var answer = $version
|
||||
answer.add(" ")
|
||||
answer.add($code)
|
||||
answer.add("\r\n")
|
||||
answer.add("Date: " & httpDate() & "\r\n")
|
||||
if len(data) > 0:
|
||||
answer.add("Content-Type: application/json\r\n")
|
||||
answer.add("Content-Length: " & $len(data) & "\r\n")
|
||||
answer.add("\r\n")
|
||||
if len(data) > 0:
|
||||
answer.add(data)
|
||||
try:
|
||||
let res = await transp.write(answer)
|
||||
if res != len(answer):
|
||||
result = false
|
||||
result = true
|
||||
except:
|
||||
result = false
|
||||
|
||||
let
|
||||
sh = secureHash(ws.key & "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
|
||||
acceptKey = base64.encode(decodeBase16($sh))
|
||||
proc validateRequest(transp: StreamTransport,
|
||||
header: HttpRequestHeader): Future[ReqStatus] {.async.} =
|
||||
if header.meth notin {MethodGet}:
|
||||
# Request method is either PUT or DELETE.
|
||||
debug "GET method is only allowed", address = transp.remoteAddress()
|
||||
if await transp.sendAnswer(header.version, Http405):
|
||||
result = Error
|
||||
else:
|
||||
result = ErrorFailure
|
||||
return
|
||||
|
||||
var response = "HTTP/1.1 101 Web Socket Protocol Handshake\c\L"
|
||||
response.add("Sec-WebSocket-Accept: " & acceptKey & "\c\L")
|
||||
response.add("Connection: Upgrade\c\L")
|
||||
response.add("Upgrade: webSocket\c\L")
|
||||
let length = header.contentLength()
|
||||
if length <= 0:
|
||||
# request length could not be calculated.
|
||||
debug "Content-Length is missing or 0", address = transp.remoteAddress()
|
||||
if await transp.sendAnswer(header.version, Http411):
|
||||
result = Error
|
||||
else:
|
||||
result = ErrorFailure
|
||||
return
|
||||
|
||||
if ws.protocol != "":
|
||||
response.add("Sec-WebSocket-Protocol: " & ws.protocol & "\c\L")
|
||||
response.add "\c\L"
|
||||
if length > MaxHttpRequestSize:
|
||||
# request length is more then `MaxHttpRequestSize`.
|
||||
debug "Maximum size of request body reached",
|
||||
address = transp.remoteAddress()
|
||||
if await transp.sendAnswer(header.version, Http413):
|
||||
result = Error
|
||||
else:
|
||||
result = ErrorFailure
|
||||
return
|
||||
|
||||
await ws.tcpSocket.send(response)
|
||||
ws.readyState = Open
|
||||
result = Success
|
||||
|
||||
proc newWebSocket*(req: Request, protocol: string = ""): Future[tuple[ws: AsyncWebSocket, error: HeaderVerificationError]] {.async.} =
|
||||
if not req.headers.hasKey("Sec-WebSocket-Version"):
|
||||
return ("", UnsupportedVersion)
|
||||
proc serveClient(server: StreamServer, transp: StreamTransport) {.async, gcsafe.} =
|
||||
## Process transport data to the RPC server
|
||||
var buffer = newSeq[byte](MaxHttpHeadersSize)
|
||||
var header: HttpRequestHeader
|
||||
var connection: string
|
||||
|
||||
var ws = WebSocket()
|
||||
ws.masked = false
|
||||
# Todo: Change this to chronos AsyncFD
|
||||
ws.tcpSocket = req.client
|
||||
ws.protocol = protocol
|
||||
let (ws, error) = await ws.handshake(req.headers)
|
||||
return ws, error
|
||||
info "Received connection", address = $transp.remoteAddress()
|
||||
try:
|
||||
let hlenfut = transp.readUntil(addr buffer[0], MaxHttpHeadersSize, HeadersMark)
|
||||
let ores = await withTimeout(hlenfut, HttpHeadersTimeout)
|
||||
if not ores:
|
||||
# Timeout
|
||||
debug "Timeout expired while receiving headers",
|
||||
address = transp.remoteAddress()
|
||||
let res = await transp.sendAnswer(HttpVersion11, Http408)
|
||||
await transp.closeWait()
|
||||
return
|
||||
else:
|
||||
let hlen = hlenfut.read()
|
||||
buffer.setLen(hlen)
|
||||
header = buffer.parseRequest()
|
||||
if header.failed():
|
||||
# Header could not be parsed
|
||||
debug "Malformed header received",
|
||||
address = transp.remoteAddress()
|
||||
let res = await transp.sendAnswer(HttpVersion11, Http400)
|
||||
await transp.closeWait()
|
||||
return
|
||||
except TransportLimitError:
|
||||
# size of headers exceeds `MaxHttpHeadersSize`
|
||||
debug "Maximum size of headers limit reached",
|
||||
address = transp.remoteAddress()
|
||||
let res = await transp.sendAnswer(HttpVersion11, Http413)
|
||||
await transp.closeWait()
|
||||
return
|
||||
except TransportIncompleteError:
|
||||
# remote peer disconnected
|
||||
debug "Remote peer disconnected", address = transp.remoteAddress()
|
||||
await transp.closeWait()
|
||||
return
|
||||
except TransportOsError as exc:
|
||||
debug "Problems with networking", address = transp.remoteAddress(),
|
||||
error = exc.msg
|
||||
await transp.closeWait()
|
||||
return
|
||||
except CatchableError as exc:
|
||||
debug "Unknown exception", address = transp.remoteAddress(),
|
||||
error = exc.msg
|
||||
await transp.closeWait()
|
||||
return
|
||||
|
||||
proc close*(ws: WebSocket) =
|
||||
ws.readyState = Closed
|
||||
proc close() {.async.} =
|
||||
await ws.send("", Close)
|
||||
ws.tcpSocket.close()
|
||||
asyncCheck close()
|
||||
let vres = await validateRequest(transp, header)
|
||||
|
||||
if vres == Success:
|
||||
trace "Received valid RPC request", address = $transp.remoteAddress()
|
||||
info "Header: ", header
|
||||
debug "Disconnecting client", address = transp.remoteAddress()
|
||||
await transp.closeWait()
|
||||
elif vres == ErrorFailure:
|
||||
debug "Remote peer disconnected", address = transp.remoteAddress()
|
||||
await transp.closeWait()
|
||||
|
||||
proc newHttpServer*(address: string,
|
||||
flags: set[ServerFlags] = {ReuseAddr}): HttpServer =
|
||||
let address = initTAddress(address)
|
||||
new result
|
||||
result.server = createStreamServer(address, serveClient, {ReuseAddr})
|
||||
|
@ -1,8 +1,7 @@
|
||||
import ws, chronos, asynchttpserver
|
||||
import ws, chronos
|
||||
|
||||
proc cb(req: Request) {.async.} =
|
||||
let (ws, error) = await newWebSocket(req)
|
||||
ws.close()
|
||||
|
||||
var server = newAsyncHttpServer()
|
||||
waitFor server.serve(Port(9001), cb)
|
||||
when isMainModule:
|
||||
let address = "127.0.0.1:8888"
|
||||
var httpServer = newHttpServer(address)
|
||||
httpServer.server.start()
|
||||
waitFor httpServer.server.join()
|
||||
|
Loading…
x
Reference in New Issue
Block a user