diff --git a/asyncdispatch2/asyncloop.nim b/asyncdispatch2/asyncloop.nim index ed281d8f..52bb7449 100644 --- a/asyncdispatch2/asyncloop.nim +++ b/asyncdispatch2/asyncloop.nim @@ -292,8 +292,6 @@ when defined(windows) or defined(nimdoc): ## (Unix) for the specified dispatcher. return disp.ioPort - # ZAH: Shouldn't all of these procs be defined over the Dispatcher type? - # The "global" variants can be defined as templates passing the global dispatcher proc register*(fd: AsyncFD) = ## Registers ``fd`` with the dispatcher. let p = getGlobalDispatcher() diff --git a/asyncdispatch2/transports/datagram.nim b/asyncdispatch2/transports/datagram.nim index f6cad55b..d955b1d6 100644 --- a/asyncdispatch2/transports/datagram.nim +++ b/asyncdispatch2/transports/datagram.nim @@ -11,31 +11,22 @@ import net, nativesockets, os, deques, strutils import ../asyncloop, ../handles import common +when defined(windows): + import winlean +else: + import posix + type VectorKind = enum WithoutAddress, WithAddress -when defined(windows): - import winlean - type - GramVector = object - kind: VectorKind # Vector kind (with address/without address) - buf: ptr TWSABuf # Writer vector buffer - address: TransportAddress # Destination address - writer: Future[void] # Writer vector completion Future + GramVector = object + kind: VectorKind # Vector kind (with address/without address) + address: TransportAddress # Destination address + buf: pointer # Writer buffer pointer + buflen: int # Writer buffer size + writer: Future[void] # Writer vector completion Future -else: - import posix - - type - GramVector = object - kind: VectorKind # Vector kind (with address/without address) - buf: pointer # Writer buffer pointer - buflen: int # Writer buffer size - address: TransportAddress # Destination address - writer: Future[void] # Writer vector completion Future - -type DatagramServer* = ref object of RootRef ## Datagram server object transport*: DatagramTransport ## Datagram transport @@ -76,15 +67,22 @@ template setWriteError(t, e: untyped) = (t).state.incl(WriteError) (t).error = newException(TransportOsError, osErrorMsg((e))) +template setWriterWSABuffer(t, v: untyped) = + (t).wwsabuf.buf = cast[cstring](v.buf) + (t).wwsabuf.len = cast[int32](v.buflen) + when defined(windows): type WindowsDatagramTransport* = ref object of DatagramTransport - rovl: CustomOverlapped - wovl: CustomOverlapped - raddr: Sockaddr_storage - ralen: SockLen - rflag: int32 - wsabuf: TWSABuf + rovl: CustomOverlapped # Reader OVERLAPPED structure + wovl: CustomOverlapped # Writer OVERLAPPED structure + raddr: Sockaddr_storage # Reader address storage + ralen: SockLen # Reader address length + rflag: int32 # Reader flags storage + rwsabuf: TWSABuf # Reader WSABUF structure + waddr: Sockaddr_storage # Writer address storage + wlen: SockLen # Writer address length + wwsabuf: TWSABuf # Writer WSABUF structure template finishWriter(t: untyped) = var vv = (t).queue.popFirst() @@ -111,19 +109,21 @@ when defined(windows): transp.finishWriter() else: ## Initiation - var saddr: Sockaddr_storage - var slen: SockLen transp.state.incl(WritePending) let fd = SocketHandle(ovl.data.fd) var vector = transp.queue.popFirst() + transp.setWriterWSABuffer(vector) + var ret: cint if vector.kind == WithAddress: - toSockAddr(vector.address.address, vector.address.port, saddr, slen) + toSockAddr(vector.address.address, vector.address.port, + transp.waddr, transp.wlen) + ret = WSASendTo(fd, addr transp.wwsabuf, DWORD(1), addr bytesCount, + DWORD(0), cast[ptr SockAddr](addr transp.waddr), + cint(transp.wlen), + cast[POVERLAPPED](addr transp.wovl), nil) else: - toSockAddr(transp.remote.address, transp.remote.port, saddr, slen) - let ret = WSASendTo(fd, vector.buf, DWORD(1), addr bytesCount, - DWORD(0), cast[ptr SockAddr](addr saddr), - cint(slen), - cast[POVERLAPPED](addr transp.wovl), nil) + ret = WSASend(fd, addr transp.wwsabuf, DWORD(1), addr bytesCount, + DWORD(0), cast[POVERLAPPED](addr transp.wovl), nil) if ret != 0: let err = osLastError() if int(err) == ERROR_OPERATION_ABORTED: @@ -178,11 +178,8 @@ when defined(windows): let fd = SocketHandle(ovl.data.fd) transp.rflag = 0 transp.ralen = SockLen(sizeof(Sockaddr_storage)) - let ret = WSARecvFrom(fd, - addr transp.wsabuf, - DWORD(1), - addr bytesCount, - addr transp.rflag, + let ret = WSARecvFrom(fd, addr transp.rwsabuf, DWORD(1), + addr bytesCount, addr transp.rflag, cast[ptr SockAddr](addr transp.raddr), cast[ptr cint](addr transp.ralen), cast[POVERLAPPED](addr transp.rovl), nil) @@ -275,7 +272,17 @@ when defined(windows): if sock == asyncInvalidSocket: closeAsyncSocket(localSock) raiseOsError(err) + if remote.port != Port(0): + var saddr: Sockaddr_storage + var slen: SockLen + toSockAddr(remote.address, remote.port, saddr, slen) + if connect(SocketHandle(localSock), cast[ptr SockAddr](addr saddr), + slen) != 0: + let err = osLastError() + if sock == asyncInvalidSocket: + closeAsyncSocket(localSock) + raiseOsError(err) wresult.remote = remote wresult.fd = localSock @@ -289,8 +296,8 @@ when defined(windows): udata: cast[pointer](wresult)) wresult.wovl.data = CompletionData(fd: localSock, cb: writeDatagramLoop, udata: cast[pointer](wresult)) - wresult.wsabuf = TWSABuf(buf: cast[cstring](addr wresult.buffer[0]), - len: int32(len(wresult.buffer))) + wresult.rwsabuf = TWSABuf(buf: cast[cstring](addr wresult.buffer[0]), + len: int32(len(wresult.buffer))) GC_ref(wresult) result = cast[DatagramTransport](wresult) if NoAutoRead notin flags: @@ -301,7 +308,7 @@ when defined(windows): proc close*(transp: DatagramTransport) = ## Closes and frees resources of transport ``transp``. if ReadClosed notin transp.state and WriteClosed notin transp.state: - discard cancelIo(Handle(transp.fd)) + # discard cancelIo(Handle(transp.fd)) closeAsyncSocket(transp.fd) transp.state.incl(WriteClosed) transp.state.incl(ReadClosed) @@ -525,13 +532,8 @@ proc send*(transp: DatagramTransport, pbytes: pointer, if transp.remote.port == Port(0): raise newException(TransportError, "Remote peer is not set!") var waitFuture = newFuture[void]("datagram.transport.send") - when defined(windows): - var wsabuf = TWSABuf(buf: cast[cstring](pbytes), len: int32(nbytes)) - var vector = GramVector(kind: WithoutAddress, buf: addr wsabuf, - writer: waitFuture) - else: - var vector = GramVector(kind: WithoutAddress, buf: pbytes, buflen: nbytes, - writer: waitFuture) + var vector = GramVector(kind: WithoutAddress, buf: pbytes, buflen: nbytes, + writer: waitFuture) transp.queue.addLast(vector) if WritePaused in transp.state: transp.resumeWrite() @@ -546,16 +548,10 @@ proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int, checkClosed(transp) var saddr: Sockaddr_storage var slen: SockLen - var vector: GramVector toSockAddr(remote.address, remote.port, saddr, slen) var waitFuture = newFuture[void]("datagram.transport.sendto") - when defined(windows): - var wsabuf = TWSABuf(buf: cast[cstring](pbytes), len: int32(nbytes)) - vector = GramVector(kind: WithAddress, buf: addr wsabuf, - address: remote, writer: waitFuture) - else: - vector = GramVector(kind: WithAddress, buf: pbytes, buflen: nbytes, - address: remote, writer: waitFuture) + var vector = GramVector(kind: WithAddress, buf: pbytes, buflen: nbytes, + writer: waitFuture, address: remote) transp.queue.addLast(vector) if WritePaused in transp.state: transp.resumeWrite()