Merge pull request #22 from status-im/RemoveServerGenerics
Remove generics from server and update sockettransport
This commit is contained in:
commit
279353c0a1
|
@ -1,18 +1,16 @@
|
||||||
import json, tables, options, macros
|
import json, tables, options, macros
|
||||||
import asyncdispatch2, router
|
import asyncdispatch2, router, chronicles
|
||||||
import jsonmarshal
|
import jsonmarshal
|
||||||
|
|
||||||
export asyncdispatch2, json, jsonmarshal, router
|
export asyncdispatch2, json, jsonmarshal, router, chronicles
|
||||||
|
|
||||||
type
|
type
|
||||||
RpcServer*[S] = ref object
|
RpcServer* = ref object of RootRef
|
||||||
servers*: seq[S]
|
|
||||||
router*: RpcRouter
|
router*: RpcRouter
|
||||||
|
|
||||||
proc newRpcServer*[S](): RpcServer[S] =
|
proc newRpcServer*(): RpcServer =
|
||||||
new result
|
new result
|
||||||
result.router = newRpcRouter()
|
result.router = newRpcRouter()
|
||||||
result.servers = @[]
|
|
||||||
|
|
||||||
template rpc*(server: RpcServer, path: string, body: untyped): untyped =
|
template rpc*(server: RpcServer, path: string, body: untyped): untyped =
|
||||||
server.router.rpc(path, body)
|
server.router.rpc(path, body)
|
||||||
|
@ -21,14 +19,14 @@ template hasMethod*(server: RpcServer, methodName: string): bool = server.router
|
||||||
|
|
||||||
# Wrapper for message processing
|
# Wrapper for message processing
|
||||||
|
|
||||||
proc route*[T](server: RpcServer[T], line: string): Future[string] {.async, gcsafe.} =
|
proc route*(server: RpcServer, line: string): Future[string] {.async, gcsafe.} =
|
||||||
result = await server.router.route(line)
|
result = await server.router.route(line)
|
||||||
|
|
||||||
# Server registration
|
# Server registration
|
||||||
|
|
||||||
proc register*(server: RpcServer, name: string, rpc: RpcProc) =
|
proc register*(server: RpcServer, name: string, rpc: RpcProc) =
|
||||||
## Add a name/code pair to the RPC server.
|
## Add a name/code pair to the RPC server.
|
||||||
server.router.addRoute(name, rpc)
|
server.router.register(name, rpc)
|
||||||
|
|
||||||
proc unRegisterAll*(server: RpcServer) =
|
proc unRegisterAll*(server: RpcServer) =
|
||||||
# Remove all remote procedure calls from this server.
|
# Remove all remote procedure calls from this server.
|
||||||
|
|
|
@ -1,4 +1,8 @@
|
||||||
import server, json, chronicles
|
import ../ server, json, chronicles
|
||||||
|
|
||||||
|
type
|
||||||
|
RpcSocketServer* = ref object of RpcServer
|
||||||
|
servers: seq[StreamServer]
|
||||||
|
|
||||||
proc sendError*[T](transport: T, code: int, msg: string, id: JsonNode,
|
proc sendError*[T](transport: T, code: int, msg: string, id: JsonNode,
|
||||||
data: JsonNode = newJNull()) {.async.} =
|
data: JsonNode = newJNull()) {.async.} =
|
||||||
|
@ -9,7 +13,7 @@ proc sendError*[T](transport: T, code: int, msg: string, id: JsonNode,
|
||||||
|
|
||||||
proc processClient(server: StreamServer, transport: StreamTransport) {.async, gcsafe.} =
|
proc processClient(server: StreamServer, transport: StreamTransport) {.async, gcsafe.} =
|
||||||
## Process transport data to the RPC server
|
## Process transport data to the RPC server
|
||||||
var rpc = getUserData[RpcServer[StreamTransport]](server)
|
var rpc = getUserData[RpcSocketServer](server)
|
||||||
while true:
|
while true:
|
||||||
var
|
var
|
||||||
maxRequestLength = defaultMaxRequestLength
|
maxRequestLength = defaultMaxRequestLength
|
||||||
|
@ -38,7 +42,7 @@ proc processClient(server: StreamServer, transport: StreamTransport) {.async, gc
|
||||||
|
|
||||||
# Utility functions for setting up servers using stream transport addresses
|
# Utility functions for setting up servers using stream transport addresses
|
||||||
|
|
||||||
proc addStreamServer*(server: RpcServer[StreamServer], address: TransportAddress) =
|
proc addStreamServer*(server: RpcSocketServer, address: TransportAddress) =
|
||||||
try:
|
try:
|
||||||
info "Creating server on ", address = $address
|
info "Creating server on ", address = $address
|
||||||
var transportServer = createStreamServer(address, processClient, {ReuseAddr}, udata = server)
|
var transportServer = createStreamServer(address, processClient, {ReuseAddr}, udata = server)
|
||||||
|
@ -50,11 +54,11 @@ proc addStreamServer*(server: RpcServer[StreamServer], address: TransportAddress
|
||||||
# Server was not bound, critical error.
|
# Server was not bound, critical error.
|
||||||
raise newException(RpcBindError, "Unable to create server!")
|
raise newException(RpcBindError, "Unable to create server!")
|
||||||
|
|
||||||
proc addStreamServers*(server: RpcServer[StreamServer], addresses: openarray[TransportAddress]) =
|
proc addStreamServers*(server: RpcSocketServer, addresses: openarray[TransportAddress]) =
|
||||||
for item in addresses:
|
for item in addresses:
|
||||||
server.addStreamServer(item)
|
server.addStreamServer(item)
|
||||||
|
|
||||||
proc addStreamServer*(server: RpcServer[StreamServer], address: string) =
|
proc addStreamServer*(server: RpcSocketServer, address: string) =
|
||||||
## Create new server and assign it to addresses ``addresses``.
|
## Create new server and assign it to addresses ``addresses``.
|
||||||
var
|
var
|
||||||
tas4: seq[TransportAddress]
|
tas4: seq[TransportAddress]
|
||||||
|
@ -84,11 +88,11 @@ proc addStreamServer*(server: RpcServer[StreamServer], address: string) =
|
||||||
# Addresses could not be resolved, critical error.
|
# Addresses could not be resolved, critical error.
|
||||||
raise newException(RpcAddressUnresolvableError, "Unable to get address!")
|
raise newException(RpcAddressUnresolvableError, "Unable to get address!")
|
||||||
|
|
||||||
proc addStreamServers*(server: RpcServer[StreamServer], addresses: openarray[string]) =
|
proc addStreamServers*(server: RpcSocketServer, addresses: openarray[string]) =
|
||||||
for address in addresses:
|
for address in addresses:
|
||||||
server.addStreamServer(address)
|
server.addStreamServer(address)
|
||||||
|
|
||||||
proc addStreamServer*(server: RpcServer[StreamServer], address: string, port: Port) =
|
proc addStreamServer*(server: RpcSocketServer, address: string, port: Port) =
|
||||||
var
|
var
|
||||||
tas4: seq[TransportAddress]
|
tas4: seq[TransportAddress]
|
||||||
tas6: seq[TransportAddress]
|
tas6: seq[TransportAddress]
|
||||||
|
@ -123,34 +127,35 @@ proc addStreamServer*(server: RpcServer[StreamServer], address: string, port: Po
|
||||||
raise newException(RpcBindError,
|
raise newException(RpcBindError,
|
||||||
"Could not setup server on " & address & ":" & $int(port))
|
"Could not setup server on " & address & ":" & $int(port))
|
||||||
|
|
||||||
type RpcStreamServer* = RpcServer[StreamServer]
|
proc newRpcSocketServer*: RpcSocketServer =
|
||||||
|
RpcSocketServer(router: newRpcRouter(), servers: @[])
|
||||||
|
|
||||||
proc newRpcStreamServer*(addresses: openarray[TransportAddress]): RpcStreamServer =
|
proc newRpcSocketServer*(addresses: openarray[TransportAddress]): RpcSocketServer =
|
||||||
## Create new server and assign it to addresses ``addresses``.
|
## Create new server and assign it to addresses ``addresses``.
|
||||||
result = newRpcServer[StreamServer]()
|
result = newRpcSocketServer()
|
||||||
result.addStreamServers(addresses)
|
result.addStreamServers(addresses)
|
||||||
|
|
||||||
proc newRpcStreamServer*(addresses: openarray[string]): RpcStreamServer =
|
proc newRpcSocketServer*(addresses: openarray[string]): RpcSocketServer =
|
||||||
## Create new server and assign it to addresses ``addresses``.
|
## Create new server and assign it to addresses ``addresses``.
|
||||||
result = newRpcServer[StreamServer]()
|
result = newRpcSocketServer()
|
||||||
result.addStreamServers(addresses)
|
result.addStreamServers(addresses)
|
||||||
|
|
||||||
proc newRpcStreamServer*(address = "localhost", port: Port = Port(8545)): RpcStreamServer =
|
proc newRpcSocketServer*(address: string, port: Port = Port(8545)): RpcSocketServer =
|
||||||
# Create server on specified port
|
# Create server on specified port
|
||||||
result = newRpcServer[StreamServer]()
|
result = newRpcSocketServer()
|
||||||
result.addStreamServer(address, port)
|
result.addStreamServer(address, port)
|
||||||
|
|
||||||
proc start*(server: RpcStreamServer) =
|
proc start*(server: RpcSocketServer) =
|
||||||
## Start the RPC server.
|
## Start the RPC server.
|
||||||
for item in server.servers:
|
for item in server.servers:
|
||||||
item.start()
|
item.start()
|
||||||
|
|
||||||
proc stop*(server: RpcStreamServer) =
|
proc stop*(server: RpcSocketServer) =
|
||||||
## Stop the RPC server.
|
## Stop the RPC server.
|
||||||
for item in server.servers:
|
for item in server.servers:
|
||||||
item.stop()
|
item.stop()
|
||||||
|
|
||||||
proc close*(server: RpcStreamServer) =
|
proc close*(server: RpcSocketServer) =
|
||||||
## Cleanup resources of RPC server.
|
## Cleanup resources of RPC server.
|
||||||
for item in server.servers:
|
for item in server.servers:
|
||||||
item.close()
|
item.close()
|
|
@ -0,0 +1,2 @@
|
||||||
|
import json_rpc / server, json_rpc / transports / socket
|
||||||
|
export server, socket
|
|
@ -1,2 +0,0 @@
|
||||||
import json_rpc / [server, sockettransport]
|
|
||||||
export server, sockettransport
|
|
|
@ -3,10 +3,10 @@
|
||||||
allow unchecked and unformatted calls.
|
allow unchecked and unformatted calls.
|
||||||
]#
|
]#
|
||||||
|
|
||||||
import unittest, debugclient, ../rpcsockets
|
import unittest, debugclient, ../rpcsocket
|
||||||
import strformat, chronicles
|
import strformat, chronicles
|
||||||
|
|
||||||
var server = newRpcStreamServer("localhost", 8547.Port)
|
var server = newRpcSocketServer("localhost", 8547.Port)
|
||||||
var client = newRpcStreamClient()
|
var client = newRpcStreamClient()
|
||||||
|
|
||||||
server.start()
|
server.start()
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import unittest, json, tables
|
import unittest, json, tables
|
||||||
import ../rpcclient, ../rpcsockets
|
import ../rpcclient, ../rpcsocket
|
||||||
import stint, ethtypes, ethprocs, stintjson, nimcrypto, ethhexstrings, chronicles
|
import stint, ethtypes, ethprocs, stintjson, nimcrypto, ethhexstrings, chronicles
|
||||||
|
|
||||||
from os import getCurrentDir, DirSep
|
from os import getCurrentDir, DirSep
|
||||||
|
@ -7,7 +7,7 @@ from strutils import rsplit
|
||||||
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
||||||
|
|
||||||
var
|
var
|
||||||
server = newRpcStreamServer("localhost", Port(8546))
|
server = newRpcSocketServer("localhost", Port(8546))
|
||||||
client = newRpcStreamClient()
|
client = newRpcStreamClient()
|
||||||
|
|
||||||
## Generate Ethereum server RPCs
|
## Generate Ethereum server RPCs
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import unittest, json, tables, chronicles
|
import unittest, json, tables, chronicles
|
||||||
import ../rpcsockets
|
import ../rpcsocket
|
||||||
|
|
||||||
type
|
type
|
||||||
# some nested types to check object parsing
|
# some nested types to check object parsing
|
||||||
|
@ -27,7 +27,7 @@ let
|
||||||
},
|
},
|
||||||
"c": %1.23}
|
"c": %1.23}
|
||||||
|
|
||||||
var s = newRpcStreamServer(["localhost:8545"])
|
var s = newRpcSocketServer(["localhost:8545"])
|
||||||
|
|
||||||
# RPC definitions
|
# RPC definitions
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import unittest, json, chronicles
|
import unittest, json, chronicles
|
||||||
import ../rpcclient, ../rpcsockets
|
import ../rpcclient, ../rpcsocket
|
||||||
|
|
||||||
var srv = newRpcStreamServer(["localhost:8545"])
|
var srv = newRpcSocketServer(["localhost:8545"])
|
||||||
var client = newRpcStreamClient()
|
var client = newRpcStreamClient()
|
||||||
|
|
||||||
# Create RPC on server
|
# Create RPC on server
|
||||||
|
|
Loading…
Reference in New Issue