Fix datagram bugs.

Make changes to asyncloop according to zahary review.
This commit is contained in:
cheatfate 2018-06-04 03:54:17 +03:00
parent 0b7115eba9
commit 2758abd7a6
2 changed files with 53 additions and 59 deletions

View File

@ -292,8 +292,6 @@ when defined(windows) or defined(nimdoc):
## (Unix) for the specified dispatcher.
return disp.ioPort
# ZAH: Shouldn't all of these procs be defined over the Dispatcher type?
# The "global" variants can be defined as templates passing the global dispatcher
proc register*(fd: AsyncFD) =
## Registers ``fd`` with the dispatcher.
let p = getGlobalDispatcher()

View File

@ -11,31 +11,22 @@ import net, nativesockets, os, deques, strutils
import ../asyncloop, ../handles
import common
when defined(windows):
import winlean
else:
import posix
type
VectorKind = enum
WithoutAddress, WithAddress
when defined(windows):
import winlean
type
GramVector = object
kind: VectorKind # Vector kind (with address/without address)
buf: ptr TWSABuf # Writer vector buffer
address: TransportAddress # Destination address
writer: Future[void] # Writer vector completion Future
GramVector = object
kind: VectorKind # Vector kind (with address/without address)
address: TransportAddress # Destination address
buf: pointer # Writer buffer pointer
buflen: int # Writer buffer size
writer: Future[void] # Writer vector completion Future
else:
import posix
type
GramVector = object
kind: VectorKind # Vector kind (with address/without address)
buf: pointer # Writer buffer pointer
buflen: int # Writer buffer size
address: TransportAddress # Destination address
writer: Future[void] # Writer vector completion Future
type
DatagramServer* = ref object of RootRef
## Datagram server object
transport*: DatagramTransport ## Datagram transport
@ -76,15 +67,22 @@ template setWriteError(t, e: untyped) =
(t).state.incl(WriteError)
(t).error = newException(TransportOsError, osErrorMsg((e)))
template setWriterWSABuffer(t, v: untyped) =
(t).wwsabuf.buf = cast[cstring](v.buf)
(t).wwsabuf.len = cast[int32](v.buflen)
when defined(windows):
type
WindowsDatagramTransport* = ref object of DatagramTransport
rovl: CustomOverlapped
wovl: CustomOverlapped
raddr: Sockaddr_storage
ralen: SockLen
rflag: int32
wsabuf: TWSABuf
rovl: CustomOverlapped # Reader OVERLAPPED structure
wovl: CustomOverlapped # Writer OVERLAPPED structure
raddr: Sockaddr_storage # Reader address storage
ralen: SockLen # Reader address length
rflag: int32 # Reader flags storage
rwsabuf: TWSABuf # Reader WSABUF structure
waddr: Sockaddr_storage # Writer address storage
wlen: SockLen # Writer address length
wwsabuf: TWSABuf # Writer WSABUF structure
template finishWriter(t: untyped) =
var vv = (t).queue.popFirst()
@ -111,19 +109,21 @@ when defined(windows):
transp.finishWriter()
else:
## Initiation
var saddr: Sockaddr_storage
var slen: SockLen
transp.state.incl(WritePending)
let fd = SocketHandle(ovl.data.fd)
var vector = transp.queue.popFirst()
transp.setWriterWSABuffer(vector)
var ret: cint
if vector.kind == WithAddress:
toSockAddr(vector.address.address, vector.address.port, saddr, slen)
toSockAddr(vector.address.address, vector.address.port,
transp.waddr, transp.wlen)
ret = WSASendTo(fd, addr transp.wwsabuf, DWORD(1), addr bytesCount,
DWORD(0), cast[ptr SockAddr](addr transp.waddr),
cint(transp.wlen),
cast[POVERLAPPED](addr transp.wovl), nil)
else:
toSockAddr(transp.remote.address, transp.remote.port, saddr, slen)
let ret = WSASendTo(fd, vector.buf, DWORD(1), addr bytesCount,
DWORD(0), cast[ptr SockAddr](addr saddr),
cint(slen),
cast[POVERLAPPED](addr transp.wovl), nil)
ret = WSASend(fd, addr transp.wwsabuf, DWORD(1), addr bytesCount,
DWORD(0), cast[POVERLAPPED](addr transp.wovl), nil)
if ret != 0:
let err = osLastError()
if int(err) == ERROR_OPERATION_ABORTED:
@ -178,11 +178,8 @@ when defined(windows):
let fd = SocketHandle(ovl.data.fd)
transp.rflag = 0
transp.ralen = SockLen(sizeof(Sockaddr_storage))
let ret = WSARecvFrom(fd,
addr transp.wsabuf,
DWORD(1),
addr bytesCount,
addr transp.rflag,
let ret = WSARecvFrom(fd, addr transp.rwsabuf, DWORD(1),
addr bytesCount, addr transp.rflag,
cast[ptr SockAddr](addr transp.raddr),
cast[ptr cint](addr transp.ralen),
cast[POVERLAPPED](addr transp.rovl), nil)
@ -275,7 +272,17 @@ when defined(windows):
if sock == asyncInvalidSocket:
closeAsyncSocket(localSock)
raiseOsError(err)
if remote.port != Port(0):
var saddr: Sockaddr_storage
var slen: SockLen
toSockAddr(remote.address, remote.port, saddr, slen)
if connect(SocketHandle(localSock), cast[ptr SockAddr](addr saddr),
slen) != 0:
let err = osLastError()
if sock == asyncInvalidSocket:
closeAsyncSocket(localSock)
raiseOsError(err)
wresult.remote = remote
wresult.fd = localSock
@ -289,8 +296,8 @@ when defined(windows):
udata: cast[pointer](wresult))
wresult.wovl.data = CompletionData(fd: localSock, cb: writeDatagramLoop,
udata: cast[pointer](wresult))
wresult.wsabuf = TWSABuf(buf: cast[cstring](addr wresult.buffer[0]),
len: int32(len(wresult.buffer)))
wresult.rwsabuf = TWSABuf(buf: cast[cstring](addr wresult.buffer[0]),
len: int32(len(wresult.buffer)))
GC_ref(wresult)
result = cast[DatagramTransport](wresult)
if NoAutoRead notin flags:
@ -301,7 +308,7 @@ when defined(windows):
proc close*(transp: DatagramTransport) =
## Closes and frees resources of transport ``transp``.
if ReadClosed notin transp.state and WriteClosed notin transp.state:
discard cancelIo(Handle(transp.fd))
# discard cancelIo(Handle(transp.fd))
closeAsyncSocket(transp.fd)
transp.state.incl(WriteClosed)
transp.state.incl(ReadClosed)
@ -525,13 +532,8 @@ proc send*(transp: DatagramTransport, pbytes: pointer,
if transp.remote.port == Port(0):
raise newException(TransportError, "Remote peer is not set!")
var waitFuture = newFuture[void]("datagram.transport.send")
when defined(windows):
var wsabuf = TWSABuf(buf: cast[cstring](pbytes), len: int32(nbytes))
var vector = GramVector(kind: WithoutAddress, buf: addr wsabuf,
writer: waitFuture)
else:
var vector = GramVector(kind: WithoutAddress, buf: pbytes, buflen: nbytes,
writer: waitFuture)
var vector = GramVector(kind: WithoutAddress, buf: pbytes, buflen: nbytes,
writer: waitFuture)
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()
@ -546,16 +548,10 @@ proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int,
checkClosed(transp)
var saddr: Sockaddr_storage
var slen: SockLen
var vector: GramVector
toSockAddr(remote.address, remote.port, saddr, slen)
var waitFuture = newFuture[void]("datagram.transport.sendto")
when defined(windows):
var wsabuf = TWSABuf(buf: cast[cstring](pbytes), len: int32(nbytes))
vector = GramVector(kind: WithAddress, buf: addr wsabuf,
address: remote, writer: waitFuture)
else:
vector = GramVector(kind: WithAddress, buf: pbytes, buflen: nbytes,
address: remote, writer: waitFuture)
var vector = GramVector(kind: WithAddress, buf: pbytes, buflen: nbytes,
writer: waitFuture, address: remote)
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()