Added DatagramServer.

Changed StreamServer API a bit.
Added test for DatagramServer.
This commit is contained in:
cheatfate 2018-05-31 11:03:58 +03:00
parent 0f7f616133
commit 027e7c02a9
5 changed files with 184 additions and 12 deletions

View File

@ -18,7 +18,7 @@ const
type type
ServerFlags* = enum ServerFlags* = enum
## Server's flags ## Server's flags
ReuseAddr, ReusePort ReuseAddr, ReusePort, NoAutoRead
TransportAddress* = object TransportAddress* = object
## Transport network address ## Transport network address

View File

@ -288,7 +288,10 @@ when defined(windows):
len: int32(len(wresult.buffer))) len: int32(len(wresult.buffer)))
GC_ref(wresult) GC_ref(wresult)
result = cast[DatagramTransport](wresult) result = cast[DatagramTransport](wresult)
result.resumeRead() if NoAutoRead notin flags:
result.resumeRead()
else:
wresult.state.incl(ReadPaused)
proc close*(transp: DatagramTransport) = proc close*(transp: DatagramTransport) =
## Closes and frees resources of transport ``transp``. ## Closes and frees resources of transport ``transp``.
@ -310,6 +313,9 @@ else:
raddr: TransportAddress raddr: TransportAddress
var cdata = cast[ptr CompletionData](udata) var cdata = cast[ptr CompletionData](udata)
if not isNil(cdata) and int(cdata.fd) == 0:
# Transport was closed earlier, exiting
return
var transp = cast[DatagramTransport](cdata.udata) var transp = cast[DatagramTransport](cdata.udata)
let fd = SocketHandle(cdata.fd) let fd = SocketHandle(cdata.fd)
if not isNil(transp): if not isNil(transp):
@ -333,11 +339,16 @@ else:
break break
proc writeDatagramLoop(udata: pointer) = proc writeDatagramLoop(udata: pointer) =
var res: int = 0 var
res: int
saddr: Sockaddr_storage
slen: SockLen
var cdata = cast[ptr CompletionData](udata) var cdata = cast[ptr CompletionData](udata)
if not isNil(cdata) and int(cdata.fd) == 0:
# Transport was closed earlier, exiting
return
var transp = cast[DatagramTransport](cdata.udata) var transp = cast[DatagramTransport](cdata.udata)
var saddr: Sockaddr_storage
var slen: SockLen
let fd = SocketHandle(cdata.fd) let fd = SocketHandle(cdata.fd)
if not isNil(transp): if not isNil(transp):
if len(transp.queue) > 0: if len(transp.queue) > 0:
@ -440,10 +451,12 @@ else:
result.state = {WritePaused} result.state = {WritePaused}
result.future = newFuture[void]("datagram.transport") result.future = newFuture[void]("datagram.transport")
GC_ref(result) GC_ref(result)
result.resumeRead() if NoAutoRead notin flags:
result.resumeRead()
else:
result.state.incl(ReadPaused)
proc close*(transp: DatagramTransport) = proc close*(transp: DatagramTransport) =
## ZAH: This could use a destructor as well
## Closes and frees resources of transport ``transp``. ## Closes and frees resources of transport ``transp``.
if ReadClosed notin transp.state and WriteClosed notin transp.state: if ReadClosed notin transp.state and WriteClosed notin transp.state:
closeAsyncSocket(transp.fd) closeAsyncSocket(transp.fd)
@ -544,3 +557,73 @@ proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int,
await vector.writer await vector.writer
if WriteError in transp.state: if WriteError in transp.state:
raise transp.getError() raise transp.getError()
type
DatagramServer* = ref object of RootRef
transport*: DatagramTransport
status*: ServerStatus
proc createDatagramServer*(host: TransportAddress,
cbproc: DatagramCallback,
flags: set[ServerFlags] = {},
sock: AsyncFD = asyncInvalidSocket,
bufferSize: int = DefaultDatagramBufferSize,
udata: pointer = nil): DatagramServer =
var transp: DatagramTransport
var fflags = flags + {NoAutoRead}
if host.address.family == IpAddressFamily.IPv4:
transp = newDatagramTransport(cbproc, AnyAddress, host, sock,
fflags, udata, bufferSize)
else:
transp = newDatagramTransport6(cbproc, AnyAddress6, host, sock,
fflags, udata, bufferSize)
result = DatagramServer()
result.transport = transp
result.status = ServerStatus.Starting
GC_ref(result)
proc start*(server: DatagramServer) =
## Starts ``server``.
if server.status in {ServerStatus.Starting, ServerStatus.Paused}:
server.transport.resumeRead()
proc stop*(server: DatagramServer) =
## Stops ``server``.
if server.status in {ServerStatus.Paused, ServerStatus.Running}:
when defined(windows):
if server.status == ServerStatus.Running:
if {WritePending, ReadPending} * server.transport.state != {}:
## CancelIO will stop both reading and writing.
discard cancelIo(Handle(server.transport.fd))
else:
if server.status == ServerStatus.Running:
if WritePaused notin server.transport.state:
server.transport.fd.removeWriter()
if ReadPaused notin server.transport.state:
server.transport.fd.removeReader()
server.status = ServerStatus.Stopped
proc pause*(server: DatagramServer) =
## Pause ``server``.
if server.status == ServerStatus.Running:
when defined(windows):
if {WritePending, ReadPending} * server.transport.state != {}:
## CancelIO will stop both reading and writing.
discard cancelIo(Handle(server.transport.fd))
else:
if WritePaused notin server.transport.state:
server.transport.fd.removeWriter()
if ReadPaused notin server.transport.state:
server.transport.fd.removeReader()
server.status = ServerStatus.Paused
proc join*(server: DatagramServer) {.async.} =
## Waits until ``server`` is not stopped.
if not server.transport.future.finished:
await server.transport.future
proc close*(server: DatagramServer) =
## Release ``server`` resources.
if server.status == ServerStatus.Stopped:
server.transport.close()
GC_unref(server)

View File

@ -770,8 +770,8 @@ proc close*(server: SocketServer) =
GC_unref(server) GC_unref(server)
proc createStreamServer*(host: TransportAddress, proc createStreamServer*(host: TransportAddress,
flags: set[ServerFlags],
cbproc: StreamCallback, cbproc: StreamCallback,
flags: set[ServerFlags] = {},
sock: AsyncFD = asyncInvalidSocket, sock: AsyncFD = asyncInvalidSocket,
backlog: int = 100, backlog: int = 100,
bufferSize: int = DefaultStreamBufferSize, bufferSize: int = DefaultStreamBufferSize,

View File

@ -189,7 +189,92 @@ proc test3(bounded: bool): Future[int] {.async.} =
for i in 0..<ClientsCount: for i in 0..<ClientsCount:
result += counters[i] result += counters[i]
proc swarmWorker(address: TransportAddress): Future[int] {.async.} =
var counter = 0
var results = newSeq[int](MessagesCount)
var future = newFuture[void]("testdatagram.client.wait")
proc receiver(transp: DatagramTransport,
pbytes: pointer, nbytes: int,
raddr: TransportAddress,
udata: pointer): Future[void] {.async.} =
if not isNil(pbytes) and nbytes > 0:
var answer = newString(nbytes + 1)
copyMem(addr answer[0], pbytes, nbytes)
answer.setLen(nbytes)
doAssert(answer.startsWith("ANSWER"))
var numstr = answer[6..^1]
var num = parseInt(numstr)
doAssert(num < MessagesCount)
results[num] = 1
inc(counter)
if not future.finished:
future.complete()
var transp = newDatagramTransport(receiver,
udata = addr counter,
remote = address)
for i in 0..<MessagesCount:
var data = "REQUEST" & $i
await transp.send(addr data[0], len(data))
# We need to wait answer here, or we can overflow OS network
# buffer and some datagrams will be dropped.
await future
future = newFuture[void]("testdatagram.client.wait")
transp.close()
result = 0
for i in 0..<MessagesCount:
if results[i] == 1:
inc(result)
proc waitAll[T](futs: seq[Future[T]]): Future[void] =
var counter = len(futs)
var retFuture = newFuture[void]("waitAll")
proc cb(udata: pointer) =
dec(counter)
if counter == 0:
retFuture.complete()
for fut in futs:
fut.addCallback(cb)
return retFuture
proc swarmManager(address: TransportAddress): Future[int] {.async.} =
var retFuture = newFuture[void]("swarm.manager.datagram")
var workers = newSeq[Future[int]](ClientsCount)
var count = ClientsCount
for i in 0..<ClientsCount:
workers[i] = swarmWorker(address)
await waitAll(workers)
for i in 0..<ClientsCount:
var res = workers[i].read()
result += res
proc serveDatagramClient(transp: DatagramTransport,
pbytes: pointer, nbytes: int,
raddr: TransportAddress,
udata: pointer): Future[void] {.async.} =
doAssert(not isNil(pbytes) and nbytes > 0)
var request = newString(nbytes + 1)
copyMem(addr request[0], pbytes, nbytes)
request.setLen(nbytes)
doAssert(request.startsWith("REQUEST"))
var numstr = request[7..^1]
var num = parseInt(numstr)
var answer = "ANSWER" & $num
await transp.sendTo(addr answer[0], len(answer), raddr)
proc test4(): Future[int] {.async.} =
var ta = strAddress("127.0.0.1:31346")
var counter = 0
var server = createDatagramServer(ta, serveDatagramClient, {ReuseAddr})
server.start()
result = await swarmManager(ta)
server.stop()
server.close()
when isMainModule: when isMainModule:
echo waitFor(test4())
const const
m1 = "Unbounded test (" & $TestsCount & " messages)" m1 = "Unbounded test (" & $TestsCount & " messages)"
m2 = "Bounded test (" & $TestsCount & " messages)" m2 = "Bounded test (" & $TestsCount & " messages)"
@ -197,6 +282,8 @@ when isMainModule:
" clients x " & $MessagesCount & " messages)" " clients x " & $MessagesCount & " messages)"
m4 = "Bounded multiple clients with messages (" & $ClientsCount & m4 = "Bounded multiple clients with messages (" & $ClientsCount &
" clients x " & $MessagesCount & " messages)" " clients x " & $MessagesCount & " messages)"
m5 = "DatagramServer multiple clients with messages (" & $ClientsCount &
" clients x " & $MessagesCount & " messages)"
suite "Datagram Transport test suite": suite "Datagram Transport test suite":
test m1: test m1:
check waitFor(test1()) == TestsCount check waitFor(test1()) == TestsCount
@ -206,3 +293,5 @@ when isMainModule:
check waitFor(test3(false)) == ClientsCount * MessagesCount check waitFor(test3(false)) == ClientsCount * MessagesCount
test m4: test m4:
check waitFor(test3(true)) == ClientsCount * MessagesCount check waitFor(test3(true)) == ClientsCount * MessagesCount
test m5:
check waitFor(test4()) == ClientsCount * MessagesCount

View File

@ -245,7 +245,7 @@ proc swarmManager4(address: TransportAddress): Future[int] {.async.} =
proc test1(): Future[int] {.async.} = proc test1(): Future[int] {.async.} =
var ta = strAddress("127.0.0.1:31344") var ta = strAddress("127.0.0.1:31344")
var server = createStreamServer(ta, {ReuseAddr}, serveClient1) var server = createStreamServer(ta, serveClient1, {ReuseAddr})
server.start() server.start()
result = await swarmManager1(ta) result = await swarmManager1(ta)
server.stop() server.stop()
@ -254,7 +254,7 @@ proc test1(): Future[int] {.async.} =
proc test2(): Future[int] {.async.} = proc test2(): Future[int] {.async.} =
var ta = strAddress("127.0.0.1:31345") var ta = strAddress("127.0.0.1:31345")
var counter = 0 var counter = 0
var server = createStreamServer(ta, {ReuseAddr}, serveClient2) var server = createStreamServer(ta, serveClient2, {ReuseAddr})
server.start() server.start()
result = await swarmManager2(ta) result = await swarmManager2(ta)
server.stop() server.stop()
@ -263,7 +263,7 @@ proc test2(): Future[int] {.async.} =
proc test3(): Future[int] {.async.} = proc test3(): Future[int] {.async.} =
var ta = strAddress("127.0.0.1:31346") var ta = strAddress("127.0.0.1:31346")
var counter = 0 var counter = 0
var server = createStreamServer(ta, {ReuseAddr}, serveClient3) var server = createStreamServer(ta, serveClient3, {ReuseAddr})
server.start() server.start()
result = await swarmManager3(ta) result = await swarmManager3(ta)
server.stop() server.stop()
@ -272,7 +272,7 @@ proc test3(): Future[int] {.async.} =
proc test4(): Future[int] {.async.} = proc test4(): Future[int] {.async.} =
var ta = strAddress("127.0.0.1:31347") var ta = strAddress("127.0.0.1:31347")
var counter = 0 var counter = 0
var server = createStreamServer(ta, {ReuseAddr}, serveClient4) var server = createStreamServer(ta, serveClient4, {ReuseAddr})
server.start() server.start()
result = await swarmManager4(ta) result = await swarmManager4(ta)
server.stop() server.stop()