From 525aaf68377f39103b693ea9f7435901f274b74b Mon Sep 17 00:00:00 2001 From: cheatfate Date: Thu, 14 Jun 2018 09:49:59 +0300 Subject: [PATCH] Comment out asyncmacro2 skipStmtList(). Many changes in datagram.nim. Fixed testdatagram.nim. Fixed testserver.nim. --- asyncdispatch2/asyncmacro2.nim | 8 +- asyncdispatch2/transports/datagram.nim | 293 ++++++++++++++----------- asyncdispatch2/transports/stream.nim | 6 +- tests/testdatagram.nim | 264 +++++++++------------- tests/testserver.nim | 20 -- 5 files changed, 278 insertions(+), 313 deletions(-) diff --git a/asyncdispatch2/asyncmacro2.nim b/asyncdispatch2/asyncmacro2.nim index bbac3860..a7395e11 100644 --- a/asyncdispatch2/asyncmacro2.nim +++ b/asyncdispatch2/asyncmacro2.nim @@ -19,10 +19,10 @@ proc skipUntilStmtList(node: NimNode): NimNode {.compileTime.} = if node[0].kind == nnkStmtList: result = skipUntilStmtList(node[0]) -proc skipStmtList(node: NimNode): NimNode {.compileTime.} = - result = node - if node[0].kind == nnkStmtList: - result = node[0] +# proc skipStmtList(node: NimNode): NimNode {.compileTime.} = +# result = node +# if node[0].kind == nnkStmtList: +# result = node[0] template createCb(retFutureSym, iteratorNameSym, strName, identName, futureVarCompletions: untyped) = diff --git a/asyncdispatch2/transports/datagram.nim b/asyncdispatch2/transports/datagram.nim index 1fdc64c9..d6b58c8d 100644 --- a/asyncdispatch2/transports/datagram.nim +++ b/asyncdispatch2/transports/datagram.nim @@ -27,49 +27,23 @@ type buflen: int # Writer buffer size writer: Future[void] # Writer vector completion Future - DatagramServer* = ref object of RootRef - ## Datagram server object - transport*: DatagramTransport ## Datagram transport - status*: ServerStatus ## Current server status - DatagramCallback* = proc(transp: DatagramTransport, - pbytes: pointer, - nbytes: int, - remote: TransportAddress, - udata: pointer): Future[void] {.gcsafe.} - ## Datagram asynchronous receive callback. - ## ``transp`` - transport object - ## ``pbytes`` - pointer to data received - ## ``nbytes`` - number of bytes received - ## ``remote`` - remote peer address - ## ``udata`` - user-defined pointer, specified at Transport creation. - ## - ## ``pbytes`` will be `nil` and ``nbytes`` will be ``0``, if there an error - ## happens. + remote: TransportAddress): Future[void] {.gcsafe.} DatagramTransport* = ref object of RootRef - fd: AsyncFD # File descriptor + fd*: AsyncFD # File descriptor state: set[TransportState] # Current Transport state + flags: set[ServerFlags] # Flags buffer: seq[byte] # Reading buffer + buflen: int # Reading buffer effective size error: ref Exception # Current error queue: Deque[GramVector] # Writer queue local: TransportAddress # Local address remote: TransportAddress # Remote address - udata: pointer # User-driven pointer + udata*: pointer # User-driven pointer function: DatagramCallback # Receive data callback future: Future[void] # Transport's life future - -template setReadError(t, e: untyped) = - (t).state.incl(ReadError) - (t).error = newException(TransportOsError, osErrorMsg((e))) - -template setWriterWSABuffer(t, v: untyped) = - (t).wwsabuf.buf = cast[cstring](v.buf) - (t).wwsabuf.len = cast[int32](v.buflen) - -when defined(windows): - type - WindowsDatagramTransport* = ref object of DatagramTransport + when defined(windows): rovl: CustomOverlapped # Reader OVERLAPPED structure wovl: CustomOverlapped # Writer OVERLAPPED structure raddr: Sockaddr_storage # Reader address storage @@ -80,10 +54,20 @@ when defined(windows): wlen: SockLen # Writer address length wwsabuf: TWSABuf # Writer WSABUF structure +template setReadError(t, e: untyped) = + (t).state.incl(ReadError) + (t).error = newException(TransportOsError, osErrorMsg((e))) + +template setWriterWSABuffer(t, v: untyped) = + (t).wwsabuf.buf = cast[cstring](v.buf) + (t).wwsabuf.len = cast[int32](v.buflen) + +when defined(windows): + proc writeDatagramLoop(udata: pointer) = var bytesCount: int32 var ovl = cast[PtrCustomOverlapped](udata) - var transp = cast[WindowsDatagramTransport](ovl.data.udata) + var transp = cast[DatagramTransport](ovl.data.udata) while len(transp.queue) > 0: if WritePending in transp.state: ## Continuation @@ -140,7 +124,7 @@ when defined(windows): bytesCount: int32 raddr: TransportAddress var ovl = cast[PtrCustomOverlapped](udata) - var transp = cast[WindowsDatagramTransport](ovl.data.udata) + var transp = cast[DatagramTransport](ovl.data.udata) while true: if ReadPending in transp.state: ## Continuation @@ -151,11 +135,10 @@ when defined(windows): if err == OSErrorCode(-1): let bytesCount = transp.rovl.data.bytesCount if bytesCount == 0: - transp.state.incl(ReadEof) - transp.state.incl(ReadPaused) + transp.state.incl({ReadEof, ReadPaused}) fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port) - discard transp.function(transp, addr transp.buffer[0], bytesCount, - raddr, transp.udata) + transp.buflen = bytesCount + discard transp.function(transp, raddr) elif int(err) == ERROR_OPERATION_ABORTED: # CancelIO() interrupt transp.state.incl(ReadPaused) @@ -163,10 +146,11 @@ when defined(windows): else: transp.setReadError(err) transp.state.incl(ReadPaused) - discard transp.function(transp, nil, 0, raddr, transp.udata) + transp.buflen = 0 + discard transp.function(transp, raddr) else: ## Initiation - if (ReadEof notin transp.state) and (ReadClosed notin transp.state): + if transp.state * {ReadEof, ReadClosed, ReadError} == {}: transp.state.incl(ReadPending) let fd = SocketHandle(ovl.data.fd) transp.rflag = 0 @@ -184,7 +168,7 @@ when defined(windows): transp.state.incl(ReadPaused) elif int(err) == WSAECONNRESET: transp.state.excl(ReadPending) - transp.state = transp.state + {ReadPaused, ReadEof} + transp.state.incl({ReadPaused, ReadEof}) break elif int(err) == ERROR_IO_PENDING: discard @@ -192,18 +176,17 @@ when defined(windows): transp.state.excl(ReadPending) transp.state.incl(ReadPaused) transp.setReadError(err) - discard transp.function(transp, nil, 0, raddr, transp.udata) + transp.buflen = 0 + discard transp.function(transp, raddr) break proc resumeRead(transp: DatagramTransport) {.inline.} = - var wtransp = cast[WindowsDatagramTransport](transp) - wtransp.state.excl(ReadPaused) - readDatagramLoop(cast[pointer](addr wtransp.rovl)) + transp.state.excl(ReadPaused) + readDatagramLoop(cast[pointer](addr transp.rovl)) proc resumeWrite(transp: DatagramTransport) {.inline.} = - var wtransp = cast[WindowsDatagramTransport](transp) - wtransp.state.excl(WritePaused) - writeDatagramLoop(cast[pointer](addr wtransp.wovl)) + transp.state.excl(WritePaused) + writeDatagramLoop(cast[pointer](addr transp.wovl)) proc newDatagramTransportCommon(cbproc: DatagramCallback, remote: TransportAddress, @@ -211,12 +194,16 @@ when defined(windows): sock: AsyncFD, flags: set[ServerFlags], udata: pointer, + child: DatagramTransport, bufferSize: int): DatagramTransport = var localSock: AsyncFD assert(remote.address.family == local.address.family) assert(not isNil(cbproc)) - var wresult = new WindowsDatagramTransport + if isNil(child): + result = DatagramTransport() + else: + result = child if sock == asyncInvalidSocket: if local.address.family == IpAddressFamily.IPv4: @@ -251,7 +238,7 @@ when defined(windows): if sock == asyncInvalidSocket: closeAsyncSocket(localSock) raiseOsError(err) - wresult.local = local + result.local = local else: var saddr: Sockaddr_storage var slen: SockLen @@ -278,27 +265,26 @@ when defined(windows): if sock == asyncInvalidSocket: closeAsyncSocket(localSock) raiseOsError(err) - wresult.remote = remote + result.remote = remote - wresult.fd = localSock - wresult.function = cbproc - wresult.buffer = newSeq[byte](bufferSize) - wresult.queue = initDeque[GramVector]() - wresult.udata = udata - wresult.state = {WritePaused} - wresult.future = newFuture[void]("datagram.transport") - wresult.rovl.data = CompletionData(fd: localSock, cb: readDatagramLoop, - udata: cast[pointer](wresult)) - wresult.wovl.data = CompletionData(fd: localSock, cb: writeDatagramLoop, - udata: cast[pointer](wresult)) - wresult.rwsabuf = TWSABuf(buf: cast[cstring](addr wresult.buffer[0]), - len: int32(len(wresult.buffer))) - GC_ref(wresult) - result = cast[DatagramTransport](wresult) + result.fd = localSock + result.function = cbproc + result.buffer = newSeq[byte](bufferSize) + result.queue = initDeque[GramVector]() + result.udata = udata + result.state = {WritePaused} + result.future = newFuture[void]("datagram.transport") + result.rovl.data = CompletionData(fd: localSock, cb: readDatagramLoop, + udata: cast[pointer](result)) + result.wovl.data = CompletionData(fd: localSock, cb: writeDatagramLoop, + udata: cast[pointer](result)) + result.rwsabuf = TWSABuf(buf: cast[cstring](addr result.buffer[0]), + len: int32(len(result.buffer))) + GC_ref(result) if NoAutoRead notin flags: result.resumeRead() else: - wresult.state.incl(ReadPaused) + result.state.incl(ReadPaused) proc close*(transp: DatagramTransport) = ## Closes and frees resources of transport ``transp``. @@ -308,8 +294,9 @@ when defined(windows): transp.state.incl(WriteClosed) transp.state.incl(ReadClosed) transp.future.complete() - var wresult = cast[WindowsDatagramTransport](transp) - GC_unref(wresult) + if not isNil(transp.udata) and GCUserData in transp.flags: + GC_unref(cast[ref int](transp.udata)) + GC_unref(transp) else: @@ -334,15 +321,16 @@ else: addr slen) if res >= 0: fromSockAddr(saddr, slen, raddr.address, raddr.port) - discard transp.function(transp, addr transp.buffer[0], res, - raddr, transp.udata) + transp.buflen = res + discard transp.function(transp, raddr) else: let err = osLastError() if int(err) == EINTR: continue else: + transp.buflen = 0 transp.setReadError(err) - discard transp.function(transp, nil, 0, raddr, transp.udata) + discard transp.function(transp, raddr) break proc writeDatagramLoop(udata: pointer) = @@ -352,7 +340,7 @@ else: slen: SockLen var cdata = cast[ptr CompletionData](udata) - if not isNil(cdata) and int(cdata.fd) == 0: + if not isNil(cdata) and (int(cdata.fd) == 0 or isNil(cdata.udata)): # Transport was closed earlier, exiting return var transp = cast[DatagramTransport](cdata.udata) @@ -396,12 +384,16 @@ else: sock: AsyncFD, flags: set[ServerFlags], udata: pointer, + child: DatagramTransport = nil, bufferSize: int): DatagramTransport = var localSock: AsyncFD assert(remote.address.family == local.address.family) assert(not isNil(cbproc)) - result = new DatagramTransport + if isNil(child): + result = DatagramTransport() + else: + result = child if sock == asyncInvalidSocket: if local.address.family == IpAddressFamily.IPv4: @@ -452,6 +444,7 @@ else: result.fd = localSock result.function = cbproc + result.flags = flags result.buffer = newSeq[byte](bufferSize) result.queue = initDeque[GramVector]() result.udata = udata @@ -465,10 +458,9 @@ else: proc close*(transp: DatagramTransport) = ## Closes and frees resources of transport ``transp``. - if ReadClosed notin transp.state and WriteClosed notin transp.state: + if {ReadClosed, WriteClosed} * transp.state == {}: closeAsyncSocket(transp.fd) - transp.state.incl(WriteClosed) - transp.state.incl(ReadClosed) + transp.state.incl({WriteClosed, ReadClosed}) transp.future.complete() GC_unref(transp) @@ -478,6 +470,7 @@ proc newDatagramTransport*(cbproc: DatagramCallback, sock: AsyncFD = asyncInvalidSocket, flags: set[ServerFlags] = {}, udata: pointer = nil, + child: DatagramTransport = nil, bufSize: int = DefaultDatagramBufferSize ): DatagramTransport = ## Create new UDP datagram transport (IPv4). @@ -491,7 +484,21 @@ proc newDatagramTransport*(cbproc: DatagramCallback, ## ``udata`` - custom argument which will be passed to ``cbproc``. ## ``bufSize`` - size of internal buffer result = newDatagramTransportCommon(cbproc, remote, local, sock, - flags, udata, bufSize) + flags, udata, child, bufSize) + +proc newDatagramTransport*[T](cbproc: DatagramCallback, + udata: ref T, + remote: TransportAddress = AnyAddress, + local: TransportAddress = AnyAddress, + sock: AsyncFD = asyncInvalidSocket, + flags: set[ServerFlags] = {}, + child: DatagramTransport = nil, + bufSize: int = DefaultDatagramBufferSize + ): DatagramTransport = + var fflags = flags + {GCUserData} + GC_ref(udata) + result = newDatagramTransportCommon(cbproc, remote, local, sock, + fflags, udata, child, bufSize) proc newDatagramTransport6*(cbproc: DatagramCallback, remote: TransportAddress = AnyAddress6, @@ -499,6 +506,7 @@ proc newDatagramTransport6*(cbproc: DatagramCallback, sock: AsyncFD = asyncInvalidSocket, flags: set[ServerFlags] = {}, udata: pointer = nil, + child: DatagramTransport = nil, bufSize: int = DefaultDatagramBufferSize ): DatagramTransport = ## Create new UDP datagram transport (IPv6). @@ -512,7 +520,21 @@ proc newDatagramTransport6*(cbproc: DatagramCallback, ## ``udata`` - custom argument which will be passed to ``cbproc``. ## ``bufSize`` - size of internal buffer. result = newDatagramTransportCommon(cbproc, remote, local, sock, - flags, udata, bufSize) + flags, udata, child, bufSize) + +proc newDatagramTransport6*[T](cbproc: DatagramCallback, + udata: ref T, + remote: TransportAddress = AnyAddress6, + local: TransportAddress = AnyAddress6, + sock: AsyncFD = asyncInvalidSocket, + flags: set[ServerFlags] = {}, + child: DatagramTransport = nil, + bufSize: int = DefaultDatagramBufferSize + ): DatagramTransport = + var fflags = flags + {GCUserData} + GC_ref(udata) + result = newDatagramTransportCommon(cbproc, remote, local, sock, + fflags, udata, child, bufSize) proc join*(transp: DatagramTransport) {.async.} = ## Wait until the transport ``transp`` will be closed. @@ -608,53 +630,74 @@ proc sendTo*[T](transp: DatagramTransport, msg: var seq[T], transp.resumeWrite() return retFuture -proc createDatagramServer*(host: TransportAddress, - cbproc: DatagramCallback, - flags: set[ServerFlags] = {}, - sock: AsyncFD = asyncInvalidSocket, - bufferSize: int = DefaultDatagramBufferSize, - udata: pointer = nil): DatagramServer = - var transp: DatagramTransport - var fflags = flags + {NoAutoRead} - if host.address.family == IpAddressFamily.IPv4: - transp = newDatagramTransport(cbproc, AnyAddress, host, sock, - fflags, udata, bufferSize) - else: - transp = newDatagramTransport6(cbproc, AnyAddress6, host, sock, - fflags, udata, bufferSize) - result = DatagramServer() - result.transport = transp - result.status = ServerStatus.Starting - GC_ref(result) +proc peekMessage*(transp: DatagramTransport, msg: var seq[byte], + msglen: var int) = + ## Get access to internal message buffer and length of incoming datagram. + if ReadError in transp.state: + raise transp.getError() + shallowCopy(msg, transp.buffer) + msglen = transp.buflen -proc start*(server: DatagramServer) = - ## Starts ``server``. - if server.status == ServerStatus.Starting: - server.transport.resumeRead() - server.status = ServerStatus.Running +proc getMessage*(transp: DatagramTransport): seq[byte] = + ## Copy data from internal message buffer and return result. + if ReadError in transp.state: + raise transp.getError() + if transp.buflen > 0: + result = newSeq[byte](transp.buflen) + copyMem(addr result[0], addr transp.buffer[0], transp.buflen) -proc stop*(server: DatagramServer) = - ## Stops ``server``. - if server.status == ServerStatus.Running: - when defined(windows): - if {WritePending, ReadPending} * server.transport.state != {}: - ## CancelIO will stop both reading and writing. - discard cancelIo(Handle(server.transport.fd)) - else: - if WritePaused notin server.transport.state: - server.transport.fd.removeWriter() - if ReadPaused notin server.transport.state: - server.transport.fd.removeReader() - server.status = ServerStatus.Stopped +proc getUserData*[T](transp: DatagramTransport): T {.inline.} = + ## Obtain user data stored in ``transp`` object. + result = cast[T](transp.udata) -proc join*(server: DatagramServer) {.async.} = - ## Waits until ``server`` is not stopped. - if not server.transport.future.finished: - await server.transport.future -proc close*(server: DatagramServer) = - ## Release ``server`` resources. - if server.status == ServerStatus.Stopped: - server.status = ServerStatus.Closed - server.transport.close() - GC_unref(server) +# proc createDatagramServer*(host: TransportAddress, +# cbproc: DatagramCallback, +# flags: set[ServerFlags] = {}, +# sock: AsyncFD = asyncInvalidSocket, +# bufferSize: int = DefaultDatagramBufferSize, +# udata: pointer = nil): DatagramServer = +# var transp: DatagramTransport +# var fflags = flags + {NoAutoRead} +# if host.address.family == IpAddressFamily.IPv4: +# transp = newDatagramTransport(cbproc, AnyAddress, host, sock, +# fflags, udata, bufferSize) +# else: +# transp = newDatagramTransport6(cbproc, AnyAddress6, host, sock, +# fflags, udata, bufferSize) +# result = DatagramServer() +# result.transport = transp +# result.status = ServerStatus.Starting +# GC_ref(result) + +# proc start*(server: DatagramServer) = +# ## Starts ``server``. +# if server.status == ServerStatus.Starting: +# server.transport.resumeRead() +# server.status = ServerStatus.Running + +# proc stop*(server: DatagramServer) = +# ## Stops ``server``. +# if server.status == ServerStatus.Running: +# when defined(windows): +# if {WritePending, ReadPending} * server.transport.state != {}: +# ## CancelIO will stop both reading and writing. +# discard cancelIo(Handle(server.transport.fd)) +# else: +# if WritePaused notin server.transport.state: +# server.transport.fd.removeWriter() +# if ReadPaused notin server.transport.state: +# server.transport.fd.removeReader() +# server.status = ServerStatus.Stopped + +# proc join*(server: DatagramServer) {.async.} = +# ## Waits until ``server`` is not stopped. +# if not server.transport.future.finished: +# await server.transport.future + +# proc close*(server: DatagramServer) = +# ## Release ``server`` resources. +# if server.status == ServerStatus.Stopped: +# server.status = ServerStatus.Closed +# server.transport.close() +# GC_unref(server) diff --git a/asyncdispatch2/transports/stream.nim b/asyncdispatch2/transports/stream.nim index e875c8c7..f54e6fe9 100644 --- a/asyncdispatch2/transports/stream.nim +++ b/asyncdispatch2/transports/stream.nim @@ -300,8 +300,7 @@ when defined(windows): if err == OSErrorCode(-1): let bytesCount = transp.rovl.data.bytesCount if bytesCount == 0: - transp.state.incl(ReadEof) - transp.state.incl(ReadPaused) + transp.state.incl({ReadEof, ReadPaused}) else: if transp.offset != transp.roffset: moveMem(addr transp.buffer[transp.offset], @@ -1206,7 +1205,6 @@ proc close*(transp: StreamTransport) = when defined(windows): discard cancelIo(Handle(transp.fd)) closeAsyncSocket(transp.fd) - transp.state.incl(WriteClosed) - transp.state.incl(ReadClosed) + transp.state.incl({WriteClosed, ReadClosed}) transp.future.complete() GC_unref(transp) diff --git a/tests/testdatagram.nim b/tests/testdatagram.nim index 15bd0dbd..8bc0fee6 100644 --- a/tests/testdatagram.nim +++ b/tests/testdatagram.nim @@ -14,11 +14,14 @@ const ClientsCount = 100 MessagesCount = 100 -proc client1(transp: DatagramTransport, pbytes: pointer, nbytes: int, - raddr: TransportAddress, udata: pointer): Future[void] {.async.} = - if not isNil(pbytes): +proc client1(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes: seq[byte] + var nbytes: int + transp.peekMessage(pbytes, nbytes) + if nbytes > 0: var data = newString(nbytes + 1) - copyMem(addr data[0], pbytes, nbytes) + copyMem(addr data[0], addr pbytes[0], nbytes) data.setLen(nbytes) if data.startsWith("REQUEST"): var numstr = data[7..^1] @@ -29,19 +32,21 @@ proc client1(transp: DatagramTransport, pbytes: pointer, nbytes: int, var err = "ERROR" await transp.sendTo(addr err[0], len(err), raddr) else: - ## Read operation failed with error - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() -proc client2(transp: DatagramTransport, pbytes: pointer, nbytes: int, - raddr: TransportAddress, udata: pointer): Future[void] {.async.} = - if not isNil(pbytes): +proc client2(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes: seq[byte] + var nbytes: int + transp.peekMessage(pbytes, nbytes) + if nbytes > 0: var data = newString(nbytes + 1) - copyMem(addr data[0], pbytes, nbytes) + copyMem(addr data[0], addr pbytes[0], nbytes) data.setLen(nbytes) if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = counterPtr[] + 1 if counterPtr[] == TestsCount: transp.close() @@ -50,23 +55,26 @@ proc client2(transp: DatagramTransport, pbytes: pointer, nbytes: int, var req = "REQUEST" & $counterPtr[] await transp.sendTo(addr req[0], len(req), ta) else: - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() else: ## Read operation failed with error - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() -proc client3(transp: DatagramTransport, pbytes: pointer, nbytes: int, - raddr: TransportAddress, udata: pointer): Future[void] {.async.} = - if not isNil(pbytes): +proc client3(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes: seq[byte] + var nbytes: int + transp.peekMessage(pbytes, nbytes) + if nbytes > 0: var data = newString(nbytes + 1) - copyMem(addr data[0], pbytes, nbytes) + copyMem(addr data[0], addr pbytes[0], nbytes) data.setLen(nbytes) if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = counterPtr[] + 1 if counterPtr[] == TestsCount: transp.close() @@ -74,23 +82,26 @@ proc client3(transp: DatagramTransport, pbytes: pointer, nbytes: int, var req = "REQUEST" & $counterPtr[] await transp.send(addr req[0], len(req)) else: - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() else: ## Read operation failed with error - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() -proc client4(transp: DatagramTransport, pbytes: pointer, nbytes: int, - raddr: TransportAddress, udata: pointer): Future[void] {.async.} = - if not isNil(pbytes): +proc client4(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes: seq[byte] + var nbytes: int + transp.peekMessage(pbytes, nbytes) + if nbytes > 0: var data = newString(nbytes + 1) - copyMem(addr data[0], pbytes, nbytes) + copyMem(addr data[0], addr pbytes[0], nbytes) data.setLen(nbytes) if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = counterPtr[] + 1 if counterPtr[] == MessagesCount: transp.close() @@ -98,23 +109,26 @@ proc client4(transp: DatagramTransport, pbytes: pointer, nbytes: int, var req = "REQUEST" & $counterPtr[] await transp.send(addr req[0], len(req)) else: - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() else: ## Read operation failed with error - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() -proc client5(transp: DatagramTransport, pbytes: pointer, nbytes: int, - raddr: TransportAddress, udata: pointer): Future[void] {.async.} = - if not isNil(pbytes): +proc client5(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes: seq[byte] + var nbytes: int + transp.peekMessage(pbytes, nbytes) + if nbytes > 0: var data = newString(nbytes + 1) - copyMem(addr data[0], pbytes, nbytes) + copyMem(addr data[0], addr pbytes[0], nbytes) data.setLen(nbytes) if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = counterPtr[] + 1 if counterPtr[] == MessagesCount: transp.close() @@ -123,20 +137,23 @@ proc client5(transp: DatagramTransport, pbytes: pointer, nbytes: int, var req = "REQUEST" & $counterPtr[] await transp.sendTo(addr req[0], len(req), ta) else: - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() else: ## Read operation failed with error - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() -proc client6(transp: DatagramTransport, pbytes: pointer, nbytes: int, - raddr: TransportAddress, udata: pointer): Future[void] {.async.} = - if not isNil(pbytes): +proc client6(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes: seq[byte] + var nbytes: int + transp.peekMessage(pbytes, nbytes) + if nbytes > 0: var data = newString(nbytes + 1) - copyMem(addr data[0], pbytes, nbytes) + copyMem(addr data[0], addr pbytes[0], nbytes) data.setLen(nbytes) if data.startsWith("REQUEST"): var numstr = data[7..^1] @@ -148,18 +165,21 @@ proc client6(transp: DatagramTransport, pbytes: pointer, nbytes: int, await transp.sendTo(err, raddr) else: ## Read operation failed with error - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() -proc client7(transp: DatagramTransport, pbytes: pointer, nbytes: int, - raddr: TransportAddress, udata: pointer): Future[void] {.async.} = - if not isNil(pbytes): +proc client7(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes: seq[byte] + var nbytes: int + transp.peekMessage(pbytes, nbytes) + if nbytes > 0: var data = newString(nbytes + 1) - copyMem(addr data[0], pbytes, nbytes) + copyMem(addr data[0], addr pbytes[0], nbytes) data.setLen(nbytes) if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = counterPtr[] + 1 if counterPtr[] == TestsCount: transp.close() @@ -168,23 +188,26 @@ proc client7(transp: DatagramTransport, pbytes: pointer, nbytes: int, var req = "REQUEST" & $counterPtr[] await transp.sendTo(req, ta) else: - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() else: ## Read operation failed with error - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() -proc client8(transp: DatagramTransport, pbytes: pointer, nbytes: int, - raddr: TransportAddress, udata: pointer): Future[void] {.async.} = - if not isNil(pbytes): +proc client8(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes: seq[byte] + var nbytes: int + transp.peekMessage(pbytes, nbytes) + if nbytes > 0: var data = newString(nbytes + 1) - copyMem(addr data[0], pbytes, nbytes) + copyMem(addr data[0], addr pbytes[0], nbytes) data.setLen(nbytes) if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = counterPtr[] + 1 if counterPtr[] == TestsCount: transp.close() @@ -192,20 +215,23 @@ proc client8(transp: DatagramTransport, pbytes: pointer, nbytes: int, var req = "REQUEST" & $counterPtr[] await transp.send(req) else: - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() else: ## Read operation failed with error - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() -proc client9(transp: DatagramTransport, pbytes: pointer, nbytes: int, - raddr: TransportAddress, udata: pointer): Future[void] {.async.} = - if not isNil(pbytes): +proc client9(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes: seq[byte] + var nbytes: int + transp.peekMessage(pbytes, nbytes) + if nbytes > 0: var data = newString(nbytes + 1) - copyMem(addr data[0], pbytes, nbytes) + copyMem(addr data[0], addr pbytes[0], nbytes) data.setLen(nbytes) if data.startsWith("REQUEST"): var numstr = data[7..^1] @@ -221,18 +247,21 @@ proc client9(transp: DatagramTransport, pbytes: pointer, nbytes: int, await transp.sendTo(errseq, raddr) else: ## Read operation failed with error - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() -proc client10(transp: DatagramTransport, pbytes: pointer, nbytes: int, - raddr: TransportAddress, udata: pointer): Future[void] {.async.} = - if not isNil(pbytes): +proc client10(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes: seq[byte] + var nbytes: int + transp.peekMessage(pbytes, nbytes) + if nbytes > 0: var data = newString(nbytes + 1) - copyMem(addr data[0], pbytes, nbytes) + copyMem(addr data[0], addr pbytes[0], nbytes) data.setLen(nbytes) if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = counterPtr[] + 1 if counterPtr[] == TestsCount: transp.close() @@ -243,23 +272,26 @@ proc client10(transp: DatagramTransport, pbytes: pointer, nbytes: int, copyMem(addr reqseq[0], addr req[0], len(req)) await transp.sendTo(reqseq, ta) else: - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() else: ## Read operation failed with error - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() -proc client11(transp: DatagramTransport, pbytes: pointer, nbytes: int, - raddr: TransportAddress, udata: pointer): Future[void] {.async.} = - if not isNil(pbytes): +proc client11(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async.} = + var pbytes: seq[byte] + var nbytes: int + transp.peekMessage(pbytes, nbytes) + if nbytes > 0: var data = newString(nbytes + 1) - copyMem(addr data[0], pbytes, nbytes) + copyMem(addr data[0], addr pbytes[0], nbytes) data.setLen(nbytes) if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = counterPtr[] + 1 if counterPtr[] == TestsCount: transp.close() @@ -269,12 +301,12 @@ proc client11(transp: DatagramTransport, pbytes: pointer, nbytes: int, copyMem(addr reqseq[0], addr req[0], len(req)) await transp.send(reqseq) else: - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() else: ## Read operation failed with error - var counterPtr = cast[ptr int](udata) + var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() @@ -397,90 +429,6 @@ proc test3(bounded: bool): Future[int] {.async.} = for i in 0.. 0: - var answer = newString(nbytes + 1) - copyMem(addr answer[0], pbytes, nbytes) - answer.setLen(nbytes) - doAssert(answer.startsWith("ANSWER")) - var numstr = answer[6..^1] - var num = parseInt(numstr) - doAssert(num < MessagesCount) - results[num] = 1 - inc(counter) - if not future.finished: - future.complete() - - var transp = newDatagramTransport(receiver, - udata = addr counter, - remote = address) - for i in 0.. 0) - var request = newString(nbytes + 1) - copyMem(addr request[0], pbytes, nbytes) - request.setLen(nbytes) - doAssert(request.startsWith("REQUEST")) - var numstr = request[7..^1] - var num = parseInt(numstr) - var answer = "ANSWER" & $num - await transp.sendTo(addr answer[0], len(answer), raddr) - -proc test4(): Future[int] {.async.} = - var ta = initTAddress("127.0.0.1:31346") - var counter = 0 - var server = createDatagramServer(ta, serveDatagramClient, {ReuseAddr}) - server.start() - result = await swarmManager(ta) - server.stop() - server.close() - when isMainModule: const m1 = "sendTo(pointer) test (" & $TestsCount & " messages)" @@ -493,8 +441,6 @@ when isMainModule: " clients x " & $MessagesCount & " messages)" m8 = "Bounded multiple clients with messages (" & $ClientsCount & " clients x " & $MessagesCount & " messages)" - m9 = "DatagramServer multiple clients with messages (" & $ClientsCount & - " clients x " & $MessagesCount & " messages)" suite "Datagram Transport test suite": test m1: check waitFor(testPointerSendTo()) == TestsCount @@ -512,5 +458,3 @@ when isMainModule: check waitFor(test3(false)) == ClientsCount * MessagesCount test m8: check waitFor(test3(true)) == ClientsCount * MessagesCount - # test m9: - # check waitFor(test4()) == ClientsCount * MessagesCount diff --git a/tests/testserver.nim b/tests/testserver.nim index 8343d84f..74cb1d1e 100644 --- a/tests/testserver.nim +++ b/tests/testserver.nim @@ -50,12 +50,6 @@ proc customServerTransport(server: StreamServer, transp.test = "CUSTOM" result = cast[StreamTransport](transp) -proc serveDatagramClient(transp: DatagramTransport, - pbytes: pointer, nbytes: int, - raddr: TransportAddress, - udata: pointer): Future[void] {.async.} = - discard - proc test1(): bool = var ta = initTAddress("127.0.0.1:31354") var server1 = createStreamServer(ta, serveStreamClient, {ReuseAddr}) @@ -68,18 +62,6 @@ proc test1(): bool = server2.close() result = true -proc test2(): bool = - var ta = initTAddress("127.0.0.1:31354") - var server1 = createDatagramServer(ta, serveDatagramClient, {ReuseAddr}) - server1.start() - server1.stop() - server1.close() - var server2 = createDatagramServer(ta, serveDatagramClient, {ReuseAddr}) - server2.start() - server2.stop() - server2.close() - result = true - proc client1(server: CustomServer, ta: TransportAddress) {.async.} = var transp = CustomTransport() transp.test = "CLIENT" @@ -130,7 +112,5 @@ when isMainModule: check test1() == true test "Stream Server inherited object test": check test3() == true - test "Datagram Server start/stop test": - check test2() == true test "StreamServer[T] test": check test4() == true