mirror of
https://github.com/status-im/news.git
synced 2025-02-19 15:24:25 +00:00
Chronos support (#1)
This commit is contained in:
parent
1693b5c022
commit
35d14b09a4
104
src/news.nim
104
src/news.nim
@ -1,5 +1,31 @@
|
||||
import httpcore, asynchttpserver, asyncdispatch, nativesockets, asyncnet,
|
||||
strutils, streams, random, securehash, base64, uri, strformat
|
||||
import strutils, streams, random, securehash, base64, uri, strformat, nativesockets
|
||||
|
||||
when not declaredInScope(newsUseChronos):
|
||||
# Currently chronos is second class citizen. To use this library in chronos-based
|
||||
# projects, include this file as follows:
|
||||
# const newsUseChronos = true
|
||||
# include news
|
||||
const newsUseChronos = false
|
||||
|
||||
when newsUseChronos:
|
||||
import chronos, httpcore
|
||||
type Transport = StreamTransport
|
||||
|
||||
proc send(s: StreamTransport, data: string) {.async.} =
|
||||
discard await s.write(data)
|
||||
|
||||
proc recv(s: StreamTransport, len: int): Future[string] {.async.} =
|
||||
var res = newString(len)
|
||||
if len != 0:
|
||||
await s.readExactly(addr res[0], len)
|
||||
return res
|
||||
|
||||
proc isClosed*(transp: StreamTransport): bool {.inline.} = transp.closed
|
||||
|
||||
else:
|
||||
import httpcore, asyncdispatch, asyncnet, asynchttpserver
|
||||
type Transport = AsyncSocket
|
||||
|
||||
|
||||
type
|
||||
ReadyState* = enum
|
||||
@ -9,7 +35,7 @@ type
|
||||
Closed = 3 # The connection is closed or couldn't be opened.
|
||||
|
||||
WebSocket* = ref object
|
||||
req*: Request
|
||||
transp*: Transport
|
||||
version*: int
|
||||
key*: string
|
||||
protocol*: string
|
||||
@ -61,38 +87,33 @@ proc genMaskKey*(): array[4, char] =
|
||||
## Generates a random key of 4 random chars
|
||||
[char(rand(255)), char(rand(255)), char(rand(255)), char(rand(255))]
|
||||
|
||||
proc newWebSocket*(req: Request): Future[WebSocket] {.async.} =
|
||||
## Creates a new socket from a request
|
||||
var ws = WebSocket()
|
||||
ws.req = req
|
||||
ws.version = parseInt(req.headers["sec-webSocket-version"])
|
||||
ws.key = req.headers["sec-webSocket-key"].strip()
|
||||
if req.headers.hasKey("sec-webSocket-protocol"):
|
||||
ws.protocol = req.headers["sec-webSocket-protocol"].strip()
|
||||
when not newsUseChronos:
|
||||
proc newWebSocket*(req: Request): Future[WebSocket] {.async.} =
|
||||
## Creates a new socket from a request
|
||||
var ws = WebSocket()
|
||||
ws.version = parseInt(req.headers["sec-webSocket-version"])
|
||||
ws.key = req.headers["sec-webSocket-key"].strip()
|
||||
if req.headers.hasKey("sec-webSocket-protocol"):
|
||||
ws.protocol = req.headers["sec-webSocket-protocol"].strip()
|
||||
|
||||
let sh = secureHash(ws.key & "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
|
||||
let acceptKey = base64.encode(decodeBase16($sh))
|
||||
let sh = secureHash(ws.key & "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
|
||||
let acceptKey = base64.encode(decodeBase16($sh))
|
||||
|
||||
var responce = "HTTP/1.1 101 Web Socket Protocol Handshake\c\L"
|
||||
responce.add("Sec-WebSocket-Accept: " & acceptKey & "\c\L")
|
||||
responce.add("Connection: Upgrade\c\L")
|
||||
responce.add("Upgrade: webSocket\c\L")
|
||||
if not ws.protocol.len == 0:
|
||||
responce.add("Sec-WebSocket-Protocol: " & ws.protocol & "\c\L")
|
||||
responce.add "\c\L"
|
||||
|
||||
await ws.req.client.send(responce)
|
||||
ws.readyState = Open
|
||||
return ws
|
||||
var responce = "HTTP/1.1 101 Web Socket Protocol Handshake\c\L"
|
||||
responce.add("Sec-WebSocket-Accept: " & acceptKey & "\c\L")
|
||||
responce.add("Connection: Upgrade\c\L")
|
||||
responce.add("Upgrade: webSocket\c\L")
|
||||
if not ws.protocol.len == 0:
|
||||
responce.add("Sec-WebSocket-Protocol: " & ws.protocol & "\c\L")
|
||||
responce.add "\c\L"
|
||||
|
||||
await ws.transp.send(responce)
|
||||
ws.readyState = Open
|
||||
return ws
|
||||
|
||||
proc newWebSocket*(url: string): Future[WebSocket] {.async.} =
|
||||
## Creates a client
|
||||
var ws = WebSocket()
|
||||
ws.req = Request()
|
||||
ws.req.client = newAsyncSocket()
|
||||
|
||||
|
||||
let uri = parseUri(url)
|
||||
var port = Port(9001)
|
||||
if uri.scheme != "ws":
|
||||
@ -102,8 +123,13 @@ proc newWebSocket*(url: string): Future[WebSocket] {.async.} =
|
||||
if uri.port.len > 0:
|
||||
port = Port(parseInt(uri.port))
|
||||
|
||||
await ws.req.client.connect(uri.hostname, port)
|
||||
await ws.req.client.send &"""GET {url} HTTP/1.1
|
||||
when newsUseChronos:
|
||||
ws.transp = await connect(resolveTAddress(uri.hostname, port)[0])
|
||||
else:
|
||||
ws.transp = newAsyncSocket()
|
||||
await ws.transp.connect(uri.hostname, port)
|
||||
|
||||
await ws.transp.send &"""GET {url} HTTP/1.1
|
||||
Host: {uri.hostname}:{$port}
|
||||
Connection: Upgrade
|
||||
Upgrade: websocket
|
||||
@ -114,7 +140,7 @@ Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
|
||||
"""
|
||||
var output = ""
|
||||
while not output.endsWith("\c\L\c\L"):
|
||||
output.add await ws.req.client.recv(1)
|
||||
output.add await ws.transp.recv(1)
|
||||
|
||||
ws.readyState = Open
|
||||
return ws
|
||||
@ -240,7 +266,7 @@ proc send*(ws: WebSocket, text: string, opcode = Opcode.Text): Future[void] {.as
|
||||
var i = 0
|
||||
while i < frame.len:
|
||||
let data = frame[i ..< min(frame.len, i + maxSize)]
|
||||
await ws.req.client.send(data)
|
||||
await ws.transp.send(data)
|
||||
i += maxSize
|
||||
await sleepAsync(1)
|
||||
|
||||
@ -249,12 +275,12 @@ proc recvFrame(ws: WebSocket): Future[Frame] {.async.} =
|
||||
## Gets a frame from the WebSocket
|
||||
## See https://tools.ietf.org/html/rfc6455#section-5.2
|
||||
|
||||
if cast[int](ws.req.client.getFd) == -1:
|
||||
if ws.transp.isClosed:
|
||||
ws.readyState = Closed
|
||||
return result
|
||||
|
||||
# grab the header
|
||||
let header = await ws.req.client.recv(2)
|
||||
let header = await ws.transp.recv(2)
|
||||
|
||||
if header.len != 2:
|
||||
ws.readyState = Closed
|
||||
@ -281,7 +307,7 @@ proc recvFrame(ws: WebSocket): Future[Frame] {.async.} =
|
||||
let headerLen = uint(b1 and 0x7f)
|
||||
if headerLen == 0x7e:
|
||||
# length must be 7+16 bits
|
||||
var lenstr = await ws.req.client.recv(2)
|
||||
var lenstr = await ws.transp.recv(2)
|
||||
if lenstr.len != 2:
|
||||
raise newException(WebSocketClosedError, "Socket closed")
|
||||
|
||||
@ -289,7 +315,7 @@ proc recvFrame(ws: WebSocket): Future[Frame] {.async.} =
|
||||
|
||||
elif headerLen == 0x7f:
|
||||
# length must be 7+64 bits
|
||||
var lenstr = await ws.req.client.recv(8)
|
||||
var lenstr = await ws.transp.recv(8)
|
||||
if lenstr.len != 8:
|
||||
raise newException(WebSocketClosedError, "Socket closed")
|
||||
finalLen = cast[ptr uint32](lenstr[4].addr)[].htonl
|
||||
@ -303,12 +329,12 @@ proc recvFrame(ws: WebSocket): Future[Frame] {.async.} =
|
||||
var maskKey = ""
|
||||
if result.mask:
|
||||
# read mask
|
||||
maskKey = await ws.req.client.recv(4)
|
||||
maskKey = await ws.transp.recv(4)
|
||||
if maskKey.len != 4:
|
||||
raise newException(WebSocketClosedError, "Socket closed")
|
||||
|
||||
# read the data
|
||||
result.data = await ws.req.client.recv(int finalLen)
|
||||
result.data = await ws.transp.recv(int finalLen)
|
||||
if result.data.len != int finalLen:
|
||||
raise newException(WebSocketClosedError, "Socket closed")
|
||||
|
||||
@ -349,4 +375,4 @@ proc receivePacket*(ws: WebSocket): Future[string] {.async.} =
|
||||
proc close*(ws: WebSocket) =
|
||||
## close the socket
|
||||
ws.readyState = Closed
|
||||
ws.req.client.close()
|
||||
ws.transp.close()
|
||||
|
Loading…
x
Reference in New Issue
Block a user