From 3cb521c9206ace81872ddb03ba437916ab297e86 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 5 Jun 2018 08:51:59 +0300 Subject: [PATCH] Add datagram transport utility templates send(string) send(seq[byte]). Fix bugs in stream.nim Add more tests for stream.nim --- asyncdispatch2/transports/datagram.nim | 22 ++++++ asyncdispatch2/transports/stream.nim | 78 ++++++++++---------- tests/teststream.nim | 99 ++++++++++++++++++++++++-- 3 files changed, 152 insertions(+), 47 deletions(-) diff --git a/asyncdispatch2/transports/datagram.nim b/asyncdispatch2/transports/datagram.nim index af4b67de..b8b7a968 100644 --- a/asyncdispatch2/transports/datagram.nim +++ b/asyncdispatch2/transports/datagram.nim @@ -559,6 +559,28 @@ proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int, if WriteError in transp.state: raise transp.getError() +template send*(transp: DatagramTransport, msg: var string): untyped = + ## Send message ``msg`` using transport ``transp`` to remote destination + ## address which was bounded on transport. + send(transp, addr msg[0], len(msg)) + +template send*(transp: DatagramTransport, msg: var seq[byte]): untyped = + ## Send message ``msg`` using transport ``transp`` to remote destination + ## address which was bounded on transport. + send(transp, addr msg[0], len(msg)) + +template sendTo*(transp: DatagramTransport, msg: var string, + remote: TransportAddress): untyped = + ## Send message ``msg`` using transport ``transp`` to remote + ## destination address ``remote``. + sendTo(transp, addr msg[0], len(msg), remote) + +template sendTo*(transp: DatagramTransport, msg: var seq[byte], + remote: TransportAddress): untyped = + ## Send message ``msg`` using transport ``transp`` to remote + ## destination address ``remote``. + sendTo(transp, addr msg[0], len(msg), remote) + proc createDatagramServer*(host: TransportAddress, cbproc: DatagramCallback, flags: set[ServerFlags] = {}, diff --git a/asyncdispatch2/transports/stream.nim b/asyncdispatch2/transports/stream.nim index dab5b066..196a416a 100644 --- a/asyncdispatch2/transports/stream.nim +++ b/asyncdispatch2/transports/stream.nim @@ -680,7 +680,7 @@ else: let err = osLastError() if int(err) == EINTR: continue - elif int(err) in {EBADF, EINVAL, ENOTSOCK, EOPNOTSUPP, EPROTO}: + else: ## Critical unrecoverable error raiseOsError(err) @@ -859,7 +859,7 @@ proc readExactly*(transp: StreamTransport, pbytes: pointer, if transp.offset == 0: if (ReadError in transp.state): raise transp.getError() - if ReadClosed in transp.state or transp.atEof(): + if (ReadClosed in transp.state) or transp.atEof(): raise newException(TransportIncompleteError, "Data incomplete!") if transp.offset >= (nbytes - index): @@ -894,7 +894,7 @@ proc readOnce*(transp: StreamTransport, pbytes: pointer, if transp.offset == 0: if (ReadError in transp.state): raise transp.getError() - if (ReadEof in transp.state) or (ReadClosed in transp.state): + if (ReadClosed in transp.state) or transp.atEof(): result = 0 break transp.reader = newFuture[void]("stream.transport.readOnce") @@ -937,13 +937,10 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int, var index = 0 while true: - if (transp.offset - index) == 0: - if ReadError in transp.state: - transp.shiftBuffer(index) - raise transp.getError() - if (ReadEof in transp.state) or (ReadClosed in transp.state): - transp.shiftBuffer(index) - raise newException(TransportIncompleteError, "Data incomplete!") + if ReadError in transp.state: + raise transp.getError() + if (ReadClosed in transp.state) or transp.atEof(): + raise newException(TransportIncompleteError, "Data incomplete!") index = 0 while index < transp.offset: @@ -957,25 +954,23 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int, inc(k) else: raise newException(TransportLimitError, "Limit reached!") - if state == len(sep): - transp.shiftBuffer(index + 1) break - inc(index) if state == len(sep): + transp.shiftBuffer(index + 1) result = k break else: - if (transp.offset - index) == 0: - transp.reader = newFuture[void]("stream.transport.readUntil") - if ReadPaused in transp.state: - transp.resumeRead() - await transp.reader - # we need to clear transp.reader to avoid double completion of this - # Future[T], because readLoop continues working. - transp.reader = nil + transp.shiftBuffer(transp.offset) + transp.reader = newFuture[void]("stream.transport.readUntil") + if ReadPaused in transp.state: + transp.resumeRead() + await transp.reader + # we need to clear transp.reader to avoid double completion of this + # Future[T], because readLoop continues working. + transp.reader = nil proc readLine*(transp: StreamTransport, limit = 0, sep = "\r\n"): Future[string] {.async.} = @@ -998,13 +993,10 @@ proc readLine*(transp: StreamTransport, limit = 0, var index = 0 while true: - if (transp.offset - index) == 0: - if (ReadError in transp.state): - transp.shiftBuffer(index) - raise transp.getError() - if (ReadEof in transp.state) or (ReadClosed in transp.state): - transp.shiftBuffer(index) - break + if (ReadError in transp.state): + raise transp.getError() + if (ReadClosed in transp.state) or transp.atEof(): + break index = 0 while index < transp.offset: @@ -1025,17 +1017,17 @@ proc readLine*(transp: StreamTransport, limit = 0, if (state == len(sep)) or (lim == len(result)): break else: - if (transp.offset - index) == 0: - transp.reader = newFuture[void]("stream.transport.readLine") - if ReadPaused in transp.state: - transp.resumeRead() - await transp.reader - # we need to clear transp.reader to avoid double completion of this - # Future[T], because readLoop continues working. - transp.reader = nil + transp.shiftBuffer(transp.offset) + transp.reader = newFuture[void]("stream.transport.readLine") + if ReadPaused in transp.state: + transp.resumeRead() + await transp.reader + # we need to clear transp.reader to avoid double completion of this + # Future[T], because readLoop continues working. + transp.reader = nil proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} = - ## Read all bytes (n == -1) or `n` bytes from transport ``transp``. + ## Read all bytes (n == -1) or exactly `n` bytes from transport ``transp``. ## ## This procedure allocates buffer seq[byte] and return it as result. checkClosed(transp) @@ -1044,9 +1036,9 @@ proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} = while true: if (ReadError in transp.state): raise transp.getError() - if ReadClosed in transp.state or transp.atEof(): + if (ReadClosed in transp.state) or transp.atEof(): break - + if transp.offset > 0: let s = len(result) let o = s + transp.offset @@ -1057,12 +1049,13 @@ proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} = transp.offset) transp.offset = 0 else: - if transp.offset >= (n - s): + let left = n - s + if transp.offset >= left: # size of buffer data is more then we need, grabbing only part result.setLen(n) copyMem(cast[pointer](addr result[s]), addr(transp.buffer[0]), - n - s) - transp.shiftBuffer(n - s) + left) + transp.shiftBuffer(left) break else: # there not enough data in buffer, grabbing all @@ -1091,6 +1084,7 @@ proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} = raise transp.getError() if ReadClosed in transp.state or transp.atEof(): break + if transp.offset > 0: if n == -1: # consume all incoming data, until EOF diff --git a/tests/teststream.nim b/tests/teststream.nim index 6362fb33..14566154 100644 --- a/tests/teststream.nim +++ b/tests/teststream.nim @@ -16,6 +16,8 @@ else: const ConstantMessage = "SOMEDATA" + BigMessagePattern = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + BigMessageCount = 1000 ClientsCount = 100 MessagesCount = 100 MessageSize = 20 @@ -68,7 +70,8 @@ proc serveClient3(server: StreamServer, var suffixStr = "SUFFIX" var suffix = newSeq[byte](6) copyMem(addr suffix[0], addr suffixStr[0], len(suffixStr)) - while not transp.atEof(): + var counter = MessagesCount + while counter > 0: zeroMem(addr buffer[0], MessageSize) var res = await transp.readUntil(addr buffer[0], MessageSize, suffix) doAssert(equalMem(addr buffer[0], addr check[0], len(check))) @@ -84,6 +87,7 @@ proc serveClient3(server: StreamServer, copyMem(addr buffer[0], addr ans[0], len(ans)) res = await transp.write(cast[pointer](addr buffer[0]), len(ans)) doAssert(res == len(ans)) + dec(counter) transp.close() proc serveClient4(server: StreamServer, @@ -100,6 +104,7 @@ proc serveClient4(server: StreamServer, var answer = "OK\r\n" var res = await transp.write(cast[pointer](addr answer[0]), len(answer)) doAssert(res == len(answer)) + transp.close() proc serveClient5(server: StreamServer, transp: StreamTransport, udata: pointer) {.async.} = @@ -131,6 +136,41 @@ proc serveClient6(server: StreamServer, server.stop() server.close() +proc serveClient7(server: StreamServer, + transp: StreamTransport, udata: pointer) {.async.} = + var answer = "DONE\r\n" + var expect = "" + var line = await transp.readLine() + doAssert(len(line) == BigMessageCount * len(BigMessagePattern)) + for i in 0..