Comment out asyncmacro2 skipStmtList().

Many changes in datagram.nim.
Fixed testdatagram.nim.
Fixed testserver.nim.
This commit is contained in:
cheatfate 2018-06-14 09:49:59 +03:00
parent 978203691e
commit 525aaf6837
5 changed files with 278 additions and 313 deletions

View File

@ -19,10 +19,10 @@ proc skipUntilStmtList(node: NimNode): NimNode {.compileTime.} =
if node[0].kind == nnkStmtList: if node[0].kind == nnkStmtList:
result = skipUntilStmtList(node[0]) result = skipUntilStmtList(node[0])
proc skipStmtList(node: NimNode): NimNode {.compileTime.} = # proc skipStmtList(node: NimNode): NimNode {.compileTime.} =
result = node # result = node
if node[0].kind == nnkStmtList: # if node[0].kind == nnkStmtList:
result = node[0] # result = node[0]
template createCb(retFutureSym, iteratorNameSym, template createCb(retFutureSym, iteratorNameSym,
strName, identName, futureVarCompletions: untyped) = strName, identName, futureVarCompletions: untyped) =

View File

@ -27,49 +27,23 @@ type
buflen: int # Writer buffer size buflen: int # Writer buffer size
writer: Future[void] # Writer vector completion Future writer: Future[void] # Writer vector completion Future
DatagramServer* = ref object of RootRef
## Datagram server object
transport*: DatagramTransport ## Datagram transport
status*: ServerStatus ## Current server status
DatagramCallback* = proc(transp: DatagramTransport, DatagramCallback* = proc(transp: DatagramTransport,
pbytes: pointer, remote: TransportAddress): Future[void] {.gcsafe.}
nbytes: int,
remote: TransportAddress,
udata: pointer): Future[void] {.gcsafe.}
## Datagram asynchronous receive callback.
## ``transp`` - transport object
## ``pbytes`` - pointer to data received
## ``nbytes`` - number of bytes received
## ``remote`` - remote peer address
## ``udata`` - user-defined pointer, specified at Transport creation.
##
## ``pbytes`` will be `nil` and ``nbytes`` will be ``0``, if there an error
## happens.
DatagramTransport* = ref object of RootRef DatagramTransport* = ref object of RootRef
fd: AsyncFD # File descriptor fd*: AsyncFD # File descriptor
state: set[TransportState] # Current Transport state state: set[TransportState] # Current Transport state
flags: set[ServerFlags] # Flags
buffer: seq[byte] # Reading buffer buffer: seq[byte] # Reading buffer
buflen: int # Reading buffer effective size
error: ref Exception # Current error error: ref Exception # Current error
queue: Deque[GramVector] # Writer queue queue: Deque[GramVector] # Writer queue
local: TransportAddress # Local address local: TransportAddress # Local address
remote: TransportAddress # Remote address remote: TransportAddress # Remote address
udata: pointer # User-driven pointer udata*: pointer # User-driven pointer
function: DatagramCallback # Receive data callback function: DatagramCallback # Receive data callback
future: Future[void] # Transport's life future future: Future[void] # Transport's life future
when defined(windows):
template setReadError(t, e: untyped) =
(t).state.incl(ReadError)
(t).error = newException(TransportOsError, osErrorMsg((e)))
template setWriterWSABuffer(t, v: untyped) =
(t).wwsabuf.buf = cast[cstring](v.buf)
(t).wwsabuf.len = cast[int32](v.buflen)
when defined(windows):
type
WindowsDatagramTransport* = ref object of DatagramTransport
rovl: CustomOverlapped # Reader OVERLAPPED structure rovl: CustomOverlapped # Reader OVERLAPPED structure
wovl: CustomOverlapped # Writer OVERLAPPED structure wovl: CustomOverlapped # Writer OVERLAPPED structure
raddr: Sockaddr_storage # Reader address storage raddr: Sockaddr_storage # Reader address storage
@ -80,10 +54,20 @@ when defined(windows):
wlen: SockLen # Writer address length wlen: SockLen # Writer address length
wwsabuf: TWSABuf # Writer WSABUF structure wwsabuf: TWSABuf # Writer WSABUF structure
template setReadError(t, e: untyped) =
(t).state.incl(ReadError)
(t).error = newException(TransportOsError, osErrorMsg((e)))
template setWriterWSABuffer(t, v: untyped) =
(t).wwsabuf.buf = cast[cstring](v.buf)
(t).wwsabuf.len = cast[int32](v.buflen)
when defined(windows):
proc writeDatagramLoop(udata: pointer) = proc writeDatagramLoop(udata: pointer) =
var bytesCount: int32 var bytesCount: int32
var ovl = cast[PtrCustomOverlapped](udata) var ovl = cast[PtrCustomOverlapped](udata)
var transp = cast[WindowsDatagramTransport](ovl.data.udata) var transp = cast[DatagramTransport](ovl.data.udata)
while len(transp.queue) > 0: while len(transp.queue) > 0:
if WritePending in transp.state: if WritePending in transp.state:
## Continuation ## Continuation
@ -140,7 +124,7 @@ when defined(windows):
bytesCount: int32 bytesCount: int32
raddr: TransportAddress raddr: TransportAddress
var ovl = cast[PtrCustomOverlapped](udata) var ovl = cast[PtrCustomOverlapped](udata)
var transp = cast[WindowsDatagramTransport](ovl.data.udata) var transp = cast[DatagramTransport](ovl.data.udata)
while true: while true:
if ReadPending in transp.state: if ReadPending in transp.state:
## Continuation ## Continuation
@ -151,11 +135,10 @@ when defined(windows):
if err == OSErrorCode(-1): if err == OSErrorCode(-1):
let bytesCount = transp.rovl.data.bytesCount let bytesCount = transp.rovl.data.bytesCount
if bytesCount == 0: if bytesCount == 0:
transp.state.incl(ReadEof) transp.state.incl({ReadEof, ReadPaused})
transp.state.incl(ReadPaused)
fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port) fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port)
discard transp.function(transp, addr transp.buffer[0], bytesCount, transp.buflen = bytesCount
raddr, transp.udata) discard transp.function(transp, raddr)
elif int(err) == ERROR_OPERATION_ABORTED: elif int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt # CancelIO() interrupt
transp.state.incl(ReadPaused) transp.state.incl(ReadPaused)
@ -163,10 +146,11 @@ when defined(windows):
else: else:
transp.setReadError(err) transp.setReadError(err)
transp.state.incl(ReadPaused) transp.state.incl(ReadPaused)
discard transp.function(transp, nil, 0, raddr, transp.udata) transp.buflen = 0
discard transp.function(transp, raddr)
else: else:
## Initiation ## Initiation
if (ReadEof notin transp.state) and (ReadClosed notin transp.state): if transp.state * {ReadEof, ReadClosed, ReadError} == {}:
transp.state.incl(ReadPending) transp.state.incl(ReadPending)
let fd = SocketHandle(ovl.data.fd) let fd = SocketHandle(ovl.data.fd)
transp.rflag = 0 transp.rflag = 0
@ -184,7 +168,7 @@ when defined(windows):
transp.state.incl(ReadPaused) transp.state.incl(ReadPaused)
elif int(err) == WSAECONNRESET: elif int(err) == WSAECONNRESET:
transp.state.excl(ReadPending) transp.state.excl(ReadPending)
transp.state = transp.state + {ReadPaused, ReadEof} transp.state.incl({ReadPaused, ReadEof})
break break
elif int(err) == ERROR_IO_PENDING: elif int(err) == ERROR_IO_PENDING:
discard discard
@ -192,18 +176,17 @@ when defined(windows):
transp.state.excl(ReadPending) transp.state.excl(ReadPending)
transp.state.incl(ReadPaused) transp.state.incl(ReadPaused)
transp.setReadError(err) transp.setReadError(err)
discard transp.function(transp, nil, 0, raddr, transp.udata) transp.buflen = 0
discard transp.function(transp, raddr)
break break
proc resumeRead(transp: DatagramTransport) {.inline.} = proc resumeRead(transp: DatagramTransport) {.inline.} =
var wtransp = cast[WindowsDatagramTransport](transp) transp.state.excl(ReadPaused)
wtransp.state.excl(ReadPaused) readDatagramLoop(cast[pointer](addr transp.rovl))
readDatagramLoop(cast[pointer](addr wtransp.rovl))
proc resumeWrite(transp: DatagramTransport) {.inline.} = proc resumeWrite(transp: DatagramTransport) {.inline.} =
var wtransp = cast[WindowsDatagramTransport](transp) transp.state.excl(WritePaused)
wtransp.state.excl(WritePaused) writeDatagramLoop(cast[pointer](addr transp.wovl))
writeDatagramLoop(cast[pointer](addr wtransp.wovl))
proc newDatagramTransportCommon(cbproc: DatagramCallback, proc newDatagramTransportCommon(cbproc: DatagramCallback,
remote: TransportAddress, remote: TransportAddress,
@ -211,12 +194,16 @@ when defined(windows):
sock: AsyncFD, sock: AsyncFD,
flags: set[ServerFlags], flags: set[ServerFlags],
udata: pointer, udata: pointer,
child: DatagramTransport,
bufferSize: int): DatagramTransport = bufferSize: int): DatagramTransport =
var localSock: AsyncFD var localSock: AsyncFD
assert(remote.address.family == local.address.family) assert(remote.address.family == local.address.family)
assert(not isNil(cbproc)) assert(not isNil(cbproc))
var wresult = new WindowsDatagramTransport if isNil(child):
result = DatagramTransport()
else:
result = child
if sock == asyncInvalidSocket: if sock == asyncInvalidSocket:
if local.address.family == IpAddressFamily.IPv4: if local.address.family == IpAddressFamily.IPv4:
@ -251,7 +238,7 @@ when defined(windows):
if sock == asyncInvalidSocket: if sock == asyncInvalidSocket:
closeAsyncSocket(localSock) closeAsyncSocket(localSock)
raiseOsError(err) raiseOsError(err)
wresult.local = local result.local = local
else: else:
var saddr: Sockaddr_storage var saddr: Sockaddr_storage
var slen: SockLen var slen: SockLen
@ -278,27 +265,26 @@ when defined(windows):
if sock == asyncInvalidSocket: if sock == asyncInvalidSocket:
closeAsyncSocket(localSock) closeAsyncSocket(localSock)
raiseOsError(err) raiseOsError(err)
wresult.remote = remote result.remote = remote
wresult.fd = localSock result.fd = localSock
wresult.function = cbproc result.function = cbproc
wresult.buffer = newSeq[byte](bufferSize) result.buffer = newSeq[byte](bufferSize)
wresult.queue = initDeque[GramVector]() result.queue = initDeque[GramVector]()
wresult.udata = udata result.udata = udata
wresult.state = {WritePaused} result.state = {WritePaused}
wresult.future = newFuture[void]("datagram.transport") result.future = newFuture[void]("datagram.transport")
wresult.rovl.data = CompletionData(fd: localSock, cb: readDatagramLoop, result.rovl.data = CompletionData(fd: localSock, cb: readDatagramLoop,
udata: cast[pointer](wresult)) udata: cast[pointer](result))
wresult.wovl.data = CompletionData(fd: localSock, cb: writeDatagramLoop, result.wovl.data = CompletionData(fd: localSock, cb: writeDatagramLoop,
udata: cast[pointer](wresult)) udata: cast[pointer](result))
wresult.rwsabuf = TWSABuf(buf: cast[cstring](addr wresult.buffer[0]), result.rwsabuf = TWSABuf(buf: cast[cstring](addr result.buffer[0]),
len: int32(len(wresult.buffer))) len: int32(len(result.buffer)))
GC_ref(wresult) GC_ref(result)
result = cast[DatagramTransport](wresult)
if NoAutoRead notin flags: if NoAutoRead notin flags:
result.resumeRead() result.resumeRead()
else: else:
wresult.state.incl(ReadPaused) result.state.incl(ReadPaused)
proc close*(transp: DatagramTransport) = proc close*(transp: DatagramTransport) =
## Closes and frees resources of transport ``transp``. ## Closes and frees resources of transport ``transp``.
@ -308,8 +294,9 @@ when defined(windows):
transp.state.incl(WriteClosed) transp.state.incl(WriteClosed)
transp.state.incl(ReadClosed) transp.state.incl(ReadClosed)
transp.future.complete() transp.future.complete()
var wresult = cast[WindowsDatagramTransport](transp) if not isNil(transp.udata) and GCUserData in transp.flags:
GC_unref(wresult) GC_unref(cast[ref int](transp.udata))
GC_unref(transp)
else: else:
@ -334,15 +321,16 @@ else:
addr slen) addr slen)
if res >= 0: if res >= 0:
fromSockAddr(saddr, slen, raddr.address, raddr.port) fromSockAddr(saddr, slen, raddr.address, raddr.port)
discard transp.function(transp, addr transp.buffer[0], res, transp.buflen = res
raddr, transp.udata) discard transp.function(transp, raddr)
else: else:
let err = osLastError() let err = osLastError()
if int(err) == EINTR: if int(err) == EINTR:
continue continue
else: else:
transp.buflen = 0
transp.setReadError(err) transp.setReadError(err)
discard transp.function(transp, nil, 0, raddr, transp.udata) discard transp.function(transp, raddr)
break break
proc writeDatagramLoop(udata: pointer) = proc writeDatagramLoop(udata: pointer) =
@ -352,7 +340,7 @@ else:
slen: SockLen slen: SockLen
var cdata = cast[ptr CompletionData](udata) var cdata = cast[ptr CompletionData](udata)
if not isNil(cdata) and int(cdata.fd) == 0: if not isNil(cdata) and (int(cdata.fd) == 0 or isNil(cdata.udata)):
# Transport was closed earlier, exiting # Transport was closed earlier, exiting
return return
var transp = cast[DatagramTransport](cdata.udata) var transp = cast[DatagramTransport](cdata.udata)
@ -396,12 +384,16 @@ else:
sock: AsyncFD, sock: AsyncFD,
flags: set[ServerFlags], flags: set[ServerFlags],
udata: pointer, udata: pointer,
child: DatagramTransport = nil,
bufferSize: int): DatagramTransport = bufferSize: int): DatagramTransport =
var localSock: AsyncFD var localSock: AsyncFD
assert(remote.address.family == local.address.family) assert(remote.address.family == local.address.family)
assert(not isNil(cbproc)) assert(not isNil(cbproc))
result = new DatagramTransport if isNil(child):
result = DatagramTransport()
else:
result = child
if sock == asyncInvalidSocket: if sock == asyncInvalidSocket:
if local.address.family == IpAddressFamily.IPv4: if local.address.family == IpAddressFamily.IPv4:
@ -452,6 +444,7 @@ else:
result.fd = localSock result.fd = localSock
result.function = cbproc result.function = cbproc
result.flags = flags
result.buffer = newSeq[byte](bufferSize) result.buffer = newSeq[byte](bufferSize)
result.queue = initDeque[GramVector]() result.queue = initDeque[GramVector]()
result.udata = udata result.udata = udata
@ -465,10 +458,9 @@ else:
proc close*(transp: DatagramTransport) = proc close*(transp: DatagramTransport) =
## Closes and frees resources of transport ``transp``. ## Closes and frees resources of transport ``transp``.
if ReadClosed notin transp.state and WriteClosed notin transp.state: if {ReadClosed, WriteClosed} * transp.state == {}:
closeAsyncSocket(transp.fd) closeAsyncSocket(transp.fd)
transp.state.incl(WriteClosed) transp.state.incl({WriteClosed, ReadClosed})
transp.state.incl(ReadClosed)
transp.future.complete() transp.future.complete()
GC_unref(transp) GC_unref(transp)
@ -478,6 +470,7 @@ proc newDatagramTransport*(cbproc: DatagramCallback,
sock: AsyncFD = asyncInvalidSocket, sock: AsyncFD = asyncInvalidSocket,
flags: set[ServerFlags] = {}, flags: set[ServerFlags] = {},
udata: pointer = nil, udata: pointer = nil,
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize bufSize: int = DefaultDatagramBufferSize
): DatagramTransport = ): DatagramTransport =
## Create new UDP datagram transport (IPv4). ## Create new UDP datagram transport (IPv4).
@ -491,7 +484,21 @@ proc newDatagramTransport*(cbproc: DatagramCallback,
## ``udata`` - custom argument which will be passed to ``cbproc``. ## ``udata`` - custom argument which will be passed to ``cbproc``.
## ``bufSize`` - size of internal buffer ## ``bufSize`` - size of internal buffer
result = newDatagramTransportCommon(cbproc, remote, local, sock, result = newDatagramTransportCommon(cbproc, remote, local, sock,
flags, udata, bufSize) flags, udata, child, bufSize)
proc newDatagramTransport*[T](cbproc: DatagramCallback,
udata: ref T,
remote: TransportAddress = AnyAddress,
local: TransportAddress = AnyAddress,
sock: AsyncFD = asyncInvalidSocket,
flags: set[ServerFlags] = {},
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize
): DatagramTransport =
var fflags = flags + {GCUserData}
GC_ref(udata)
result = newDatagramTransportCommon(cbproc, remote, local, sock,
fflags, udata, child, bufSize)
proc newDatagramTransport6*(cbproc: DatagramCallback, proc newDatagramTransport6*(cbproc: DatagramCallback,
remote: TransportAddress = AnyAddress6, remote: TransportAddress = AnyAddress6,
@ -499,6 +506,7 @@ proc newDatagramTransport6*(cbproc: DatagramCallback,
sock: AsyncFD = asyncInvalidSocket, sock: AsyncFD = asyncInvalidSocket,
flags: set[ServerFlags] = {}, flags: set[ServerFlags] = {},
udata: pointer = nil, udata: pointer = nil,
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize bufSize: int = DefaultDatagramBufferSize
): DatagramTransport = ): DatagramTransport =
## Create new UDP datagram transport (IPv6). ## Create new UDP datagram transport (IPv6).
@ -512,7 +520,21 @@ proc newDatagramTransport6*(cbproc: DatagramCallback,
## ``udata`` - custom argument which will be passed to ``cbproc``. ## ``udata`` - custom argument which will be passed to ``cbproc``.
## ``bufSize`` - size of internal buffer. ## ``bufSize`` - size of internal buffer.
result = newDatagramTransportCommon(cbproc, remote, local, sock, result = newDatagramTransportCommon(cbproc, remote, local, sock,
flags, udata, bufSize) flags, udata, child, bufSize)
proc newDatagramTransport6*[T](cbproc: DatagramCallback,
udata: ref T,
remote: TransportAddress = AnyAddress6,
local: TransportAddress = AnyAddress6,
sock: AsyncFD = asyncInvalidSocket,
flags: set[ServerFlags] = {},
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize
): DatagramTransport =
var fflags = flags + {GCUserData}
GC_ref(udata)
result = newDatagramTransportCommon(cbproc, remote, local, sock,
fflags, udata, child, bufSize)
proc join*(transp: DatagramTransport) {.async.} = proc join*(transp: DatagramTransport) {.async.} =
## Wait until the transport ``transp`` will be closed. ## Wait until the transport ``transp`` will be closed.
@ -608,53 +630,74 @@ proc sendTo*[T](transp: DatagramTransport, msg: var seq[T],
transp.resumeWrite() transp.resumeWrite()
return retFuture return retFuture
proc createDatagramServer*(host: TransportAddress, proc peekMessage*(transp: DatagramTransport, msg: var seq[byte],
cbproc: DatagramCallback, msglen: var int) =
flags: set[ServerFlags] = {}, ## Get access to internal message buffer and length of incoming datagram.
sock: AsyncFD = asyncInvalidSocket, if ReadError in transp.state:
bufferSize: int = DefaultDatagramBufferSize, raise transp.getError()
udata: pointer = nil): DatagramServer = shallowCopy(msg, transp.buffer)
var transp: DatagramTransport msglen = transp.buflen
var fflags = flags + {NoAutoRead}
if host.address.family == IpAddressFamily.IPv4:
transp = newDatagramTransport(cbproc, AnyAddress, host, sock,
fflags, udata, bufferSize)
else:
transp = newDatagramTransport6(cbproc, AnyAddress6, host, sock,
fflags, udata, bufferSize)
result = DatagramServer()
result.transport = transp
result.status = ServerStatus.Starting
GC_ref(result)
proc start*(server: DatagramServer) = proc getMessage*(transp: DatagramTransport): seq[byte] =
## Starts ``server``. ## Copy data from internal message buffer and return result.
if server.status == ServerStatus.Starting: if ReadError in transp.state:
server.transport.resumeRead() raise transp.getError()
server.status = ServerStatus.Running if transp.buflen > 0:
result = newSeq[byte](transp.buflen)
copyMem(addr result[0], addr transp.buffer[0], transp.buflen)
proc stop*(server: DatagramServer) = proc getUserData*[T](transp: DatagramTransport): T {.inline.} =
## Stops ``server``. ## Obtain user data stored in ``transp`` object.
if server.status == ServerStatus.Running: result = cast[T](transp.udata)
when defined(windows):
if {WritePending, ReadPending} * server.transport.state != {}:
## CancelIO will stop both reading and writing.
discard cancelIo(Handle(server.transport.fd))
else:
if WritePaused notin server.transport.state:
server.transport.fd.removeWriter()
if ReadPaused notin server.transport.state:
server.transport.fd.removeReader()
server.status = ServerStatus.Stopped
proc join*(server: DatagramServer) {.async.} =
## Waits until ``server`` is not stopped.
if not server.transport.future.finished:
await server.transport.future
proc close*(server: DatagramServer) = # proc createDatagramServer*(host: TransportAddress,
## Release ``server`` resources. # cbproc: DatagramCallback,
if server.status == ServerStatus.Stopped: # flags: set[ServerFlags] = {},
server.status = ServerStatus.Closed # sock: AsyncFD = asyncInvalidSocket,
server.transport.close() # bufferSize: int = DefaultDatagramBufferSize,
GC_unref(server) # udata: pointer = nil): DatagramServer =
# var transp: DatagramTransport
# var fflags = flags + {NoAutoRead}
# if host.address.family == IpAddressFamily.IPv4:
# transp = newDatagramTransport(cbproc, AnyAddress, host, sock,
# fflags, udata, bufferSize)
# else:
# transp = newDatagramTransport6(cbproc, AnyAddress6, host, sock,
# fflags, udata, bufferSize)
# result = DatagramServer()
# result.transport = transp
# result.status = ServerStatus.Starting
# GC_ref(result)
# proc start*(server: DatagramServer) =
# ## Starts ``server``.
# if server.status == ServerStatus.Starting:
# server.transport.resumeRead()
# server.status = ServerStatus.Running
# proc stop*(server: DatagramServer) =
# ## Stops ``server``.
# if server.status == ServerStatus.Running:
# when defined(windows):
# if {WritePending, ReadPending} * server.transport.state != {}:
# ## CancelIO will stop both reading and writing.
# discard cancelIo(Handle(server.transport.fd))
# else:
# if WritePaused notin server.transport.state:
# server.transport.fd.removeWriter()
# if ReadPaused notin server.transport.state:
# server.transport.fd.removeReader()
# server.status = ServerStatus.Stopped
# proc join*(server: DatagramServer) {.async.} =
# ## Waits until ``server`` is not stopped.
# if not server.transport.future.finished:
# await server.transport.future
# proc close*(server: DatagramServer) =
# ## Release ``server`` resources.
# if server.status == ServerStatus.Stopped:
# server.status = ServerStatus.Closed
# server.transport.close()
# GC_unref(server)

View File

@ -300,8 +300,7 @@ when defined(windows):
if err == OSErrorCode(-1): if err == OSErrorCode(-1):
let bytesCount = transp.rovl.data.bytesCount let bytesCount = transp.rovl.data.bytesCount
if bytesCount == 0: if bytesCount == 0:
transp.state.incl(ReadEof) transp.state.incl({ReadEof, ReadPaused})
transp.state.incl(ReadPaused)
else: else:
if transp.offset != transp.roffset: if transp.offset != transp.roffset:
moveMem(addr transp.buffer[transp.offset], moveMem(addr transp.buffer[transp.offset],
@ -1206,7 +1205,6 @@ proc close*(transp: StreamTransport) =
when defined(windows): when defined(windows):
discard cancelIo(Handle(transp.fd)) discard cancelIo(Handle(transp.fd))
closeAsyncSocket(transp.fd) closeAsyncSocket(transp.fd)
transp.state.incl(WriteClosed) transp.state.incl({WriteClosed, ReadClosed})
transp.state.incl(ReadClosed)
transp.future.complete() transp.future.complete()
GC_unref(transp) GC_unref(transp)

View File

@ -14,11 +14,14 @@ const
ClientsCount = 100 ClientsCount = 100
MessagesCount = 100 MessagesCount = 100
proc client1(transp: DatagramTransport, pbytes: pointer, nbytes: int, proc client1(transp: DatagramTransport,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} = raddr: TransportAddress): Future[void] {.async.} =
if not isNil(pbytes): var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1) var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes) copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes) data.setLen(nbytes)
if data.startsWith("REQUEST"): if data.startsWith("REQUEST"):
var numstr = data[7..^1] var numstr = data[7..^1]
@ -29,19 +32,21 @@ proc client1(transp: DatagramTransport, pbytes: pointer, nbytes: int,
var err = "ERROR" var err = "ERROR"
await transp.sendTo(addr err[0], len(err), raddr) await transp.sendTo(addr err[0], len(err), raddr)
else: else:
## Read operation failed with error var counterPtr = cast[ptr int](transp.udata)
var counterPtr = cast[ptr int](udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
proc client2(transp: DatagramTransport, pbytes: pointer, nbytes: int, proc client2(transp: DatagramTransport,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} = raddr: TransportAddress): Future[void] {.async.} =
if not isNil(pbytes): var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1) var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes) copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes) data.setLen(nbytes)
if data.startsWith("ANSWER"): if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1 counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount: if counterPtr[] == TestsCount:
transp.close() transp.close()
@ -50,23 +55,26 @@ proc client2(transp: DatagramTransport, pbytes: pointer, nbytes: int,
var req = "REQUEST" & $counterPtr[] var req = "REQUEST" & $counterPtr[]
await transp.sendTo(addr req[0], len(req), ta) await transp.sendTo(addr req[0], len(req), ta)
else: else:
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
else: else:
## Read operation failed with error ## Read operation failed with error
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
proc client3(transp: DatagramTransport, pbytes: pointer, nbytes: int, proc client3(transp: DatagramTransport,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} = raddr: TransportAddress): Future[void] {.async.} =
if not isNil(pbytes): var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1) var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes) copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes) data.setLen(nbytes)
if data.startsWith("ANSWER"): if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1 counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount: if counterPtr[] == TestsCount:
transp.close() transp.close()
@ -74,23 +82,26 @@ proc client3(transp: DatagramTransport, pbytes: pointer, nbytes: int,
var req = "REQUEST" & $counterPtr[] var req = "REQUEST" & $counterPtr[]
await transp.send(addr req[0], len(req)) await transp.send(addr req[0], len(req))
else: else:
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
else: else:
## Read operation failed with error ## Read operation failed with error
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
proc client4(transp: DatagramTransport, pbytes: pointer, nbytes: int, proc client4(transp: DatagramTransport,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} = raddr: TransportAddress): Future[void] {.async.} =
if not isNil(pbytes): var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1) var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes) copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes) data.setLen(nbytes)
if data.startsWith("ANSWER"): if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1 counterPtr[] = counterPtr[] + 1
if counterPtr[] == MessagesCount: if counterPtr[] == MessagesCount:
transp.close() transp.close()
@ -98,23 +109,26 @@ proc client4(transp: DatagramTransport, pbytes: pointer, nbytes: int,
var req = "REQUEST" & $counterPtr[] var req = "REQUEST" & $counterPtr[]
await transp.send(addr req[0], len(req)) await transp.send(addr req[0], len(req))
else: else:
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
else: else:
## Read operation failed with error ## Read operation failed with error
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
proc client5(transp: DatagramTransport, pbytes: pointer, nbytes: int, proc client5(transp: DatagramTransport,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} = raddr: TransportAddress): Future[void] {.async.} =
if not isNil(pbytes): var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1) var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes) copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes) data.setLen(nbytes)
if data.startsWith("ANSWER"): if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1 counterPtr[] = counterPtr[] + 1
if counterPtr[] == MessagesCount: if counterPtr[] == MessagesCount:
transp.close() transp.close()
@ -123,20 +137,23 @@ proc client5(transp: DatagramTransport, pbytes: pointer, nbytes: int,
var req = "REQUEST" & $counterPtr[] var req = "REQUEST" & $counterPtr[]
await transp.sendTo(addr req[0], len(req), ta) await transp.sendTo(addr req[0], len(req), ta)
else: else:
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
else: else:
## Read operation failed with error ## Read operation failed with error
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
proc client6(transp: DatagramTransport, pbytes: pointer, nbytes: int, proc client6(transp: DatagramTransport,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} = raddr: TransportAddress): Future[void] {.async.} =
if not isNil(pbytes): var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1) var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes) copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes) data.setLen(nbytes)
if data.startsWith("REQUEST"): if data.startsWith("REQUEST"):
var numstr = data[7..^1] var numstr = data[7..^1]
@ -148,18 +165,21 @@ proc client6(transp: DatagramTransport, pbytes: pointer, nbytes: int,
await transp.sendTo(err, raddr) await transp.sendTo(err, raddr)
else: else:
## Read operation failed with error ## Read operation failed with error
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
proc client7(transp: DatagramTransport, pbytes: pointer, nbytes: int, proc client7(transp: DatagramTransport,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} = raddr: TransportAddress): Future[void] {.async.} =
if not isNil(pbytes): var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1) var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes) copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes) data.setLen(nbytes)
if data.startsWith("ANSWER"): if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1 counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount: if counterPtr[] == TestsCount:
transp.close() transp.close()
@ -168,23 +188,26 @@ proc client7(transp: DatagramTransport, pbytes: pointer, nbytes: int,
var req = "REQUEST" & $counterPtr[] var req = "REQUEST" & $counterPtr[]
await transp.sendTo(req, ta) await transp.sendTo(req, ta)
else: else:
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
else: else:
## Read operation failed with error ## Read operation failed with error
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
proc client8(transp: DatagramTransport, pbytes: pointer, nbytes: int, proc client8(transp: DatagramTransport,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} = raddr: TransportAddress): Future[void] {.async.} =
if not isNil(pbytes): var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1) var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes) copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes) data.setLen(nbytes)
if data.startsWith("ANSWER"): if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1 counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount: if counterPtr[] == TestsCount:
transp.close() transp.close()
@ -192,20 +215,23 @@ proc client8(transp: DatagramTransport, pbytes: pointer, nbytes: int,
var req = "REQUEST" & $counterPtr[] var req = "REQUEST" & $counterPtr[]
await transp.send(req) await transp.send(req)
else: else:
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
else: else:
## Read operation failed with error ## Read operation failed with error
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
proc client9(transp: DatagramTransport, pbytes: pointer, nbytes: int, proc client9(transp: DatagramTransport,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} = raddr: TransportAddress): Future[void] {.async.} =
if not isNil(pbytes): var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1) var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes) copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes) data.setLen(nbytes)
if data.startsWith("REQUEST"): if data.startsWith("REQUEST"):
var numstr = data[7..^1] var numstr = data[7..^1]
@ -221,18 +247,21 @@ proc client9(transp: DatagramTransport, pbytes: pointer, nbytes: int,
await transp.sendTo(errseq, raddr) await transp.sendTo(errseq, raddr)
else: else:
## Read operation failed with error ## Read operation failed with error
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
proc client10(transp: DatagramTransport, pbytes: pointer, nbytes: int, proc client10(transp: DatagramTransport,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} = raddr: TransportAddress): Future[void] {.async.} =
if not isNil(pbytes): var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1) var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes) copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes) data.setLen(nbytes)
if data.startsWith("ANSWER"): if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1 counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount: if counterPtr[] == TestsCount:
transp.close() transp.close()
@ -243,23 +272,26 @@ proc client10(transp: DatagramTransport, pbytes: pointer, nbytes: int,
copyMem(addr reqseq[0], addr req[0], len(req)) copyMem(addr reqseq[0], addr req[0], len(req))
await transp.sendTo(reqseq, ta) await transp.sendTo(reqseq, ta)
else: else:
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
else: else:
## Read operation failed with error ## Read operation failed with error
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
proc client11(transp: DatagramTransport, pbytes: pointer, nbytes: int, proc client11(transp: DatagramTransport,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} = raddr: TransportAddress): Future[void] {.async.} =
if not isNil(pbytes): var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1) var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes) copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes) data.setLen(nbytes)
if data.startsWith("ANSWER"): if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1 counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount: if counterPtr[] == TestsCount:
transp.close() transp.close()
@ -269,12 +301,12 @@ proc client11(transp: DatagramTransport, pbytes: pointer, nbytes: int,
copyMem(addr reqseq[0], addr req[0], len(req)) copyMem(addr reqseq[0], addr req[0], len(req))
await transp.send(reqseq) await transp.send(reqseq)
else: else:
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
else: else:
## Read operation failed with error ## Read operation failed with error
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
@ -397,90 +429,6 @@ proc test3(bounded: bool): Future[int] {.async.} =
for i in 0..<ClientsCount: for i in 0..<ClientsCount:
result += counters[i] result += counters[i]
proc swarmWorker(address: TransportAddress): Future[int] {.async.} =
var counter = 0
var results = newSeq[int](MessagesCount)
var future = newFuture[void]("testdatagram.client.wait")
proc receiver(transp: DatagramTransport,
pbytes: pointer, nbytes: int,
raddr: TransportAddress,
udata: pointer): Future[void] {.async.} =
if not isNil(pbytes) and nbytes > 0:
var answer = newString(nbytes + 1)
copyMem(addr answer[0], pbytes, nbytes)
answer.setLen(nbytes)
doAssert(answer.startsWith("ANSWER"))
var numstr = answer[6..^1]
var num = parseInt(numstr)
doAssert(num < MessagesCount)
results[num] = 1
inc(counter)
if not future.finished:
future.complete()
var transp = newDatagramTransport(receiver,
udata = addr counter,
remote = address)
for i in 0..<MessagesCount:
var data = "REQUEST" & $i
await transp.send(addr data[0], len(data))
# We need to wait answer here, or we can overflow OS network
# buffer and some datagrams will be dropped.
await future
future = newFuture[void]("testdatagram.client.wait")
transp.close()
result = 0
for i in 0..<MessagesCount:
if results[i] == 1:
inc(result)
proc waitAll[T](futs: seq[Future[T]]): Future[void] =
var counter = len(futs)
var retFuture = newFuture[void]("waitAll")
proc cb(udata: pointer) =
dec(counter)
if counter == 0:
retFuture.complete()
for fut in futs:
fut.addCallback(cb)
return retFuture
proc swarmManager(address: TransportAddress): Future[int] {.async.} =
var retFuture = newFuture[void]("swarm.manager.datagram")
var workers = newSeq[Future[int]](ClientsCount)
var count = ClientsCount
for i in 0..<ClientsCount:
workers[i] = swarmWorker(address)
await waitAll(workers)
for i in 0..<ClientsCount:
var res = workers[i].read()
result += res
proc serveDatagramClient(transp: DatagramTransport,
pbytes: pointer, nbytes: int,
raddr: TransportAddress,
udata: pointer): Future[void] {.async.} =
doAssert(not isNil(pbytes) and nbytes > 0)
var request = newString(nbytes + 1)
copyMem(addr request[0], pbytes, nbytes)
request.setLen(nbytes)
doAssert(request.startsWith("REQUEST"))
var numstr = request[7..^1]
var num = parseInt(numstr)
var answer = "ANSWER" & $num
await transp.sendTo(addr answer[0], len(answer), raddr)
proc test4(): Future[int] {.async.} =
var ta = initTAddress("127.0.0.1:31346")
var counter = 0
var server = createDatagramServer(ta, serveDatagramClient, {ReuseAddr})
server.start()
result = await swarmManager(ta)
server.stop()
server.close()
when isMainModule: when isMainModule:
const const
m1 = "sendTo(pointer) test (" & $TestsCount & " messages)" m1 = "sendTo(pointer) test (" & $TestsCount & " messages)"
@ -493,8 +441,6 @@ when isMainModule:
" clients x " & $MessagesCount & " messages)" " clients x " & $MessagesCount & " messages)"
m8 = "Bounded multiple clients with messages (" & $ClientsCount & m8 = "Bounded multiple clients with messages (" & $ClientsCount &
" clients x " & $MessagesCount & " messages)" " clients x " & $MessagesCount & " messages)"
m9 = "DatagramServer multiple clients with messages (" & $ClientsCount &
" clients x " & $MessagesCount & " messages)"
suite "Datagram Transport test suite": suite "Datagram Transport test suite":
test m1: test m1:
check waitFor(testPointerSendTo()) == TestsCount check waitFor(testPointerSendTo()) == TestsCount
@ -512,5 +458,3 @@ when isMainModule:
check waitFor(test3(false)) == ClientsCount * MessagesCount check waitFor(test3(false)) == ClientsCount * MessagesCount
test m8: test m8:
check waitFor(test3(true)) == ClientsCount * MessagesCount check waitFor(test3(true)) == ClientsCount * MessagesCount
# test m9:
# check waitFor(test4()) == ClientsCount * MessagesCount

View File

@ -50,12 +50,6 @@ proc customServerTransport(server: StreamServer,
transp.test = "CUSTOM" transp.test = "CUSTOM"
result = cast[StreamTransport](transp) result = cast[StreamTransport](transp)
proc serveDatagramClient(transp: DatagramTransport,
pbytes: pointer, nbytes: int,
raddr: TransportAddress,
udata: pointer): Future[void] {.async.} =
discard
proc test1(): bool = proc test1(): bool =
var ta = initTAddress("127.0.0.1:31354") var ta = initTAddress("127.0.0.1:31354")
var server1 = createStreamServer(ta, serveStreamClient, {ReuseAddr}) var server1 = createStreamServer(ta, serveStreamClient, {ReuseAddr})
@ -68,18 +62,6 @@ proc test1(): bool =
server2.close() server2.close()
result = true result = true
proc test2(): bool =
var ta = initTAddress("127.0.0.1:31354")
var server1 = createDatagramServer(ta, serveDatagramClient, {ReuseAddr})
server1.start()
server1.stop()
server1.close()
var server2 = createDatagramServer(ta, serveDatagramClient, {ReuseAddr})
server2.start()
server2.stop()
server2.close()
result = true
proc client1(server: CustomServer, ta: TransportAddress) {.async.} = proc client1(server: CustomServer, ta: TransportAddress) {.async.} =
var transp = CustomTransport() var transp = CustomTransport()
transp.test = "CLIENT" transp.test = "CLIENT"
@ -130,7 +112,5 @@ when isMainModule:
check test1() == true check test1() == true
test "Stream Server inherited object test": test "Stream Server inherited object test":
check test3() == true check test3() == true
test "Datagram Server start/stop test":
check test2() == true
test "StreamServer[T] test": test "StreamServer[T] test":
check test4() == true check test4() == true