Add RpcHttpServer.

Add RpcHttpClient.
This commit is contained in:
cheatfate 2018-07-14 10:51:54 +03:00
parent 24ee0c9505
commit 0942b0387d
9 changed files with 699 additions and 17 deletions

View File

@ -10,6 +10,7 @@ requires "nim >= 0.17.3",
"nimcrypto",
"stint",
"https://github.com/status-im/nim-asyncdispatch2",
"https://github.com/status-im/nim-http-utils",
"chronicles"
proc configForTests() =

View File

@ -6,7 +6,7 @@ export asyncdispatch2
type
ClientId* = int64
RpcClient* = ref object of RootRef#
RpcClient* = ref object of RootRef
awaiting*: Table[ClientId, Future[Response]]
nextId: ClientId
@ -21,10 +21,7 @@ proc getNextId*(client: RpcClient): ClientId =
client.nextId.inc
proc rpcCallNode*(path: string, params: JsonNode, id: ClientId): JsonNode =
%{"jsonrpc": %"2.0",
"method": %path,
"params": params,
"id": %id}
%{"jsonrpc": %"2.0", "method": %path, "params": params, "id": %id}
template asyncRaise[T](fut: Future[T], errType: typedesc, msg: string) =
fut.fail(newException(errType, msg))
@ -49,7 +46,8 @@ macro checkGet(node: JsonNode, fieldName: string,
else: discard
proc processMessage*[T: RpcClient](self: T, line: string) =
# Note: this doesn't use any transport code so doesn't need to be differentiated.
# Note: this doesn't use any transport code so doesn't need to be
# differentiated.
let
node = parseJson(line)
id = checkGet(node, "id", JInt)

View File

@ -0,0 +1,182 @@
import json, strutils, tables
import chronicles, httputils, asyncdispatch2
import ../client
logScope:
topic = "JSONRPC-HTTP-CLIENT"
type
RpcHttpClient* = ref object of RpcClient
transp*: StreamTransport
addresses: seq[TransportAddress]
const
MaxHttpHeadersSize = 8192 # maximum size of HTTP headers in octets
MaxHttpRequestSize = 128 * 1024 # maximum size of HTTP body in octets
HttpHeadersTimeout = 120000 # timeout for receiving headers (120 sec)
HttpBodyTimeout = 12000 # timeout for receiving body (12 sec)
HeadersMark = @[byte(0x0D), byte(0x0A), byte(0x0D), byte(0x0A)]
proc sendRequest(transp: StreamTransport,
data: string): Future[bool] {.async.} =
var request = "GET / "
request.add($HttpVersion11)
request.add("\r\n")
request.add("Date: " & httpDate() & "\r\n")
request.add("Content-Type: application/json\r\n")
request.add("Content-Length: " & $len(data) & "\r\n")
request.add("Connection: keep-alive\r\n")
request.add("\r\n")
if len(data) > 0:
request.add(data)
try:
let res = await transp.write(cast[seq[char]](request))
if res != len(request):
result = false
result = true
except:
result = false
proc validateResponse*(transp: StreamTransport,
header: HttpResponseHeader): bool =
if header.code != 200:
debug "Server returns error",
httpcode = header.code,
httpreason = header.reason(),
address = transp.remoteAddress()
result = false
return
var ctype = header["Content-Type"]
if ctype.toLowerAscii() != "application/json":
# Content-Type header is not "application/json"
debug "Content type must be application/json",
address = transp.remoteAddress()
result = false
return
let length = header.contentLength()
if length <= 0:
# request length could not be calculated.
debug "Content-Length is missing or 0", address = transp.remoteAddress()
result = false
return
result = true
proc recvData(transp: StreamTransport): Future[string] {.async.} =
var buffer = newSeq[byte](MaxHttpHeadersSize)
var header: HttpResponseHeader
var error = false
try:
let hlenfut = transp.readUntil(addr buffer[0], MaxHttpHeadersSize,
HeadersMark)
let ores = await withTimeout(hlenfut, HttpHeadersTimeout)
if not ores:
# Timeout
debug "Timeout expired while receiving headers",
address = transp.remoteAddress()
error = true
else:
let hlen = hlenfut.read()
buffer.setLen(hlen)
header = buffer.parseResponse()
if header.failed():
# Header could not be parsed
debug "Malformed header received",
address = transp.remoteAddress()
error = true
except TransportLimitError:
# size of headers exceeds `MaxHttpHeadersSize`
debug "Maximum size of headers limit reached",
address = transp.remoteAddress()
error = true
except TransportIncompleteError:
# remote peer disconnected
debug "Remote peer disconnected", address = transp.remoteAddress()
error = true
except TransportOsError:
debug "Problems with networking", address = transp.remoteAddress(),
error = getCurrentExceptionMsg()
error = true
if error or not transp.validateResponse(header):
transp.close()
result = ""
return
let length = header.contentLength()
buffer.setLen(length)
try:
let blenfut = transp.readExactly(addr buffer[0], length)
let ores = await withTimeout(blenfut, HttpBodyTimeout)
if not ores:
# Timeout
debug "Timeout expired while receiving request body",
address = transp.remoteAddress()
error = true
else:
blenfut.read()
except TransportIncompleteError:
# remote peer disconnected
debug "Remote peer disconnected", address = transp.remoteAddress()
error = true
except TransportOsError:
debug "Problems with networking", address = transp.remoteAddress(),
error = getCurrentExceptionMsg()
error = true
if error:
transp.close()
result = ""
else:
result = cast[string](buffer)
proc newRpcHttpClient*(): RpcHttpClient =
## Creates a new HTTP client instance.
new result
result.initRpcClient()
proc call*(client: RpcHttpClient, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = client.getNextId()
var value = $rpcCallNode(name, params, id) & "\c\l"
if isNil(client.transp) or client.transp.closed():
raise newException(ValueError,
"Transport is not initialised or already closed")
let res = await client.transp.sendRequest(value)
if not res:
debug "Failed to send message to RPC server",
address = client.transp.remoteAddress(), msg_len = res
client.transp.close()
raise newException(ValueError, "Transport error")
else:
debug "Message sent to RPC server", address = client.transp.remoteAddress(),
msg_len = res
# completed by processMessage.
var newFut = newFuture[Response]()
# add to awaiting responses
client.awaiting[id] = newFut
result = await newFut
proc processData(client: RpcHttpClient) {.async.} =
while true:
var value = await client.transp.recvData()
if value == "":
break
debug "Received response from RPC server",
address = client.transp.remoteAddress(),
msg_len = len(value)
client.processMessage(value)
# async loop reconnection and waiting
client.transp = await connect(client.addresses[0])
proc connect*(client: RpcHttpClient, address: string, port: Port) {.async.} =
client.addresses = resolveTAddress(address, port)
client.transp = await connect(client.addresses[0])
asyncCheck processData(client)

View File

@ -16,8 +16,10 @@ proc call*(self: RpcSocketClient, name: string,
params: JsonNode): Future[Response] {.async.} =
## Remotely calls the specified RPC method.
let id = self.getNextId()
var value = $rpcCallNode(name, params, id) & "\c\l"
var
value =
$rpcCallNode(name, params, id) & "\c\l"
if self.transport.isNil:
var connectStr = ""
raise newException(ValueError, "Transport is not initialised (missing a call to connect?)")

View File

@ -0,0 +1,321 @@
import json, strutils
import chronicles, httputils, asyncdispatch2
import ../server
logScope:
topic = "JSONRPC-HTTP-SERVER"
const
MaxHttpHeadersSize = 8192 # maximum size of HTTP headers in octets
MaxHttpRequestSize = 128 * 1024 # maximum size of HTTP body in octets
HttpHeadersTimeout = 120000 # timeout for receiving headers (120 sec)
HttpBodyTimeout = 12000 # timeout for receiving body (12 sec)
HeadersMark = @[byte(0x0D), byte(0x0A), byte(0x0D), byte(0x0A)]
type
ReqStatus = enum
Success, Error, ErrorFailure
RpcHttpServer* = ref object of RpcServer
servers: seq[StreamServer]
proc sendAnswer(transp: StreamTransport, version: HttpVersion, code: HttpCode,
data: string = ""): Future[bool] {.async.} =
var answer = $version
answer.add(" ")
answer.add($code)
answer.add("\r\n")
answer.add("Date: " & httpDate() & "\r\n")
if len(data) > 0:
answer.add("Content-Type: application/json\r\n")
answer.add("Content-Length: " & $len(data) & "\r\n")
answer.add("\r\n")
if len(data) > 0:
answer.add(data)
try:
let res = await transp.write(answer)
if res != len(answer):
result = false
result = true
except:
result = false
proc validateRequest(transp: StreamTransport,
header: HttpRequestHeader): Future[ReqStatus] {.async.} =
if header.meth in {MethodPut, MethodDelete}:
# Request method is either PUT or DELETE.
debug "PUT/DELETE methods are not allowed", address = transp.remoteAddress()
if await transp.sendAnswer(header.version, Http405):
result = Error
else:
result = ErrorFailure
return
let length = header.contentLength()
if length <= 0:
# request length could not be calculated.
debug "Content-Length is missing or 0", address = transp.remoteAddress()
if await transp.sendAnswer(header.version, Http411):
result = Error
else:
result = ErrorFailure
return
if length > MaxHttpRequestSize:
# request length is more then `MaxHttpRequestSize`.
debug "Maximum size of request body reached",
address = transp.remoteAddress()
if await transp.sendAnswer(header.version, Http413):
result = Error
else:
result = ErrorFailure
return
var ctype = header["Content-Type"]
if ctype.toLowerAscii() != "application/json":
# Content-Type header is not "application/json"
debug "Content type must be application/json",
address = transp.remoteAddress()
if await transp.sendAnswer(header.version, Http415):
result = Error
else:
result = ErrorFailure
return
result = Success
proc processClient(server: StreamServer,
transp: StreamTransport) {.async, gcsafe.} =
## Process transport data to the RPC server
var rpc = getUserData[RpcHttpServer](server)
var buffer = newSeq[byte](MaxHttpHeadersSize)
var header: HttpRequestHeader
var connection: string
info "Received connection", address = transp.remoteAddress()
while true:
try:
let hlenfut = transp.readUntil(addr buffer[0], MaxHttpHeadersSize,
HeadersMark)
let ores = await withTimeout(hlenfut, HttpHeadersTimeout)
if not ores:
# Timeout
debug "Timeout expired while receiving headers",
address = transp.remoteAddress()
let res = await transp.sendAnswer(HttpVersion11, Http408)
transp.close()
break
else:
let hlen = hlenfut.read()
buffer.setLen(hlen)
header = buffer.parseRequest()
if header.failed():
# Header could not be parsed
debug "Malformed header received",
address = transp.remoteAddress()
let res = await transp.sendAnswer(HttpVersion11, Http400)
transp.close()
break
except TransportLimitError:
# size of headers exceeds `MaxHttpHeadersSize`
debug "Maximum size of headers limit reached",
address = transp.remoteAddress()
let res = await transp.sendAnswer(HttpVersion11, Http413)
transp.close()
break
except TransportIncompleteError:
# remote peer disconnected
debug "Remote peer disconnected", address = transp.remoteAddress()
transp.close()
break
except TransportOsError:
debug "Problems with networking", address = transp.remoteAddress(),
error = getCurrentExceptionMsg()
transp.close()
break
let vres = await validateRequest(transp, header)
if vres == Success:
info "Received valid RPC request", address = transp.remoteAddress()
# we need to get `Connection` header value before, because
# we are reusing `buffer`, and its value will be lost.
connection = header["Connection"]
let length = header.contentLength()
buffer.setLen(length)
try:
let blenfut = transp.readExactly(addr buffer[0], length)
let ores = await withTimeout(blenfut, HttpBodyTimeout)
if not ores:
# Timeout
debug "Timeout expired while receiving request body",
address = transp.remoteAddress()
let res = await transp.sendAnswer(header.version, Http413)
transp.close()
break
else:
blenfut.read()
except TransportIncompleteError:
# remote peer disconnected
debug "Remote peer disconnected", address = transp.remoteAddress()
transp.close()
break
except TransportOsError:
debug "Problems with networking", address = transp.remoteAddress(),
error = getCurrentExceptionMsg()
transp.close()
break
let future = rpc.route(cast[string](buffer))
yield future
if future.failed:
# rpc.route exception
debug "Internal error, while processing RPC call",
address = transp.remoteAddress()
let res = await transp.sendAnswer(header.version, Http503)
if not res:
transp.close()
break
else:
var data = future.read()
let res = await transp.sendAnswer(header.version, Http200, data)
info "RPC result has been sent", address = transp.remoteAddress()
if not res:
transp.close()
break
elif vres == ErrorFailure:
debug "Remote peer disconnected", address = transp.remoteAddress()
transp.close()
break
if header.version in {HttpVersion09, HttpVersion10}:
debug "Disconnecting client", address = transp.remoteAddress()
transp.close()
break
else:
if connection == "close":
debug "Disconnecting client", address = transp.remoteAddress()
transp.close()
break
# Utility functions for setting up servers using stream transport addresses
proc addStreamServer*(server: RpcHttpServer, address: TransportAddress) =
try:
info "Creating server on ", address = $address
var transServer = createStreamServer(address, processClient,
{ReuseAddr}, udata = server)
server.servers.add(transServer)
except:
error "Failed to create server", address = $address,
message = getCurrentExceptionMsg()
if len(server.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError, "Unable to create server!")
proc addStreamServers*(server: RpcHttpServer,
addresses: openarray[TransportAddress]) =
for item in addresses:
server.addStreamServer(item)
proc addStreamServer*(server: RpcHttpServer, address: string) =
## Create new server and assign it to addresses ``addresses``.
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
added = 0
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, IpAddressFamily.IPv4)
except:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, IpAddressFamily.IPv6)
except:
discard
for r in tas4:
server.addStreamServer(r)
added.inc
for r in tas6:
server.addStreamServer(r)
added.inc
if added == 0:
# Addresses could not be resolved, critical error.
raise newException(RpcAddressUnresolvableError, "Unable to get address!")
proc addStreamServers*(server: RpcHttpServer, addresses: openarray[string]) =
for address in addresses:
server.addStreamServer(address)
proc addStreamServer*(server: RpcHttpServer, address: string, port: Port) =
var
tas4: seq[TransportAddress]
tas6: seq[TransportAddress]
added = 0
# Attempt to resolve `address` for IPv4 address space.
try:
tas4 = resolveTAddress(address, port, IpAddressFamily.IPv4)
except:
discard
# Attempt to resolve `address` for IPv6 address space.
try:
tas6 = resolveTAddress(address, port, IpAddressFamily.IPv6)
except:
discard
if len(tas4) == 0 and len(tas6) == 0:
# Address was not resolved, critical error.
raise newException(RpcAddressUnresolvableError,
"Address " & address & " could not be resolved!")
for r in tas4:
server.addStreamServer(r)
added.inc
for r in tas6:
server.addStreamServer(r)
added.inc
if len(server.servers) == 0:
# Server was not bound, critical error.
raise newException(RpcBindError,
"Could not setup server on " & address & ":" & $int(port))
proc newRpcHttpServer*(): RpcHttpServer =
RpcHttpServer(router: newRpcRouter(), servers: @[])
proc newRpcHttpServer*(addresses: openarray[TransportAddress]): RpcHttpServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcHttpServer()
result.addStreamServers(addresses)
proc newRpcHttpServer*(addresses: openarray[string]): RpcHttpServer =
## Create new server and assign it to addresses ``addresses``.
result = newRpcHttpServer()
result.addStreamServers(addresses)
proc start*(server: RpcHttpServer) =
## Start the RPC server.
for item in server.servers:
debug "HTTP RPC server started", address = item.local
item.start()
proc stop*(server: RpcHttpServer) =
## Stop the RPC server.
for item in server.servers:
debug "HTTP RPC server stopped", address = item.local
item.stop()
proc close*(server: RpcHttpServer) =
## Cleanup resources of RPC server.
for item in server.servers:
item.close()

View File

@ -1,5 +1,3 @@
import
json_rpc / client,
json_rpc / clients / socketclient
export client, socketclient
import json_rpc/client
import json_rpc/clients/[socketclient, httpclient]
export client, socketclient, httpclient

View File

@ -1,2 +1,3 @@
import json_rpc / server, json_rpc / servers / socketserver
export server, socketserver
import json_rpc/server
import json_rpc/servers/[socketserver, httpserver]
export server, socketserver, httpserver

View File

@ -1,3 +1,2 @@
import
testrpcmacro, testserverclient, testethcalls #, testerrors
testrpcmacro, testserverclient, testethcalls, testhttp #, testerrors

180
tests/testhttp.nim Normal file
View File

@ -0,0 +1,180 @@
import unittest, json, strutils
import httputils, chronicles
import ../rpcserver, ../rpcclient
const
TestsCount = 100
BufferSize = 8192
BigHeaderSize = 8 * 1024 + 1
BigBodySize = 128 * 1024 + 1
HeadersMark = @[byte(0x0D), byte(0x0A), byte(0x0D), byte(0x0A)]
Requests = [
"GET / HTTP/1.1\r\n" &
"Host: www.google.com\r\n" &
"Content-Length: 71\r\n" &
"Content-Type: text/html\r\n" &
"Connection: close\r\n" &
"\r\n" &
"{\"jsonrpc\":\"2.0\",\"method\":\"myProc\",\"params\":[\"abc\", [1, 2, 3]],\"id\":67}",
"BADHEADER\r\n\r\n",
"GET / HTTP/1.1\r\n" &
"Host: www.google.com\r\n" &
"Content-Type: application/json\r\n" &
"Connection: close\r\n" &
"\r\n",
"PUT / HTTP/1.1\r\n" &
"Host: www.google.com\r\n" &
"Content-Length: 71\r\n" &
"Content-Type: text/html\r\n" &
"Connection: close\r\n" &
"\r\n" &
"{\"jsonrpc\":\"2.0\",\"method\":\"myProc\",\"params\":[\"abc\", [1, 2, 3]],\"id\":67}",
"DELETE / HTTP/1.1\r\n" &
"Host: www.google.com\r\n" &
"Content-Length: 71\r\n" &
"Content-Type: text/html\r\n" &
"Connection: close\r\n" &
"\r\n" &
"{\"jsonrpc\":\"2.0\",\"method\":\"myProc\",\"params\":[\"abc\", [1, 2, 3]],\"id\":67}",
"GET / HTTP/0.9\r\n" &
"Host: www.google.com\r\n" &
"Content-Length: 71\r\n" &
"Content-Type: application/json\r\n" &
"\r\n" &
"{\"jsonrpc\":\"2.0\",\"method\":\"myProc\",\"params\":[\"abc\", [1, 2, 3]],\"id\":67}",
"GET / HTTP/1.0\r\n" &
"Host: www.google.com\r\n" &
"Content-Length: 71\r\n" &
"Content-Type: application/json\r\n" &
"\r\n" &
"{\"jsonrpc\":\"2.0\",\"method\":\"myProc\",\"params\":[\"abc\", [1, 2, 3]],\"id\":67}",
"GET / HTTP/1.1\r\n" &
"Host: www.google.com\r\n" &
"Content-Length: 71\r\n" &
"Content-Type: application/json\r\n" &
"Connection: close\r\n" &
"\r\n" &
"{\"jsonrpc\":\"2.0\",\"method\":\"myProc\",\"params\":[\"abc\", [1, 2, 3]],\"id\":67}",
]
proc continuousTest(address: string, port: Port): Future[int] {.async.} =
var client = newRpcHttpClient()
await client.connect(address, port)
result = 0
for i in 0..<TestsCount:
var r = await client.call("myProc", %[%"abc", %[1, 2, 3, i]])
if r.result.getStr == "Hello abc data: [1, 2, 3, " & $i & "]":
result += 1
proc customMessage(address: TransportAddress,
data: string,
expect: int): Future[bool] {.async.} =
var buffer = newSeq[byte](BufferSize)
var header: HttpResponseHeader
var transp = await connect(address)
let wres = await transp.write(data)
doAssert(wres == len(data))
let rres = await transp.readUntil(addr buffer[0], BufferSize, HeadersMark)
doAssert(rres > 0)
buffer.setLen(rres)
header = parseResponse(buffer)
doAssert(header.success())
if header.code == expect:
result = true
else:
result = false
transp.close()
proc headerTest(address: string, port: Port): Future[bool] {.async.} =
var a = resolveTAddress(address, port)
var header = "GET / HTTP/1.1\r\n"
var i = 0
while len(header) <= BigHeaderSize:
header.add("Field" & $i & ": " & $i & "\r\n")
inc(i)
header.add("Content-Length: 71\r\n")
header.add("Content-Type: application/json\r\n")
header.add("Connection: close\r\n\r\n")
header.add("{\"jsonrpc\":\"2.0\",\"method\":\"myProc\",\"params\":[\"abc\", [1, 2, 3]],\"id\":67}")
result = await customMessage(a[0], header, 413)
proc bodyTest(address: string, port: Port): Future[bool] {.async.} =
var body = repeat('B', BigBodySize)
var a = resolveTAddress(address, port)
var header = "GET / HTTP/1.1\r\n"
header.add("Content-Length: " & $len(body) & "\r\n")
header.add("Content-Type: application/json\r\n")
header.add("Connection: close\r\n\r\n")
header.add(body)
result = await customMessage(a[0], header, 413)
proc disconTest(address: string, port: Port,
number: int, expect: int): Future[bool] {.async.} =
var a = resolveTAddress(address, port)
var buffer = newSeq[byte](BufferSize)
var header: HttpResponseHeader
var transp = await connect(a[0])
var data = Requests[number]
let wres = await transp.write(data)
doAssert(wres == len(data))
let rres = await transp.readUntil(addr buffer[0], BufferSize, HeadersMark)
doAssert(rres > 0)
buffer.setLen(rres)
header = parseResponse(buffer)
doAssert(header.success())
if header.code == expect:
result = true
else:
result = false
var length = header.contentLength()
doAssert(length > 0)
buffer.setLen(length)
await transp.readExactly(addr buffer[0], len(buffer))
var left = await transp.read()
if len(left) == 0 and transp.atEof():
result = true
else:
result = false
transp.close()
proc simpleTest(address: string, port: Port,
number: int, expect: int): Future[bool] {.async.} =
var a = resolveTAddress(address, port)
result = await customMessage(a[0], Requests[number], expect)
var httpsrv = newRpcHttpServer(["localhost:8545"])
# Create RPC on server
httpsrv.rpc("myProc") do(input: string, data: array[0..3, int]):
result = %("Hello " & input & " data: " & $data)
httpsrv.start()
suite "Server/Client RPC":
test "Continuous RPC calls (" & $TestsCount & " messages)":
check waitFor(continuousTest("localhost", Port(8545))) == TestsCount
test "Wrong [Content-Type] test":
check waitFor(simpleTest("localhost", Port(8545), 0, 415)) == true
test "Bad request header test":
check waitFor(simpleTest("localhost", Port(8545), 1, 400)) == true
test "Zero [Content-Length] test":
check waitFor(simpleTest("localhost", Port(8545), 2, 411)) == true
test "PUT/DELETE methods test":
check:
waitFor(simpleTest("localhost", Port(8545), 3, 405)) == true
waitFor(simpleTest("localhost", Port(8545), 4, 405)) == true
test "Oversized headers test":
check waitFor(headerTest("localhost", Port(8545))) == true
test "Oversized request test":
check waitFor(bodyTest("localhost", Port(8545))) == true
test "HTTP/0.9 and HTTP/1.0 client test":
check:
waitFor(disconTest("localhost", Port(8545), 5, 200)) == true
waitFor(disconTest("localhost", Port(8545), 6, 200)) == true
test "[Connection]: close test":
check waitFor(disconTest("localhost", Port(8545), 7, 200)) == true
httpsrv.stop()
httpsrv.close()