diff --git a/asyncdispatch2/transports/common.nim b/asyncdispatch2/transports/common.nim index 2bb846c..a740e20 100644 --- a/asyncdispatch2/transports/common.nim +++ b/asyncdispatch2/transports/common.nim @@ -113,5 +113,6 @@ when defined(windows): import winlean const ERROR_OPERATION_ABORTED* = 995 + const ERROR_SUCCESS* = 0 proc cancelIo*(hFile: HANDLE): WINBOOL {.stdcall, dynlib: "kernel32", importc: "CancelIo".} diff --git a/asyncdispatch2/transports/datagram.nim b/asyncdispatch2/transports/datagram.nim index c730c54..3ed4f92 100644 --- a/asyncdispatch2/transports/datagram.nim +++ b/asyncdispatch2/transports/datagram.nim @@ -7,7 +7,7 @@ # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import net, nativesockets, os, deques +import net, nativesockets, os, deques, strutils import ../asyncloop, ../handles import common @@ -20,7 +20,7 @@ when defined(windows): type GramVector = object kind: VectorKind # Vector kind (with address/without address) - buf: TWSABuf # Writer vector buffer + buf: ptr TWSABuf # Writer vector buffer address: TransportAddress # Destination address writer: Future[void] # Writer vector completion Future @@ -91,6 +91,7 @@ when defined(windows): return var ovl = cast[PCustomOverlapped](udata) var transp = cast[WindowsDatagramTransport](ovl.data.udata) + echo "writeDatagramLoop(" & toHex(cast[uint](transp)) & ")" while len(transp.queue) > 0: if WritePending in transp.state: ## Continuation @@ -108,12 +109,12 @@ when defined(windows): var slen: SockLen transp.state.incl(WritePending) let fd = SocketHandle(ovl.data.fd) - var vector = transp.queue[0] + var vector = transp.queue.popFirst() if vector.kind == WithAddress: toSockAddr(vector.address.address, vector.address.port, saddr, slen) else: toSockAddr(transp.remote.address, transp.remote.port, saddr, slen) - let ret = WSASendTo(fd, addr vector.buf, DWORD(1), addr bytesCount, + let ret = WSASendTo(fd, vector.buf, DWORD(1), addr bytesCount, DWORD(0), cast[ptr SockAddr](addr saddr), cint(slen), cast[POVERLAPPED](addr transp.wovl), nil) @@ -121,10 +122,14 @@ when defined(windows): let err = osLastError() if int(err) == ERROR_OPERATION_ABORTED: transp.state.incl(WritePaused) - elif int(err) != ERROR_IO_PENDING: + elif int(err) == ERROR_IO_PENDING: + transp.queue.addFirst(vector) + else: transp.state.excl(WritePending) transp.setWriteError(err) transp.finishWriter() + else: + transp.queue.addFirst(vector) break if len(transp.queue) == 0: @@ -257,8 +262,6 @@ when defined(windows): if remote.port != Port(0): wresult.remote = remote - ## TODO: Apply server flags - wresult.fd = localSock wresult.function = cbproc wresult.buffer = newSeq[byte](bufferSize) @@ -464,8 +467,8 @@ proc send*(transp: DatagramTransport, pbytes: pointer, raise newException(TransportError, "Remote peer is not set!") var waitFuture = newFuture[void]("datagram.transport.send") when defined(windows): - let wsabuf = TWSABuf(buf: cast[cstring](pbytes), len: int32(nbytes)) - var vector = GramVector(kind: WithoutAddress, buf: wsabuf, + var wsabuf = TWSABuf(buf: cast[cstring](pbytes), len: int32(nbytes)) + var vector = GramVector(kind: WithoutAddress, buf: addr wsabuf, writer: waitFuture) else: var vector = GramVector(kind: WithoutAddress, buf: pbytes, buflen: nbytes, @@ -482,15 +485,16 @@ proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int, checkClosed(transp) var saddr: Sockaddr_storage var slen: SockLen + var vector: GramVector toSockAddr(remote.address, remote.port, saddr, slen) var waitFuture = newFuture[void]("datagram.transport.sendto") when defined(windows): - let wsabuf = TWSABuf(buf: cast[cstring](pbytes), len: int32(nbytes)) - var vector = GramVector(kind: WithAddress, buf: wsabuf, - address: remote, writer: waitFuture) + var wsabuf = TWSABuf(buf: cast[cstring](pbytes), len: int32(nbytes)) + vector = GramVector(kind: WithAddress, buf: addr wsabuf, + address: remote, writer: waitFuture) else: - var vector = GramVector(kind: WithAddress, buf: pbytes, buflen: nbytes, - address: remote, writer: waitFuture) + vector = GramVector(kind: WithAddress, buf: pbytes, buflen: nbytes, + address: remote, writer: waitFuture) transp.queue.addLast(vector) if WritePaused in transp.state: transp.resumeWrite() diff --git a/tests/testdatagram.nim b/tests/testdatagram.nim index a2ba657..f709678 100644 --- a/tests/testdatagram.nim +++ b/tests/testdatagram.nim @@ -3,8 +3,11 @@ import ../asyncdispatch2 const TestsCount = 5000 - ClientsCount = 10 - MessagesCount = 50 + ClientsCount = 2 + MessagesCount = 350 + +when defined(vcc): + {.passC: "/Zi /FS".} proc client1(transp: DatagramTransport, pbytes: pointer, nbytes: int, raddr: TransportAddress, udata: pointer): Future[void] {.async.} = @@ -92,6 +95,7 @@ proc client4(transp: DatagramTransport, pbytes: pointer, nbytes: int, transp.close() else: var req = "REQUEST" & $counterPtr[] + echo $counterPtr[] & "-SEND" await transp.send(addr req[0], len(req)) else: echo "ERROR" @@ -153,8 +157,9 @@ proc test3(): Future[int] {.async.} = for i in 0..