From 027e7c02a994e32b853e742cf80b4d7214131f09 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Thu, 31 May 2018 11:03:58 +0300 Subject: [PATCH] Added DatagramServer. Changed StreamServer API a bit. Added test for DatagramServer. --- asyncdispatch2/transports/common.nim | 2 +- asyncdispatch2/transports/datagram.nim | 95 ++++++++++++++++++++++++-- asyncdispatch2/transports/stream.nim | 2 +- tests/testdatagram.nim | 89 ++++++++++++++++++++++++ tests/teststream.nim | 8 +-- 5 files changed, 184 insertions(+), 12 deletions(-) diff --git a/asyncdispatch2/transports/common.nim b/asyncdispatch2/transports/common.nim index d297bf57..8809830b 100644 --- a/asyncdispatch2/transports/common.nim +++ b/asyncdispatch2/transports/common.nim @@ -18,7 +18,7 @@ const type ServerFlags* = enum ## Server's flags - ReuseAddr, ReusePort + ReuseAddr, ReusePort, NoAutoRead TransportAddress* = object ## Transport network address diff --git a/asyncdispatch2/transports/datagram.nim b/asyncdispatch2/transports/datagram.nim index 6a777c2a..f49e594d 100644 --- a/asyncdispatch2/transports/datagram.nim +++ b/asyncdispatch2/transports/datagram.nim @@ -288,7 +288,10 @@ when defined(windows): len: int32(len(wresult.buffer))) GC_ref(wresult) result = cast[DatagramTransport](wresult) - result.resumeRead() + if NoAutoRead notin flags: + result.resumeRead() + else: + wresult.state.incl(ReadPaused) proc close*(transp: DatagramTransport) = ## Closes and frees resources of transport ``transp``. @@ -310,6 +313,9 @@ else: raddr: TransportAddress var cdata = cast[ptr CompletionData](udata) + if not isNil(cdata) and int(cdata.fd) == 0: + # Transport was closed earlier, exiting + return var transp = cast[DatagramTransport](cdata.udata) let fd = SocketHandle(cdata.fd) if not isNil(transp): @@ -333,11 +339,16 @@ else: break proc writeDatagramLoop(udata: pointer) = - var res: int = 0 + var + res: int + saddr: Sockaddr_storage + slen: SockLen + var cdata = cast[ptr CompletionData](udata) + if not isNil(cdata) and int(cdata.fd) == 0: + # Transport was closed earlier, exiting + return var transp = cast[DatagramTransport](cdata.udata) - var saddr: Sockaddr_storage - var slen: SockLen let fd = SocketHandle(cdata.fd) if not isNil(transp): if len(transp.queue) > 0: @@ -440,10 +451,12 @@ else: result.state = {WritePaused} result.future = newFuture[void]("datagram.transport") GC_ref(result) - result.resumeRead() + if NoAutoRead notin flags: + result.resumeRead() + else: + result.state.incl(ReadPaused) proc close*(transp: DatagramTransport) = - ## ZAH: This could use a destructor as well ## Closes and frees resources of transport ``transp``. if ReadClosed notin transp.state and WriteClosed notin transp.state: closeAsyncSocket(transp.fd) @@ -544,3 +557,73 @@ proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int, await vector.writer if WriteError in transp.state: raise transp.getError() + +type + DatagramServer* = ref object of RootRef + transport*: DatagramTransport + status*: ServerStatus + +proc createDatagramServer*(host: TransportAddress, + cbproc: DatagramCallback, + flags: set[ServerFlags] = {}, + sock: AsyncFD = asyncInvalidSocket, + bufferSize: int = DefaultDatagramBufferSize, + udata: pointer = nil): DatagramServer = + var transp: DatagramTransport + var fflags = flags + {NoAutoRead} + if host.address.family == IpAddressFamily.IPv4: + transp = newDatagramTransport(cbproc, AnyAddress, host, sock, + fflags, udata, bufferSize) + else: + transp = newDatagramTransport6(cbproc, AnyAddress6, host, sock, + fflags, udata, bufferSize) + result = DatagramServer() + result.transport = transp + result.status = ServerStatus.Starting + GC_ref(result) + +proc start*(server: DatagramServer) = + ## Starts ``server``. + if server.status in {ServerStatus.Starting, ServerStatus.Paused}: + server.transport.resumeRead() + +proc stop*(server: DatagramServer) = + ## Stops ``server``. + if server.status in {ServerStatus.Paused, ServerStatus.Running}: + when defined(windows): + if server.status == ServerStatus.Running: + if {WritePending, ReadPending} * server.transport.state != {}: + ## CancelIO will stop both reading and writing. + discard cancelIo(Handle(server.transport.fd)) + else: + if server.status == ServerStatus.Running: + if WritePaused notin server.transport.state: + server.transport.fd.removeWriter() + if ReadPaused notin server.transport.state: + server.transport.fd.removeReader() + server.status = ServerStatus.Stopped + +proc pause*(server: DatagramServer) = + ## Pause ``server``. + if server.status == ServerStatus.Running: + when defined(windows): + if {WritePending, ReadPending} * server.transport.state != {}: + ## CancelIO will stop both reading and writing. + discard cancelIo(Handle(server.transport.fd)) + else: + if WritePaused notin server.transport.state: + server.transport.fd.removeWriter() + if ReadPaused notin server.transport.state: + server.transport.fd.removeReader() + server.status = ServerStatus.Paused + +proc join*(server: DatagramServer) {.async.} = + ## Waits until ``server`` is not stopped. + if not server.transport.future.finished: + await server.transport.future + +proc close*(server: DatagramServer) = + ## Release ``server`` resources. + if server.status == ServerStatus.Stopped: + server.transport.close() + GC_unref(server) diff --git a/asyncdispatch2/transports/stream.nim b/asyncdispatch2/transports/stream.nim index 63041b87..8973f143 100644 --- a/asyncdispatch2/transports/stream.nim +++ b/asyncdispatch2/transports/stream.nim @@ -770,8 +770,8 @@ proc close*(server: SocketServer) = GC_unref(server) proc createStreamServer*(host: TransportAddress, - flags: set[ServerFlags], cbproc: StreamCallback, + flags: set[ServerFlags] = {}, sock: AsyncFD = asyncInvalidSocket, backlog: int = 100, bufferSize: int = DefaultStreamBufferSize, diff --git a/tests/testdatagram.nim b/tests/testdatagram.nim index bfad4971..a38b8e6d 100644 --- a/tests/testdatagram.nim +++ b/tests/testdatagram.nim @@ -189,7 +189,92 @@ proc test3(bounded: bool): Future[int] {.async.} = for i in 0.. 0: + var answer = newString(nbytes + 1) + copyMem(addr answer[0], pbytes, nbytes) + answer.setLen(nbytes) + doAssert(answer.startsWith("ANSWER")) + var numstr = answer[6..^1] + var num = parseInt(numstr) + doAssert(num < MessagesCount) + results[num] = 1 + inc(counter) + if not future.finished: + future.complete() + + var transp = newDatagramTransport(receiver, + udata = addr counter, + remote = address) + for i in 0.. 0) + var request = newString(nbytes + 1) + copyMem(addr request[0], pbytes, nbytes) + request.setLen(nbytes) + doAssert(request.startsWith("REQUEST")) + var numstr = request[7..^1] + var num = parseInt(numstr) + var answer = "ANSWER" & $num + await transp.sendTo(addr answer[0], len(answer), raddr) + +proc test4(): Future[int] {.async.} = + var ta = strAddress("127.0.0.1:31346") + var counter = 0 + var server = createDatagramServer(ta, serveDatagramClient, {ReuseAddr}) + server.start() + result = await swarmManager(ta) + server.stop() + server.close() + when isMainModule: + echo waitFor(test4()) const m1 = "Unbounded test (" & $TestsCount & " messages)" m2 = "Bounded test (" & $TestsCount & " messages)" @@ -197,6 +282,8 @@ when isMainModule: " clients x " & $MessagesCount & " messages)" m4 = "Bounded multiple clients with messages (" & $ClientsCount & " clients x " & $MessagesCount & " messages)" + m5 = "DatagramServer multiple clients with messages (" & $ClientsCount & + " clients x " & $MessagesCount & " messages)" suite "Datagram Transport test suite": test m1: check waitFor(test1()) == TestsCount @@ -206,3 +293,5 @@ when isMainModule: check waitFor(test3(false)) == ClientsCount * MessagesCount test m4: check waitFor(test3(true)) == ClientsCount * MessagesCount + test m5: + check waitFor(test4()) == ClientsCount * MessagesCount diff --git a/tests/teststream.nim b/tests/teststream.nim index 92e73447..6dd58733 100644 --- a/tests/teststream.nim +++ b/tests/teststream.nim @@ -245,7 +245,7 @@ proc swarmManager4(address: TransportAddress): Future[int] {.async.} = proc test1(): Future[int] {.async.} = var ta = strAddress("127.0.0.1:31344") - var server = createStreamServer(ta, {ReuseAddr}, serveClient1) + var server = createStreamServer(ta, serveClient1, {ReuseAddr}) server.start() result = await swarmManager1(ta) server.stop() @@ -254,7 +254,7 @@ proc test1(): Future[int] {.async.} = proc test2(): Future[int] {.async.} = var ta = strAddress("127.0.0.1:31345") var counter = 0 - var server = createStreamServer(ta, {ReuseAddr}, serveClient2) + var server = createStreamServer(ta, serveClient2, {ReuseAddr}) server.start() result = await swarmManager2(ta) server.stop() @@ -263,7 +263,7 @@ proc test2(): Future[int] {.async.} = proc test3(): Future[int] {.async.} = var ta = strAddress("127.0.0.1:31346") var counter = 0 - var server = createStreamServer(ta, {ReuseAddr}, serveClient3) + var server = createStreamServer(ta, serveClient3, {ReuseAddr}) server.start() result = await swarmManager3(ta) server.stop() @@ -272,7 +272,7 @@ proc test3(): Future[int] {.async.} = proc test4(): Future[int] {.async.} = var ta = strAddress("127.0.0.1:31347") var counter = 0 - var server = createStreamServer(ta, {ReuseAddr}, serveClient4) + var server = createStreamServer(ta, serveClient4, {ReuseAddr}) server.start() result = await swarmManager4(ta) server.stop()