Fix rarely appearing Windows bug with close(transport).

Add tests for it.
Add fromProc for all Future[T] in transports.
Add testall to improve tests speed.
Bump version to 2.2.4.
This commit is contained in:
cheatfate 2019-03-31 00:31:10 +02:00
parent 4290e06e77
commit 80ee289847
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
17 changed files with 2455 additions and 2412 deletions

View File

@ -5,7 +5,6 @@
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import chronos/[asyncloop, asyncfutures2, asyncsync, handles, transport,
timer]
timer]
export asyncloop, asyncfutures2, asyncsync, handles, transport, timer

View File

@ -1,5 +1,5 @@
packageName = "chronos"
version = "2.2.3"
version = "2.2.4"
author = "Status Research & Development GmbH"
description = "Chronos"
license = "Apache License 2.0 or MIT"
@ -10,43 +10,14 @@ skipDirs = @["tests"]
requires "nim > 0.18.0"
task test, "Run all tests":
var testFiles = @[
"testsync",
"testsoon",
"testtime",
"testfut",
"testsignal",
"testaddress",
"testdatagram",
"teststream",
"testserver",
"testbugs",
var commands = [
"nim c -r -d:useSysAssert -d:useGcAssert tests/testall",
"nim c -r tests/testall",
"nim c -r -d:release tests/testall"
]
var testCommands = @[
"nim c -r -d:useSysAssert -d:useGcAssert",
"nim c -r",
"nim c -r -d:release"
]
var timerCommands = @[
" -d:asyncTimer=system",
" -d:asyncTimer=mono"
]
for tfile in testFiles:
if tfile == "testtime":
for cmd in testCommands:
for def in timerCommands:
var commandLine = cmd & def & " tests/" & tfile
echo "\n" & commandLine
exec commandLine
rmFile("tests/" & tfile.toExe())
else:
for cmd in testCommands:
var commandLine = cmd & " tests/" & tfile
echo "\n" & commandLine
exec commandLine
rmFile("tests/" & tfile.toExe())
echo "\n" & commands[0]
exec commands[0]
echo "\n" & commands[1]
exec commands[1]
echo "\n" & commands[2]
exec commands[2]

View File

@ -40,6 +40,14 @@ type
Future*[T] = ref object of FutureBase ## Typed future.
value: T ## Stored value
FutureStr*[T] = ref object of Future[T]
## Future to hold GC strings
gcholder*: string
FutureSeq*[A, B] = ref object of Future[A]
## Future to hold GC seqs
gcholder*: seq[B]
FutureVar*[T] = distinct Future[T]
FutureError* = object of Exception
@ -92,6 +100,22 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
## that this future belongs to, is a good habit as it helps with debugging.
result = FutureVar[T](newFuture[T](fromProc))
proc newFutureSeq*[A, B](fromProc = "unspecified"): FutureSeq[A, B] =
## Create a new future which can hold/preserve GC string until future will
## not be completed.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
setupFutureBase(fromProc)
proc newFutureStr*[A](fromProc = "unspecified"): FutureStr[A] =
## Create a new future which can hold/preserve GC seq[T] until future will
## not be completed.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
setupFutureBase(fromProc)
proc clean*[T](future: FutureVar[T]) =
## Resets the ``finished`` status of ``future``.
Future[T](future).finished = false

View File

@ -24,7 +24,7 @@ type
ServerFlags* = enum
## Server's flags
ReuseAddr, ReusePort, TcpNoDelay, NoAutoRead, GCUserData, FirstPipe,
NoPipeFlash
NoPipeFlash, Broadcast
AddressFamily* {.pure.} = enum
None, IPv4, IPv6, Unix
@ -55,14 +55,6 @@ type
Running, # Server running
Closed # Server closed
FutureGCString*[T] = ref object of Future[T]
## Future to hold GC strings
gcholder*: string
FutureGCSeq*[A, B] = ref object of Future[A]
## Future to hold GC seqs
gcholder*: seq[B]
when defined(windows):
type
SocketServer* = ref object of RootRef
@ -510,6 +502,7 @@ when defined(windows):
ERROR_BROKEN_PIPE* = 109
ERROR_PIPE_NOT_CONNECTED* = 233
ERROR_NO_DATA* = 232
ERROR_CONNECTION_ABORTED* = 1236
proc cancelIo*(hFile: HANDLE): WINBOOL
{.stdcall, dynlib: "kernel32", importc: "CancelIo".}

View File

@ -141,8 +141,12 @@ when defined(windows):
transp.buflen = bytesCount
asyncCheck transp.function(transp, raddr)
elif int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
# CancelIO() interrupt or closeSocket() call.
transp.state.incl(ReadPaused)
if ReadClosed in transp.state:
# If `ReadClosed` present, then close(transport) was called.
transp.future.complete()
GC_unref(transp)
break
else:
transp.setReadError(err)
@ -179,6 +183,12 @@ when defined(windows):
transp.setReadError(err)
transp.buflen = 0
asyncCheck transp.function(transp, raddr)
else:
# Transport closure happens in callback, and we not started new
# WSARecvFrom session.
if ReadClosed in transp.state:
if not transp.future.finished:
transp.future.complete()
break
proc resumeRead(transp: DatagramTransport) {.inline.} =
@ -450,11 +460,8 @@ proc close*(transp: DatagramTransport) =
## Closes and frees resources of transport ``transp``.
when defined(windows):
if {ReadClosed, WriteClosed} * transp.state == {}:
discard cancelIo(Handle(transp.fd))
closeSocket(transp.fd)
transp.state.incl({WriteClosed, ReadClosed})
transp.future.complete()
GC_unref(transp)
closeSocket(transp.fd)
else:
proc continuation(udata: pointer) =
transp.future.complete()
@ -539,7 +546,7 @@ proc newDatagramTransport6*[T](cbproc: DatagramCallback,
proc join*(transp: DatagramTransport): Future[void] =
## Wait until the transport ``transp`` will be closed.
var retFuture = newFuture[void]("datagramtransport.join")
var retFuture = newFuture[void]("datagram.transport.join")
proc continuation(udata: pointer) = retFuture.complete()
if not transp.future.finished:
transp.future.addCallback(continuation)
@ -556,7 +563,7 @@ proc send*(transp: DatagramTransport, pbytes: pointer,
nbytes: int): Future[void] =
## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport
## ``transp`` to remote destination address which was bounded on transport.
var retFuture = newFuture[void]()
var retFuture = newFuture[void]("datagram.transport.send(pointer)")
transp.checkClosed(retFuture)
if transp.remote.port == Port(0):
retFuture.fail(newException(TransportError, "Remote peer not set!"))
@ -571,7 +578,7 @@ proc send*(transp: DatagramTransport, pbytes: pointer,
proc send*(transp: DatagramTransport, msg: string, msglen = -1): Future[void] =
## Send string ``msg`` using transport ``transp`` to remote destination
## address which was bounded on transport.
var retFuture = FutureGCString[void]()
var retFuture = newFutureStr[void]("datagram.transport.send(string)")
transp.checkClosed(retFuture)
if not isLiteral(msg):
shallowCopy(retFuture.gcholder, msg)
@ -590,7 +597,7 @@ proc send*[T](transp: DatagramTransport, msg: seq[T],
msglen = -1): Future[void] =
## Send string ``msg`` using transport ``transp`` to remote destination
## address which was bounded on transport.
var retFuture = FutureGCSeq[void, T]()
var retFuture = newFutureSeq[void, T]("datagram.transport.send(seq)")
transp.checkClosed(retFuture)
if not isLiteral(msg):
shallowCopy(retFuture.gcholder, msg)
@ -609,7 +616,7 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
pbytes: pointer, nbytes: int): Future[void] =
## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport
## ``transp`` to remote destination address ``remote``.
var retFuture = newFuture[void]()
var retFuture = newFuture[void]("datagram.transport.sendTo(pointer)")
transp.checkClosed(retFuture)
let vector = GramVector(kind: WithAddress, buf: pbytes, buflen: nbytes,
writer: retFuture, address: remote)
@ -622,7 +629,7 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
msg: string, msglen = -1): Future[void] =
## Send string ``msg`` using transport ``transp`` to remote destination
## address ``remote``.
var retFuture = FutureGCString[void]()
var retFuture = newFutureStr[void]("datagram.transport.sendTo(string)")
transp.checkClosed(retFuture)
if not isLiteral(msg):
shallowCopy(retFuture.gcholder, msg)
@ -642,7 +649,7 @@ proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress,
msg: seq[T], msglen = -1): Future[void] =
## Send sequence ``msg`` using transport ``transp`` to remote destination
## address ``remote``.
var retFuture = FutureGCSeq[void, T]()
var retFuture = newFutureSeq[void, T]("datagram.transport.sendTo(seq)")
transp.checkClosed(retFuture)
if not isLiteral(msg):
shallowCopy(retFuture.gcholder, msg)

View File

@ -343,13 +343,6 @@ when defined(windows):
if ReadPending in transp.state:
## Continuation
transp.state.excl(ReadPending)
if ReadClosed in transp.state:
transp.state.incl({ReadPaused})
if not isNil(transp.reader):
if not transp.reader.finished:
transp.reader.complete()
transp.reader = nil
break
let err = transp.rovl.data.errCode
if err == OSErrorCode(-1):
let bytesCount = transp.rovl.data.bytesCount
@ -364,14 +357,23 @@ when defined(windows):
transp.roffset = transp.offset
if transp.offset == len(transp.buffer):
transp.state.incl(ReadPaused)
elif int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
elif int(err) in {ERROR_OPERATION_ABORTED, ERROR_CONNECTION_ABORTED,
ERROR_BROKEN_PIPE, ERROR_NETNAME_DELETED}:
# CancelIO() interrupt or closeSocket() call.
transp.state.incl(ReadPaused)
if ReadClosed in transp.state:
if not isNil(transp.reader):
if not transp.reader.finished:
transp.reader.complete()
transp.reader = nil
# If `ReadClosed` present, then close(transport) was called.
transp.future.complete()
GC_unref(transp)
elif transp.kind == TransportKind.Socket and
(int(err) in {ERROR_NETNAME_DELETED, WSAECONNABORTED}):
transp.state.incl({ReadEof, ReadPaused})
elif transp.kind == TransportKind.Pipe and
(int(err) in {ERROR_BROKEN_PIPE, ERROR_PIPE_NOT_CONNECTED}):
(int(err) in {ERROR_PIPE_NOT_CONNECTED}):
transp.state.incl({ReadEof, ReadPaused})
else:
transp.setReadError(err)
@ -446,6 +448,11 @@ when defined(windows):
if not isNil(transp.reader):
transp.reader.complete()
transp.reader = nil
# Transport close happens in callback, and we not started new
# WSARecvFrom session.
if ReadClosed in transp.state:
if not transp.future.finished:
transp.future.complete()
## Finish Loop
break
@ -602,64 +609,75 @@ when defined(windows):
if server.apending:
## Continuation
server.apending = false
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
if ovl.data.errCode == OSErrorCode(-1):
var ntransp: StreamTransport
var flags = {WinServerPipe}
if NoPipeFlash in server.flags:
flags.incl(WinNoPipeFlash)
if not isNil(server.init):
var transp = server.init(server, server.sock)
ntransp = newStreamPipeTransport(server.sock, server.bufferSize,
transp, flags)
else:
ntransp = newStreamPipeTransport(server.sock, server.bufferSize,
nil, flags)
asyncCheck server.function(server, ntransp)
elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt or close call.
if server.status == ServerStatus.Closed:
server.loopFuture.complete()
if not isNil(server.udata) and GCUserData in server.flags:
GC_unref(cast[ref int](server.udata))
GC_unref(server)
break
else:
if ovl.data.errCode == OSErrorCode(-1):
var ntransp: StreamTransport
var flags = {WinServerPipe}
if NoPipeFlash in server.flags:
flags.incl(WinNoPipeFlash)
if not isNil(server.init):
var transp = server.init(server, server.sock)
ntransp = newStreamPipeTransport(server.sock, server.bufferSize,
transp, flags)
else:
ntransp = newStreamPipeTransport(server.sock, server.bufferSize,
nil, flags)
asyncCheck server.function(server, ntransp)
elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
break
else:
doAssert disconnectNamedPipe(Handle(server.sock)) == 1
doAssert closeHandle(HANDLE(server.sock)) == 1
raiseTransportOsError(osLastError())
doAssert disconnectNamedPipe(Handle(server.sock)) == 1
doAssert closeHandle(HANDLE(server.sock)) == 1
raiseTransportOsError(osLastError())
else:
## Initiation
server.apending = true
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
## Server was already stopped/closed exiting
if server.status notin {ServerStatus.Stopped, ServerStatus.Closed}:
server.apending = true
var pipeSuffix = $cast[cstring](addr server.local.address_un)
var pipeName = newWideCString(r"\\.\pipe\" & pipeSuffix[1 .. ^1])
var openMode = PIPE_ACCESS_DUPLEX or FILE_FLAG_OVERLAPPED
if FirstPipe notin server.flags:
openMode = openMode or FILE_FLAG_FIRST_PIPE_INSTANCE
server.flags.incl(FirstPipe)
let pipeMode = int32(PIPE_TYPE_BYTE or PIPE_READMODE_BYTE or PIPE_WAIT)
let pipeHandle = createNamedPipe(pipeName, openMode, pipeMode,
PIPE_UNLIMITED_INSTANCES,
DWORD(server.bufferSize),
DWORD(server.bufferSize),
DWORD(0), nil)
if pipeHandle == INVALID_HANDLE_VALUE:
raiseTransportOsError(osLastError())
server.sock = AsyncFD(pipeHandle)
server.aovl.data.fd = AsyncFD(pipeHandle)
register(server.sock)
let res = connectNamedPipe(pipeHandle,
cast[POVERLAPPED](addr server.aovl))
if res == 0:
let err = osLastError()
if int32(err) == ERROR_OPERATION_ABORTED:
server.apending = false
break
elif int32(err) == ERROR_IO_PENDING:
discard
elif int32(err) == ERROR_PIPE_CONNECTED:
discard
else:
raiseTransportOsError(err)
break
var pipeSuffix = $cast[cstring](addr server.local.address_un)
var pipeName = newWideCString(r"\\.\pipe\" & pipeSuffix[1 .. ^1])
var openMode = PIPE_ACCESS_DUPLEX or FILE_FLAG_OVERLAPPED
if FirstPipe notin server.flags:
openMode = openMode or FILE_FLAG_FIRST_PIPE_INSTANCE
server.flags.incl(FirstPipe)
let pipeMode = int32(PIPE_TYPE_BYTE or PIPE_READMODE_BYTE or PIPE_WAIT)
let pipeHandle = createNamedPipe(pipeName, openMode, pipeMode,
PIPE_UNLIMITED_INSTANCES,
DWORD(server.bufferSize),
DWORD(server.bufferSize),
DWORD(0), nil)
if pipeHandle == INVALID_HANDLE_VALUE:
raiseTransportOsError(osLastError())
server.sock = AsyncFD(pipeHandle)
server.aovl.data.fd = AsyncFD(pipeHandle)
register(server.sock)
let res = connectNamedPipe(pipeHandle,
cast[POVERLAPPED](addr server.aovl))
if res == 0:
let err = osLastError()
if int32(err) == ERROR_IO_PENDING:
discard
elif int32(err) == ERROR_PIPE_CONNECTED:
discard
else:
raiseTransportOsError(err)
break
else:
# Server close happens in callback, and we are not started new
# connectNamedPipe session.
if server.status == ServerStatus.Closed:
if not server.loopFuture.finished:
server.loopFuture.complete()
if not isNil(server.udata) and GCUserData in server.flags:
GC_unref(cast[ref int](server.udata))
GC_unref(server)
proc acceptLoop(udata: pointer) {.gcsafe, nimcall.} =
var ovl = cast[PtrCustomOverlapped](udata)
@ -670,70 +688,75 @@ when defined(windows):
if server.apending:
## Continuation
server.apending = false
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
## Server was already stopped/closed exiting
server.asock.closeSocket()
if ovl.data.errCode == OSErrorCode(-1):
if setsockopt(SocketHandle(server.asock), cint(SOL_SOCKET),
cint(SO_UPDATE_ACCEPT_CONTEXT), addr server.sock,
SockLen(sizeof(SocketHandle))) != 0'i32:
let err = OSErrorCode(wsaGetLastError())
server.asock.closeSocket()
raiseTransportOsError(err)
else:
var ntransp: StreamTransport
if not isNil(server.init):
let transp = server.init(server, server.asock)
ntransp = newStreamSocketTransport(server.asock,
server.bufferSize,
transp)
else:
ntransp = newStreamSocketTransport(server.asock,
server.bufferSize, nil)
asyncCheck server.function(server, ntransp)
elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt or close.
if server.status == ServerStatus.Closed:
server.loopFuture.complete()
if not isNil(server.udata) and GCUserData in server.flags:
GC_unref(cast[ref int](server.udata))
GC_unref(server)
break
else:
if ovl.data.errCode == OSErrorCode(-1):
if setsockopt(SocketHandle(server.asock), cint(SOL_SOCKET),
cint(SO_UPDATE_ACCEPT_CONTEXT), addr server.sock,
SockLen(sizeof(SocketHandle))) != 0'i32:
let err = OSErrorCode(wsaGetLastError())
server.asock.closeSocket()
raiseTransportOsError(err)
else:
var ntransp: StreamTransport
if not isNil(server.init):
let transp = server.init(server, server.asock)
ntransp = newStreamSocketTransport(server.asock,
server.bufferSize,
transp)
else:
ntransp = newStreamSocketTransport(server.asock,
server.bufferSize, nil)
asyncCheck server.function(server, ntransp)
elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
server.asock.closeSocket()
break
else:
server.asock.closeSocket()
raiseTransportOsError(ovl.data.errCode)
server.asock.closeSocket()
raiseTransportOsError(ovl.data.errCode)
else:
## Initiation
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
## Server was already stopped/closed exiting
if server.status notin {ServerStatus.Stopped, ServerStatus.Closed}:
server.apending = true
server.asock = createAsyncSocket(server.domain, SockType.SOCK_STREAM,
Protocol.IPPROTO_TCP)
if server.asock == asyncInvalidSocket:
raiseTransportOsError(OSErrorCode(wsaGetLastError()))
var dwBytesReceived = DWORD(0)
let dwReceiveDataLength = DWORD(0)
let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
let res = loop.acceptEx(SocketHandle(server.sock),
SocketHandle(server.asock),
addr server.abuffer[0],
dwReceiveDataLength, dwLocalAddressLength,
dwRemoteAddressLength, addr dwBytesReceived,
cast[POVERLAPPED](addr server.aovl))
if not res:
let err = osLastError()
if int32(err) == ERROR_OPERATION_ABORTED:
server.apending = false
break
elif int32(err) == ERROR_IO_PENDING:
discard
else:
raiseTransportOsError(err)
break
server.apending = true
server.asock = createAsyncSocket(server.domain, SockType.SOCK_STREAM,
Protocol.IPPROTO_TCP)
if server.asock == asyncInvalidSocket:
raiseTransportOsError(OSErrorCode(wsaGetLastError()))
var dwBytesReceived = DWORD(0)
let dwReceiveDataLength = DWORD(0)
let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
let res = loop.acceptEx(SocketHandle(server.sock),
SocketHandle(server.asock),
addr server.abuffer[0],
dwReceiveDataLength, dwLocalAddressLength,
dwRemoteAddressLength, addr dwBytesReceived,
cast[POVERLAPPED](addr server.aovl))
if not res:
let err = osLastError()
if int32(err) == ERROR_OPERATION_ABORTED:
server.apending = false
break
elif int32(err) == ERROR_IO_PENDING:
discard
else:
raiseTransportOsError(err)
break
else:
# Server close happens in callback, and we are not started new
# AcceptEx session.
if server.status == ServerStatus.Closed:
if not server.loopFuture.finished:
server.loopFuture.complete()
if not isNil(server.udata) and GCUserData in server.flags:
GC_unref(cast[ref int](server.udata))
GC_unref(server)
proc resumeRead(transp: StreamTransport) {.inline.} =
transp.state.excl(ReadPaused)
@ -881,7 +904,7 @@ else:
slen: SockLen
sock: AsyncFD
proto: Protocol
var retFuture = newFuture[StreamTransport]("transport.connect")
var retFuture = newFuture[StreamTransport]("stream.transport.connect")
address.toSAddr(saddr, slen)
proto = Protocol.IPPROTO_TCP
if address.family == AddressFamily.Unix:
@ -985,7 +1008,7 @@ proc stop*(server: StreamServer) =
proc join*(server: StreamServer): Future[void] =
## Waits until ``server`` is not closed.
var retFuture = newFuture[void]("stream.server.join")
var retFuture = newFuture[void]("stream.transport.server.join")
proc continuation(udata: pointer) = retFuture.complete()
if not server.loopFuture.finished:
server.loopFuture.addCallback(continuation)
@ -998,21 +1021,22 @@ proc close*(server: StreamServer) =
##
## Please note that release of resources is not completed immediately, to be
## sure all resources got released please use ``await server.join()``.
proc continuation(udata: pointer) =
server.loopFuture.complete()
if not isNil(server.udata) and GCUserData in server.flags:
GC_unref(cast[ref int](server.udata))
GC_unref(server)
when not defined(windows):
proc continuation(udata: pointer) =
server.loopFuture.complete()
if not isNil(server.udata) and GCUserData in server.flags:
GC_unref(cast[ref int](server.udata))
GC_unref(server)
if server.status == ServerStatus.Stopped:
server.status = ServerStatus.Closed
when defined(windows):
if server.local.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
server.sock.closeSocket(continuation)
server.sock.closeSocket()
elif server.local.family in {AddressFamily.Unix}:
if NoPipeFlash notin server.flags:
discard flushFileBuffers(Handle(server.sock))
doAssert disconnectNamedPipe(Handle(server.sock)) == 1
closeHandle(server.sock, continuation)
closeHandle(server.sock)
else:
server.sock.closeSocket(continuation)
@ -1157,7 +1181,7 @@ proc createStreamServer*(host: TransportAddress,
result.init = init
result.bufferSize = bufferSize
result.status = Starting
result.loopFuture = newFuture[void]("stream.server")
result.loopFuture = newFuture[void]("stream.transport.server")
result.udata = udata
result.local = host
@ -1197,7 +1221,7 @@ proc write*(transp: StreamTransport, pbytes: pointer,
nbytes: int): Future[int] =
## Write data from buffer ``pbytes`` with size ``nbytes`` using transport
## ``transp``.
var retFuture = newFuture[int]()
var retFuture = newFuture[int]("stream.transport.write(pointer)")
transp.checkClosed(retFuture)
var vector = StreamVector(kind: DataBuffer, writer: retFuture,
buf: pbytes, buflen: nbytes)
@ -1208,7 +1232,7 @@ proc write*(transp: StreamTransport, pbytes: pointer,
proc write*(transp: StreamTransport, msg: string, msglen = -1): Future[int] =
## Write data from string ``msg`` using transport ``transp``.
var retFuture = FutureGCString[int]()
var retFuture = newFutureStr[int]("stream.transport.write(string)")
transp.checkClosed(retFuture)
if not isLiteral(msg):
shallowCopy(retFuture.gcholder, msg)
@ -1225,7 +1249,7 @@ proc write*(transp: StreamTransport, msg: string, msglen = -1): Future[int] =
proc write*[T](transp: StreamTransport, msg: seq[T], msglen = -1): Future[int] =
## Write sequence ``msg`` using transport ``transp``.
var retFuture = FutureGCSeq[int, T]()
var retFuture = newFutureSeq[int, T]("stream.transport.write(seq)")
transp.checkClosed(retFuture)
if not isLiteral(msg):
shallowCopy(retFuture.gcholder, msg)
@ -1250,7 +1274,7 @@ proc writeFile*(transp: StreamTransport, handle: int,
when defined(windows):
if transp.kind != TransportKind.Socket:
raise newException(TransportNoSupport, "writeFile() is not supported!")
var retFuture = newFuture[int]("transport.writeFile")
var retFuture = newFuture[int]("stream.transport.writeFile")
transp.checkClosed(retFuture)
var vector = StreamVector(kind: DataFile, writer: retFuture,
buf: cast[pointer](size), offset: offset,
@ -1309,6 +1333,7 @@ proc readOnce*(transp: StreamTransport, pbytes: pointer,
## internal buffer, otherwise it will wait until some bytes will be received.
checkClosed(transp)
checkPending(transp)
while true:
if transp.offset == 0:
if (ReadError in transp.state):
@ -1490,6 +1515,7 @@ proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} =
## Return number of bytes actually consumed
checkClosed(transp)
checkPending(transp)
result = 0
while true:
if (ReadError in transp.state):
@ -1522,7 +1548,7 @@ proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} =
proc join*(transp: StreamTransport): Future[void] =
## Wait until ``transp`` will not be closed.
var retFuture = newFuture[void]("streamtransport.join")
var retFuture = newFuture[void]("stream.transport.join")
proc continuation(udata: pointer) = retFuture.complete()
if not transp.future.finished:
transp.future.addCallback(continuation)
@ -1542,7 +1568,6 @@ proc close*(transp: StreamTransport) =
if {ReadClosed, WriteClosed} * transp.state == {}:
transp.state.incl({WriteClosed, ReadClosed})
when defined(windows):
discard cancelIo(Handle(transp.fd))
if transp.kind == TransportKind.Pipe:
if WinServerPipe in transp.flags:
if WinNoPipeFlash notin transp.flags:
@ -1551,9 +1576,23 @@ proc close*(transp: StreamTransport) =
else:
if WinNoPipeFlash notin transp.flags:
discard flushFileBuffers(Handle(transp.fd))
closeHandle(transp.fd, continuation)
if ReadPaused in transp.state:
# If readStreamLoop() is not running we need to finish in
# continuation step.
closeHandle(transp.fd, continuation)
else:
# If readStreamLoop() is running, it will be properly finished inside
# of readStreamLoop().
closeHandle(transp.fd)
elif transp.kind == TransportKind.Socket:
closeSocket(transp.fd, continuation)
if ReadPaused in transp.state:
# If readStreamLoop() is not running we need to finish in
# continuation step.
closeSocket(transp.fd, continuation)
else:
# If readStreamLoop() is running, it will be properly finished inside
# of readStreamLoop().
closeSocket(transp.fd)
else:
closeSocket(transp.fd, continuation)

View File

@ -5,192 +5,190 @@
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import strutils, unittest
import ../chronos
when isMainModule:
suite "TransportAddress test suite":
test "initTAddress(string)":
check $initTAddress("0.0.0.0:1") == "0.0.0.0:1"
check $initTAddress("255.255.255.255:65535") == "255.255.255.255:65535"
check $initTAddress("[::]:1") == "[::]:1"
check $initTAddress("[FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF]:65535") ==
"[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535"
suite "TransportAddress test suite":
test "initTAddress(string)":
check $initTAddress("0.0.0.0:1") == "0.0.0.0:1"
check $initTAddress("255.255.255.255:65535") == "255.255.255.255:65535"
check $initTAddress("[::]:1") == "[::]:1"
check $initTAddress("[FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF]:65535") ==
"[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535"
test "initTAddress(string, Port)":
check $initTAddress("0.0.0.0", Port(0)) == "0.0.0.0:0"
check $initTAddress("255.255.255.255", Port(65535)) ==
"255.255.255.255:65535"
check $initTAddress("::", Port(0)) == "[::]:0"
check $initTAddress("FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF",
Port(65535)) ==
"[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535"
test "initTAddress(string, Port)":
check $initTAddress("0.0.0.0", Port(0)) == "0.0.0.0:0"
check $initTAddress("255.255.255.255", Port(65535)) ==
"255.255.255.255:65535"
check $initTAddress("::", Port(0)) == "[::]:0"
check $initTAddress("FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF",
Port(65535)) ==
"[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535"
test "initTAddress(string, int)":
check $initTAddress("0.0.0.0", 1) == "0.0.0.0:1"
check $initTAddress("255.255.255.255", 65535) ==
"255.255.255.255:65535"
check $initTAddress("::", 0) == "[::]:0"
check $initTAddress("FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF", 65535) ==
"[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535"
test "initTAddress(string, int)":
check $initTAddress("0.0.0.0", 1) == "0.0.0.0:1"
check $initTAddress("255.255.255.255", 65535) ==
"255.255.255.255:65535"
check $initTAddress("::", 0) == "[::]:0"
check $initTAddress("FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF", 65535) ==
"[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535"
test "resolveTAddress(string, IPv4)":
var numeric = ["0.0.0.0:1", "255.0.0.255:54321", "128.128.128.128:12345",
"255.255.255.255:65535"]
var hostnames = ["www.google.com:443", "www.github.com:443"]
test "resolveTAddress(string, IPv4)":
var numeric = ["0.0.0.0:1", "255.0.0.255:54321", "128.128.128.128:12345",
"255.255.255.255:65535"]
var hostnames = ["www.google.com:443", "www.github.com:443"]
for item in numeric:
var taseq = resolveTAddress(item)
check len(taseq) == 1
check $taseq[0] == item
for item in numeric:
var taseq = resolveTAddress(item)
check len(taseq) == 1
check $taseq[0] == item
for item in hostnames:
var taseq = resolveTAddress(item)
check len(taseq) >= 1
for item in hostnames:
var taseq = resolveTAddress(item)
check len(taseq) >= 1
# test "resolveTAddress(string, IPv6)":
# var numeric = [
# "[::]:1",
# "[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535",
# "[aaaa:bbbb:cccc:dddd:eeee:ffff::1111]:12345",
# "[aaaa:bbbb:cccc:dddd:eeee:ffff::]:12345",
# "[a:b:c:d:e:f::]:12345",
# "[2222:3333:4444:5555:6666:7777:8888:9999]:56789"
# ]
# var hostnames = ["localhost:443"]
# test "resolveTAddress(string, IPv6)":
# var numeric = [
# "[::]:1",
# "[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535",
# "[aaaa:bbbb:cccc:dddd:eeee:ffff::1111]:12345",
# "[aaaa:bbbb:cccc:dddd:eeee:ffff::]:12345",
# "[a:b:c:d:e:f::]:12345",
# "[2222:3333:4444:5555:6666:7777:8888:9999]:56789"
# ]
# var hostnames = ["localhost:443"]
# for item in numeric:
# var taseq = resolveTAddress(item, IpAddressFamily.IPv6)
# check len(taseq) == 1
# check $taseq[0] == item
# for item in numeric:
# var taseq = resolveTAddress(item, IpAddressFamily.IPv6)
# check len(taseq) == 1
# check $taseq[0] == item
# for item in hostnames:
# var taseq = resolveTAddress(item, IpAddressFamily.IPv6)
# check len(taseq) >= 1
# for item in hostnames:
# var taseq = resolveTAddress(item, IpAddressFamily.IPv6)
# check len(taseq) >= 1
test "resolveTAddress(string, Port, IPv4)":
var numeric = ["0.0.0.0", "255.0.0.255", "128.128.128.128",
"255.255.255.255"]
var hostnames = ["www.google.com", "www.github.com", "localhost"]
test "resolveTAddress(string, Port, IPv4)":
var numeric = ["0.0.0.0", "255.0.0.255", "128.128.128.128",
"255.255.255.255"]
var hostnames = ["www.google.com", "www.github.com", "localhost"]
for item in numeric:
var taseq = resolveTAddress(item, Port(443))
check len(taseq) == 1
check $taseq[0] == item & ":443"
for item in numeric:
var taseq = resolveTAddress(item, Port(443))
check len(taseq) == 1
check $taseq[0] == item & ":443"
for item in hostnames:
var taseq = resolveTAddress(item, Port(443))
check len(taseq) >= 1
for item in hostnames:
var taseq = resolveTAddress(item, Port(443))
check len(taseq) >= 1
# test "resolveTAddress(string, Port, IPv6)":
# var numeric = [
# "::",
# "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
# "aaaa:bbbb:cccc:dddd:eeee:ffff::1111",
# "aaaa:bbbb:cccc:dddd:eeee:ffff::",
# "a:b:c:d:e:f::",
# "2222:3333:4444:5555:6666:7777:8888:9999"
# ]
# var hostnames = ["localhost"]
# for item in numeric:
# var taseq = resolveTAddress(item, Port(443), IpAddressFamily.IPv6)
# check len(taseq) == 1
# check $taseq[0] == "[" & item & "]:443"
# test "resolveTAddress(string, Port, IPv6)":
# var numeric = [
# "::",
# "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
# "aaaa:bbbb:cccc:dddd:eeee:ffff::1111",
# "aaaa:bbbb:cccc:dddd:eeee:ffff::",
# "a:b:c:d:e:f::",
# "2222:3333:4444:5555:6666:7777:8888:9999"
# ]
# var hostnames = ["localhost"]
# for item in numeric:
# var taseq = resolveTAddress(item, Port(443), IpAddressFamily.IPv6)
# check len(taseq) == 1
# check $taseq[0] == "[" & item & "]:443"
# for item in hostnames:
# var taseq = resolveTAddress(item, Port(443), IpAddressFamily.IPv6)
# check len(taseq) >= 1
# for item in hostnames:
# var taseq = resolveTAddress(item, Port(443), IpAddressFamily.IPv6)
# check len(taseq) >= 1
test "Faulty initTAddress(string)":
var tests = [
"z:1",
"256.256.256.256:65534",
"127.0.0.1:65536"
]
var errcounter = 0
for item in tests:
try:
var ta = initTAddress(item)
except TransportAddressError:
inc(errcounter)
check errcounter == len(tests)
test "Faulty initTAddress(string, Port)":
var tests = [
":::",
"999.999.999.999",
"gggg:aaaa:bbbb:gggg:aaaa:bbbb:gggg:aaaa",
"hostname"
]
var errcounter = 0
for item in tests:
try:
var ta = initTAddress(item, Port(443))
except TransportAddressError:
inc(errcounter)
check errcounter == len(tests)
test "Faulty initTAddress(string, Port)":
var errcounter = 0
test "Faulty initTAddress(string)":
var tests = [
"z:1",
"256.256.256.256:65534",
"127.0.0.1:65536"
]
var errcounter = 0
for item in tests:
try:
var ta = initTAddress("127.0.0.1", 100000)
var ta = initTAddress(item)
except TransportAddressError:
inc(errcounter)
check errcounter == 1
check errcounter == len(tests)
test "Faulty resolveTAddress(string, IPv4) for IPv6 address":
var numeric = [
"[::]:1",
"[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535",
"[aaaa:bbbb:cccc:dddd:eeee:ffff::1111]:12345",
"[aaaa:bbbb:cccc:dddd:eeee:ffff::]:12345",
"[a:b:c:d:e:f::]:12345",
"[2222:3333:4444:5555:6666:7777:8888:9999]:56789"
]
var errcounter = 0
for item in numeric:
try:
var taseq = resolveTAddress(item)
except TransportAddressError:
inc(errcounter)
check errcounter == len(numeric)
test "Faulty initTAddress(string, Port)":
var tests = [
":::",
"999.999.999.999",
"gggg:aaaa:bbbb:gggg:aaaa:bbbb:gggg:aaaa",
"hostname"
]
var errcounter = 0
for item in tests:
try:
var ta = initTAddress(item, Port(443))
except TransportAddressError:
inc(errcounter)
check errcounter == len(tests)
test "Faulty resolveTAddress(string, Port, IPv4) for IPv6 address":
var numeric = [
"::",
"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
"aaaa:bbbb:cccc:dddd:eeee:ffff::1111",
"aaaa:bbbb:cccc:dddd:eeee:ffff::",
"a:b:c:d:e:f::",
"2222:3333:4444:5555:6666:7777:8888:9999"
]
var errcounter = 0
for item in numeric:
try:
var taseq = resolveTAddress(item, Port(443))
except TransportAddressError:
inc(errcounter)
check errcounter == len(numeric)
test "Faulty initTAddress(string, Port)":
var errcounter = 0
try:
var ta = initTAddress("127.0.0.1", 100000)
except TransportAddressError:
inc(errcounter)
check errcounter == 1
# test "Faulty resolveTAddress(string, IPv6) for IPv4 address":
# var numeric = ["0.0.0.0:0", "255.0.0.255:54321", "128.128.128.128:12345",
# "255.255.255.255:65535"]
# var errcounter = 0
# for item in numeric:
# try:
# var taseq = resolveTAddress(item, IpAddressFamily.IPv6)
# except TransportAddressError:
# inc(errcounter)
# check errcounter == len(numeric)
test "Faulty resolveTAddress(string, IPv4) for IPv6 address":
var numeric = [
"[::]:1",
"[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535",
"[aaaa:bbbb:cccc:dddd:eeee:ffff::1111]:12345",
"[aaaa:bbbb:cccc:dddd:eeee:ffff::]:12345",
"[a:b:c:d:e:f::]:12345",
"[2222:3333:4444:5555:6666:7777:8888:9999]:56789"
]
var errcounter = 0
for item in numeric:
try:
var taseq = resolveTAddress(item)
except TransportAddressError:
inc(errcounter)
check errcounter == len(numeric)
# test "Faulty resolveTAddress(string, Port, IPv6) for IPv4 address":
# var numeric = ["0.0.0.0", "255.0.0.255", "128.128.128.128",
# "255.255.255.255"]
# var errcounter = 0
# for item in numeric:
# try:
# var taseq = resolveTAddress(item, Port(443), IpAddressFamily.IPv6)
# except TransportAddressError:
# inc(errcounter)
# check errcounter == len(numeric)
test "Faulty resolveTAddress(string, Port, IPv4) for IPv6 address":
var numeric = [
"::",
"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
"aaaa:bbbb:cccc:dddd:eeee:ffff::1111",
"aaaa:bbbb:cccc:dddd:eeee:ffff::",
"a:b:c:d:e:f::",
"2222:3333:4444:5555:6666:7777:8888:9999"
]
var errcounter = 0
for item in numeric:
try:
var taseq = resolveTAddress(item, Port(443))
except TransportAddressError:
inc(errcounter)
check errcounter == len(numeric)
# test "Faulty resolveTAddress(string, IPv6) for IPv4 address":
# var numeric = ["0.0.0.0:0", "255.0.0.255:54321", "128.128.128.128:12345",
# "255.255.255.255:65535"]
# var errcounter = 0
# for item in numeric:
# try:
# var taseq = resolveTAddress(item, IpAddressFamily.IPv6)
# except TransportAddressError:
# inc(errcounter)
# check errcounter == len(numeric)
# test "Faulty resolveTAddress(string, Port, IPv6) for IPv4 address":
# var numeric = ["0.0.0.0", "255.0.0.255", "128.128.128.128",
# "255.255.255.255"]
# var errcounter = 0
# for item in numeric:
# try:
# var taseq = resolveTAddress(item, Port(443), IpAddressFamily.IPv6)
# except TransportAddressError:
# inc(errcounter)
# check errcounter == len(numeric)

9
tests/testall.nim Normal file
View File

@ -0,0 +1,9 @@
# Chronos Test Suite
# (c) Copyright 2018-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import testsync, testsoon, testtime, testfut, testsignal, testaddress,
testdatagram, teststream, testserver, testbugs

View File

@ -5,43 +5,41 @@
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import unittest
import ../chronos
const HELLO_PORT = 45679
const TEST_MSG = "testmsg"
const MSG_LEN = TEST_MSG.len()
suite "Asynchronous issues test suite":
const HELLO_PORT = 45679
const TEST_MSG = "testmsg"
const MSG_LEN = TEST_MSG.len()
type
CustomData = ref object
test: string
type
CustomData = ref object
test: string
proc udp4DataAvailable(transp: DatagramTransport,
remote: TransportAddress): Future[void] {.async, gcsafe.} =
var udata = getUserData[CustomData](transp)
var expect = TEST_MSG
var data: seq[byte]
var datalen: int
transp.peekMessage(data, datalen)
if udata.test == "CHECK" and datalen == MSG_LEN and
equalMem(addr data[0], addr expect[0], datalen):
udata.test = "OK"
transp.close()
proc udp4DataAvailable(transp: DatagramTransport,
remote: TransportAddress): Future[void] {.async, gcsafe.} =
var udata = getUserData[CustomData](transp)
var expect = TEST_MSG
var data: seq[byte]
var datalen: int
transp.peekMessage(data, datalen)
if udata.test == "CHECK" and datalen == MSG_LEN and
equalMem(addr data[0], addr expect[0], datalen):
udata.test = "OK"
transp.close()
proc issue6(): Future[bool] {.async.} =
var myself = initTAddress("127.0.0.1:" & $HELLO_PORT)
var data = CustomData()
data.test = "CHECK"
var dsock4 = newDatagramTransport(udp4DataAvailable, udata = data,
local = myself)
await dsock4.sendTo(myself, TEST_MSG, MSG_LEN)
await dsock4.join()
if data.test == "OK":
result = true
proc issue6(): Future[bool] {.async.} =
var myself = initTAddress("127.0.0.1:" & $HELLO_PORT)
var data = CustomData()
data.test = "CHECK"
var dsock4 = newDatagramTransport(udp4DataAvailable, udata = data,
local = myself)
await dsock4.sendTo(myself, TEST_MSG, MSG_LEN)
await dsock4.join()
if data.test == "OK":
result = true
when isMainModule:
suite "Asynchronous issues test suite":
test "Issue #6":
var res = waitFor(issue6())
check res == true
test "Issue #6":
var res = waitFor(issue6())
check res == true

View File

@ -5,452 +5,15 @@
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import strutils, net, unittest
import ../chronos
const
TestsCount = 2000
ClientsCount = 20
MessagesCount = 20
proc client1(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
await transp.sendTo(raddr, addr ans[0], len(ans))
else:
var err = "ERROR"
await transp.sendTo(raddr, addr err[0], len(err))
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client2(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var ta = initTAddress("127.0.0.1:33336")
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(ta, addr req[0], len(req))
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client3(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.send(addr req[0], len(req))
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client4(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == MessagesCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.send(addr req[0], len(req))
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client5(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == MessagesCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(raddr, addr req[0], len(req))
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client6(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
await transp.sendTo(raddr, ans)
else:
var err = "ERROR"
await transp.sendTo(raddr, err)
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client7(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(raddr, req)
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client8(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.send(req)
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client9(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
var ansseq = newSeq[byte](len(ans))
copyMem(addr ansseq[0], addr ans[0], len(ans))
await transp.sendTo(raddr, ansseq)
else:
var err = "ERROR"
var errseq = newSeq[byte](len(err))
copyMem(addr errseq[0], addr err[0], len(err))
await transp.sendTo(raddr, errseq)
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client10(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
var reqseq = newSeq[byte](len(req))
copyMem(addr reqseq[0], addr req[0], len(req))
await transp.sendTo(raddr, reqseq)
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client11(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
var reqseq = newSeq[byte](len(req))
copyMem(addr reqseq[0], addr req[0], len(req))
await transp.send(reqseq)
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc testPointerSendTo(): Future[int] {.async.} =
## sendTo(pointer) test
var ta = initTAddress("127.0.0.1:33336")
var counter = 0
var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client2, udata = addr counter)
var data = "REQUEST0"
await dgram2.sendTo(ta, addr data[0], len(data))
await dgram2.join()
dgram1.close()
await dgram1.join()
result = counter
proc testPointerSend(): Future[int] {.async.} =
## send(pointer) test
var ta = initTAddress("127.0.0.1:33337")
var counter = 0
var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client3, udata = addr counter, remote = ta)
var data = "REQUEST0"
await dgram2.send(addr data[0], len(data))
await dgram2.join()
dgram1.close()
await dgram1.join()
result = counter
proc testStringSendTo(): Future[int] {.async.} =
## sendTo(string) test
var ta = initTAddress("127.0.0.1:33338")
var counter = 0
var dgram1 = newDatagramTransport(client6, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client7, udata = addr counter)
var data = "REQUEST0"
await dgram2.sendTo(ta, data)
await dgram2.join()
dgram1.close()
await dgram1.join()
result = counter
proc testStringSend(): Future[int] {.async.} =
## send(string) test
var ta = initTAddress("127.0.0.1:33339")
var counter = 0
var dgram1 = newDatagramTransport(client6, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client8, udata = addr counter, remote = ta)
var data = "REQUEST0"
await dgram2.send(data)
await dgram2.join()
dgram1.close()
await dgram1.join()
result = counter
proc testSeqSendTo(): Future[int] {.async.} =
## sendTo(string) test
var ta = initTAddress("127.0.0.1:33340")
var counter = 0
var dgram1 = newDatagramTransport(client9, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client10, udata = addr counter)
var data = "REQUEST0"
var dataseq = newSeq[byte](len(data))
copyMem(addr dataseq[0], addr data[0], len(data))
await dgram2.sendTo(ta, dataseq)
await dgram2.join()
dgram1.close()
await dgram1.join()
result = counter
proc testSeqSend(): Future[int] {.async.} =
## send(string) test
var ta = initTAddress("127.0.0.1:33341")
var counter = 0
var dgram1 = newDatagramTransport(client9, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client11, udata = addr counter, remote = ta)
var data = "REQUEST0"
var dataseq = newSeq[byte](len(data))
copyMem(addr dataseq[0], addr data[0], len(data))
await dgram2.send(data)
await dgram2.join()
dgram1.close()
await dgram1.join()
result = counter
#
proc waitAll(futs: seq[Future[void]]): Future[void] =
var counter = len(futs)
var retFuture = newFuture[void]("waitAll")
proc cb(udata: pointer) =
dec(counter)
if counter == 0:
retFuture.complete()
for fut in futs:
fut.addCallback(cb)
return retFuture
proc test3(bounded: bool): Future[int] {.async.} =
var ta: TransportAddress
if bounded:
ta = initTAddress("127.0.0.1:33240")
else:
ta = initTAddress("127.0.0.1:33241")
var counter = 0
var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta)
var clients = newSeq[Future[void]](ClientsCount)
var grams = newSeq[DatagramTransport](ClientsCount)
var counters = newSeq[int](ClientsCount)
for i in 0..<ClientsCount:
var data = "REQUEST0"
if bounded:
grams[i] = newDatagramTransport(client4, udata = addr counters[i],
remote = ta)
await grams[i].send(addr data[0], len(data))
else:
grams[i] = newDatagramTransport(client5, udata = addr counters[i])
await grams[i].sendTo(ta, addr data[0], len(data))
clients[i] = grams[i].join()
await waitAll(clients)
dgram1.close()
await dgram1.join()
result = 0
for i in 0..<ClientsCount:
result += counters[i]
proc testConnReset(): Future[bool] {.async.} =
var ta = initTAddress("127.0.0.1:65000")
var counter = 0
proc clientMark(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
counter = 1
transp.close()
var dgram1 = newDatagramTransport(client1, local = ta)
dgram1.close()
await dgram1.join()
var dgram2 = newDatagramTransport(clientMark)
var data = "MESSAGE"
asyncCheck dgram2.sendTo(ta, data)
await sleepAsync(2000.milliseconds)
result = (counter == 0)
dgram2.close()
await dgram2.join()
when isMainModule:
suite "Datagram Transport test suite":
const
TestsCount = 2000
ClientsCount = 20
MessagesCount = 20
m1 = "sendTo(pointer) test (" & $TestsCount & " messages)"
m2 = "send(pointer) test (" & $TestsCount & " messages)"
m3 = "sendTo(string) test (" & $TestsCount & " messages)"
@ -461,22 +24,462 @@ when isMainModule:
" clients x " & $MessagesCount & " messages)"
m8 = "Bounded multiple clients with messages (" & $ClientsCount &
" clients x " & $MessagesCount & " messages)"
suite "Datagram Transport test suite":
test m1:
check waitFor(testPointerSendTo()) == TestsCount
test m2:
check waitFor(testPointerSend()) == TestsCount
test m3:
check waitFor(testStringSendTo()) == TestsCount
test m4:
check waitFor(testStringSend()) == TestsCount
test m5:
check waitFor(testSeqSendTo()) == TestsCount
test m6:
check waitFor(testSeqSend()) == TestsCount
test m7:
check waitFor(test3(false)) == ClientsCount * MessagesCount
test m8:
check waitFor(test3(true)) == ClientsCount * MessagesCount
test "Datagram connection reset test":
check waitFor(testConnReset()) == true
proc client1(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
await transp.sendTo(raddr, addr ans[0], len(ans))
else:
var err = "ERROR"
await transp.sendTo(raddr, addr err[0], len(err))
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client2(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var ta = initTAddress("127.0.0.1:33336")
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(ta, addr req[0], len(req))
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client3(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.send(addr req[0], len(req))
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client4(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == MessagesCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.send(addr req[0], len(req))
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client5(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == MessagesCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(raddr, addr req[0], len(req))
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client6(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
await transp.sendTo(raddr, ans)
else:
var err = "ERROR"
await transp.sendTo(raddr, err)
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client7(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(raddr, req)
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client8(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.send(req)
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client9(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
var ansseq = newSeq[byte](len(ans))
copyMem(addr ansseq[0], addr ans[0], len(ans))
await transp.sendTo(raddr, ansseq)
else:
var err = "ERROR"
var errseq = newSeq[byte](len(err))
copyMem(addr errseq[0], addr err[0], len(err))
await transp.sendTo(raddr, errseq)
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client10(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
var reqseq = newSeq[byte](len(req))
copyMem(addr reqseq[0], addr req[0], len(req))
await transp.sendTo(raddr, reqseq)
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc client11(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
var reqseq = newSeq[byte](len(req))
copyMem(addr reqseq[0], addr req[0], len(req))
await transp.send(reqseq)
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
proc testPointerSendTo(): Future[int] {.async.} =
## sendTo(pointer) test
var ta = initTAddress("127.0.0.1:33336")
var counter = 0
var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client2, udata = addr counter)
var data = "REQUEST0"
await dgram2.sendTo(ta, addr data[0], len(data))
await dgram2.join()
dgram1.close()
await dgram1.join()
result = counter
proc testPointerSend(): Future[int] {.async.} =
## send(pointer) test
var ta = initTAddress("127.0.0.1:33337")
var counter = 0
var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client3, udata = addr counter, remote = ta)
var data = "REQUEST0"
await dgram2.send(addr data[0], len(data))
await dgram2.join()
dgram1.close()
await dgram1.join()
result = counter
proc testStringSendTo(): Future[int] {.async.} =
## sendTo(string) test
var ta = initTAddress("127.0.0.1:33338")
var counter = 0
var dgram1 = newDatagramTransport(client6, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client7, udata = addr counter)
var data = "REQUEST0"
await dgram2.sendTo(ta, data)
await dgram2.join()
dgram1.close()
await dgram1.join()
result = counter
proc testStringSend(): Future[int] {.async.} =
## send(string) test
var ta = initTAddress("127.0.0.1:33339")
var counter = 0
var dgram1 = newDatagramTransport(client6, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client8, udata = addr counter, remote = ta)
var data = "REQUEST0"
await dgram2.send(data)
await dgram2.join()
dgram1.close()
await dgram1.join()
result = counter
proc testSeqSendTo(): Future[int] {.async.} =
## sendTo(string) test
var ta = initTAddress("127.0.0.1:33340")
var counter = 0
var dgram1 = newDatagramTransport(client9, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client10, udata = addr counter)
var data = "REQUEST0"
var dataseq = newSeq[byte](len(data))
copyMem(addr dataseq[0], addr data[0], len(data))
await dgram2.sendTo(ta, dataseq)
await dgram2.join()
dgram1.close()
await dgram1.join()
result = counter
proc testSeqSend(): Future[int] {.async.} =
## send(seq) test
var ta = initTAddress("127.0.0.1:33341")
var counter = 0
var dgram1 = newDatagramTransport(client9, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client11, udata = addr counter, remote = ta)
var data = "REQUEST0"
var dataseq = newSeq[byte](len(data))
copyMem(addr dataseq[0], addr data[0], len(data))
await dgram2.send(data)
await dgram2.join()
dgram1.close()
await dgram1.join()
result = counter
#
proc waitAll(futs: seq[Future[void]]): Future[void] =
var counter = len(futs)
var retFuture = newFuture[void]("waitAll")
proc cb(udata: pointer) =
dec(counter)
if counter == 0:
retFuture.complete()
for fut in futs:
fut.addCallback(cb)
return retFuture
proc test3(bounded: bool): Future[int] {.async.} =
var ta: TransportAddress
if bounded:
ta = initTAddress("127.0.0.1:33240")
else:
ta = initTAddress("127.0.0.1:33241")
var counter = 0
var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta)
var clients = newSeq[Future[void]](ClientsCount)
var grams = newSeq[DatagramTransport](ClientsCount)
var counters = newSeq[int](ClientsCount)
for i in 0..<ClientsCount:
var data = "REQUEST0"
if bounded:
grams[i] = newDatagramTransport(client4, udata = addr counters[i],
remote = ta)
await grams[i].send(addr data[0], len(data))
else:
grams[i] = newDatagramTransport(client5, udata = addr counters[i])
await grams[i].sendTo(ta, addr data[0], len(data))
clients[i] = grams[i].join()
await waitAll(clients)
dgram1.close()
await dgram1.join()
result = 0
for i in 0..<ClientsCount:
result += counters[i]
proc testConnReset(): Future[bool] {.async.} =
var ta = initTAddress("127.0.0.1:65000")
var counter = 0
proc clientMark(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
counter = 1
transp.close()
var dgram1 = newDatagramTransport(client1, local = ta)
dgram1.close()
await dgram1.join()
var dgram2 = newDatagramTransport(clientMark)
var data = "MESSAGE"
asyncCheck dgram2.sendTo(ta, data)
await sleepAsync(2000.milliseconds)
result = (counter == 0)
dgram2.close()
await dgram2.join()
proc testTransportClose(): Future[bool] {.async.} =
var ta = initTAddress("127.0.0.1:45000")
var counter = 0
proc clientMark(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
discard
var dgram = newDatagramTransport(clientMark, local = ta)
dgram.close()
try:
await wait(dgram.join(), 1.seconds)
result = true
except:
discard
test "close(transport) test":
check waitFor(testTransportClose()) == true
test m1:
check waitFor(testPointerSendTo()) == TestsCount
test m2:
check waitFor(testPointerSend()) == TestsCount
test m3:
check waitFor(testStringSendTo()) == TestsCount
test m4:
check waitFor(testStringSend()) == TestsCount
test m5:
check waitFor(testSeqSendTo()) == TestsCount
test m6:
check waitFor(testSeqSend()) == TestsCount
test m7:
check waitFor(test3(false)) == ClientsCount * MessagesCount
test m8:
check waitFor(test3(true)) == ClientsCount * MessagesCount
test "Datagram connection reset test":
check waitFor(testConnReset()) == true

File diff suppressed because it is too large Load Diff

View File

@ -5,118 +5,117 @@
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import strutils, unittest
import ../chronos
type
CustomServer = ref object of StreamServer
test1: string
test2: string
test3: string
suite "Server's test suite":
type
CustomServer = ref object of StreamServer
test1: string
test2: string
test3: string
CustomTransport = ref object of StreamTransport
test: string
CustomTransport = ref object of StreamTransport
test: string
CustomData = ref object
test: string
CustomData = ref object
test: string
proc serveStreamClient(server: StreamServer,
transp: StreamTransport) {.async.} =
discard
proc serveStreamClient(server: StreamServer,
transp: StreamTransport) {.async.} =
discard
proc serveCustomStreamClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var cserver = cast[CustomServer](server)
var ctransp = cast[CustomTransport](transp)
cserver.test1 = "CONNECTION"
cserver.test2 = ctransp.test
cserver.test3 = await transp.readLine()
var answer = "ANSWER\r\n"
discard await transp.write(answer)
transp.close()
await transp.join()
proc serveCustomStreamClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var cserver = cast[CustomServer](server)
var ctransp = cast[CustomTransport](transp)
cserver.test1 = "CONNECTION"
cserver.test2 = ctransp.test
cserver.test3 = await transp.readLine()
var answer = "ANSWER\r\n"
discard await transp.write(answer)
transp.close()
await transp.join()
proc serveUdataStreamClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var udata = getUserData[CustomData](server)
var line = await transp.readLine()
var msg = line & udata.test & "\r\n"
discard await transp.write(msg)
transp.close()
await transp.join()
proc serveUdataStreamClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var udata = getUserData[CustomData](server)
var line = await transp.readLine()
var msg = line & udata.test & "\r\n"
discard await transp.write(msg)
transp.close()
await transp.join()
proc customServerTransport(server: StreamServer,
fd: AsyncFD): StreamTransport =
var transp = CustomTransport()
transp.test = "CUSTOM"
result = cast[StreamTransport](transp)
proc customServerTransport(server: StreamServer,
fd: AsyncFD): StreamTransport =
var transp = CustomTransport()
transp.test = "CUSTOM"
result = cast[StreamTransport](transp)
proc test1(): bool =
var ta = initTAddress("127.0.0.1:31354")
var server1 = createStreamServer(ta, serveStreamClient, {ReuseAddr})
server1.start()
server1.stop()
server1.close()
waitFor server1.join()
var server2 = createStreamServer(ta, serveStreamClient, {ReuseAddr})
server2.start()
server2.stop()
server2.close()
waitFor server2.join()
result = true
proc test1(): bool =
var ta = initTAddress("127.0.0.1:31354")
var server1 = createStreamServer(ta, serveStreamClient, {ReuseAddr})
server1.start()
server1.stop()
server1.close()
waitFor server1.join()
var server2 = createStreamServer(ta, serveStreamClient, {ReuseAddr})
server2.start()
server2.stop()
server2.close()
waitFor server2.join()
result = true
proc client1(server: CustomServer, ta: TransportAddress) {.async.} =
var transp = CustomTransport()
transp.test = "CLIENT"
server.start()
var ptransp = await connect(ta, child = transp)
var etransp = cast[CustomTransport](ptransp)
doAssert(etransp.test == "CLIENT")
var msg = "TEST\r\n"
discard await transp.write(msg)
var line = await transp.readLine()
transp.close()
server.stop()
server.close()
await server.join()
proc client1(server: CustomServer, ta: TransportAddress) {.async.} =
var transp = CustomTransport()
transp.test = "CLIENT"
server.start()
var ptransp = await connect(ta, child = transp)
var etransp = cast[CustomTransport](ptransp)
doAssert(etransp.test == "CLIENT")
var msg = "TEST\r\n"
discard await transp.write(msg)
var line = await transp.readLine()
transp.close()
server.stop()
server.close()
await server.join()
proc client2(server: StreamServer,
ta: TransportAddress): Future[bool] {.async.} =
server.start()
var transp = await connect(ta)
var msg = "TEST\r\n"
discard await transp.write(msg)
var line = await transp.readLine()
result = (line == "TESTCUSTOMDATA")
transp.close()
server.stop()
server.close()
await server.join()
proc client2(server: StreamServer,
ta: TransportAddress): Future[bool] {.async.} =
server.start()
var transp = await connect(ta)
var msg = "TEST\r\n"
discard await transp.write(msg)
var line = await transp.readLine()
result = (line == "TESTCUSTOMDATA")
transp.close()
server.stop()
server.close()
await server.join()
proc test3(): bool =
var server = CustomServer()
server.test1 = "TEST"
var ta = initTAddress("127.0.0.1:31354")
var pserver = createStreamServer(ta, serveCustomStreamClient, {ReuseAddr},
child = cast[StreamServer](server),
init = customServerTransport)
waitFor client1(server, ta)
result = (server.test1 == "CONNECTION") and (server.test2 == "CUSTOM")
proc test3(): bool =
var server = CustomServer()
server.test1 = "TEST"
var ta = initTAddress("127.0.0.1:31354")
var pserver = createStreamServer(ta, serveCustomStreamClient, {ReuseAddr},
child = cast[StreamServer](server),
init = customServerTransport)
waitFor client1(server, ta)
result = (server.test1 == "CONNECTION") and (server.test2 == "CUSTOM")
proc test4(): bool =
var co = CustomData()
co.test = "CUSTOMDATA"
var ta = initTAddress("127.0.0.1:31354")
var server = createStreamServer(ta, serveUdataStreamClient, {ReuseAddr},
udata = co)
result = waitFor client2(server, ta)
proc test4(): bool =
var co = CustomData()
co.test = "CUSTOMDATA"
var ta = initTAddress("127.0.0.1:31354")
var server = createStreamServer(ta, serveUdataStreamClient, {ReuseAddr},
udata = co)
result = waitFor client2(server, ta)
when isMainModule:
suite "Server's test suite":
test "Stream Server start/stop test":
check test1() == true
test "Stream Server inherited object test":
check test3() == true
test "StreamServer[T] test":
check test4() == true
test "Stream Server start/stop test":
check test1() == true
test "Stream Server inherited object test":
check test3() == true
test "StreamServer[T] test":
check test4() == true

View File

@ -5,39 +5,37 @@
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import unittest, strutils
import ../chronos
when not defined(windows):
import posix
var signalCounter = 0
suite "Signal handling test suite":
when not defined(windows):
var signalCounter = 0
proc signalProc(udata: pointer) =
var cdata = cast[ptr CompletionData](udata)
signalCounter = cast[int](cdata.udata)
removeSignal(int(cdata.fd))
proc signalProc(udata: pointer) =
var cdata = cast[ptr CompletionData](udata)
signalCounter = cast[int](cdata.udata)
removeSignal(int(cdata.fd))
proc asyncProc() {.async.} =
await sleepAsync(500.milliseconds)
proc asyncProc() {.async.} =
await sleepAsync(500.milliseconds)
proc test(signal, value: int): bool =
discard addSignal(signal, signalProc, cast[pointer](value))
var fut = asyncProc()
discard posix.kill(posix.getpid(), cint(signal))
waitFor(fut)
signalCounter == value
proc test(signal, value: int): bool =
discard addSignal(signal, signalProc, cast[pointer](value))
var fut = asyncProc()
discard posix.kill(posix.getpid(), cint(signal))
waitFor(fut)
signalCounter == value
else:
const
SIGINT = 0
SIGTERM = 0
proc test(signal, value: int): bool = true
else:
const
SIGINT = 0
SIGTERM = 0
proc test(signal, value: int): bool = true
when isMainModule:
suite "Signal handling test suite":
test "SIGINT test":
check test(SIGINT, 31337) == true
test "SIGTERM test":
check test(SIGTERM, 65537) == true
test "SIGINT test":
check test(SIGINT, 31337) == true
test "SIGTERM test":
check test(SIGTERM, 65537) == true

View File

@ -5,79 +5,77 @@
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import unittest
import ../chronos
const CallSoonTests = 10
var soonTest1 = 0'u
var timeoutsTest1 = 0
var timeoutsTest2 = 0
var soonTest2 = 0
suite "callSoon() tests suite":
const CallSoonTests = 10
var soonTest1 = 0'u
var timeoutsTest1 = 0
var timeoutsTest2 = 0
var soonTest2 = 0
proc callback1(udata: pointer) {.gcsafe.} =
soonTest1 = soonTest1 xor cast[uint](udata)
proc callback1(udata: pointer) {.gcsafe.} =
soonTest1 = soonTest1 xor cast[uint](udata)
proc test1(): uint =
callSoon(callback1, cast[pointer](0x12345678'u))
callSoon(callback1, cast[pointer](0x23456789'u))
callSoon(callback1, cast[pointer](0x3456789A'u))
callSoon(callback1, cast[pointer](0x456789AB'u))
callSoon(callback1, cast[pointer](0x56789ABC'u))
callSoon(callback1, cast[pointer](0x6789ABCD'u))
callSoon(callback1, cast[pointer](0x789ABCDE'u))
callSoon(callback1, cast[pointer](0x89ABCDEF'u))
callSoon(callback1, cast[pointer](0x9ABCDEF1'u))
callSoon(callback1, cast[pointer](0xABCDEF12'u))
callSoon(callback1, cast[pointer](0xBCDEF123'u))
callSoon(callback1, cast[pointer](0xCDEF1234'u))
callSoon(callback1, cast[pointer](0xDEF12345'u))
callSoon(callback1, cast[pointer](0xEF123456'u))
callSoon(callback1, cast[pointer](0xF1234567'u))
callSoon(callback1, cast[pointer](0x12345678'u))
## All callbacks must be processed exactly with 1 poll() call.
poll()
result = soonTest1
proc test1(): uint =
callSoon(callback1, cast[pointer](0x12345678'u))
callSoon(callback1, cast[pointer](0x23456789'u))
callSoon(callback1, cast[pointer](0x3456789A'u))
callSoon(callback1, cast[pointer](0x456789AB'u))
callSoon(callback1, cast[pointer](0x56789ABC'u))
callSoon(callback1, cast[pointer](0x6789ABCD'u))
callSoon(callback1, cast[pointer](0x789ABCDE'u))
callSoon(callback1, cast[pointer](0x89ABCDEF'u))
callSoon(callback1, cast[pointer](0x9ABCDEF1'u))
callSoon(callback1, cast[pointer](0xABCDEF12'u))
callSoon(callback1, cast[pointer](0xBCDEF123'u))
callSoon(callback1, cast[pointer](0xCDEF1234'u))
callSoon(callback1, cast[pointer](0xDEF12345'u))
callSoon(callback1, cast[pointer](0xEF123456'u))
callSoon(callback1, cast[pointer](0xF1234567'u))
callSoon(callback1, cast[pointer](0x12345678'u))
## All callbacks must be processed exactly with 1 poll() call.
poll()
result = soonTest1
proc testProc() {.async.} =
for i in 1..CallSoonTests:
await sleepAsync(100.milliseconds)
timeoutsTest1 += 1
proc testProc() {.async.} =
for i in 1..CallSoonTests:
await sleepAsync(100.milliseconds)
timeoutsTest1 += 1
proc callbackProc(udata: pointer) {.gcsafe.} =
timeoutsTest2 += 1
callSoon(callbackProc)
proc callbackProc(udata: pointer) {.gcsafe.} =
timeoutsTest2 += 1
callSoon(callbackProc)
proc test2(timers, callbacks: var int) =
callSoon(callbackProc)
waitFor(testProc())
timers = timeoutsTest1
callbacks = timeoutsTest2
proc test2(timers, callbacks: var int) =
callSoon(callbackProc)
waitFor(testProc())
timers = timeoutsTest1
callbacks = timeoutsTest2
proc testCallback(udata: pointer) =
soonTest2 = 987654321
proc testCallback(udata: pointer) =
soonTest2 = 987654321
proc test3(): bool =
callSoon(testCallback)
poll()
result = soonTest2 == 987654321
proc test3(): bool =
callSoon(testCallback)
poll()
result = soonTest2 == 987654321
when isMainModule:
suite "callSoon() tests suite":
test "User-defined callback argument test":
var values = [0x12345678'u, 0x23456789'u, 0x3456789A'u, 0x456789AB'u,
0x56789ABC'u, 0x6789ABCD'u, 0x789ABCDE'u, 0x89ABCDEF'u,
0x9ABCDEF1'u, 0xABCDEF12'u, 0xBCDEF123'u, 0xCDEF1234'u,
0xDEF12345'u, 0xEF123456'u, 0xF1234567'u, 0x12345678'u]
var expect = 0'u
for item in values:
expect = expect xor item
check test1() == expect
test "`Asynchronous dead end` #7193 test":
var timers, callbacks: int
test2(timers, callbacks)
check:
timers == CallSoonTests
callbacks > CallSoonTests * 2
test "`callSoon() is not working prior getGlobalDispatcher()` #7192 test":
check test3() == true
test "User-defined callback argument test":
var values = [0x12345678'u, 0x23456789'u, 0x3456789A'u, 0x456789AB'u,
0x56789ABC'u, 0x6789ABCD'u, 0x789ABCDE'u, 0x89ABCDEF'u,
0x9ABCDEF1'u, 0xABCDEF12'u, 0xBCDEF123'u, 0xCDEF1234'u,
0xDEF12345'u, 0xEF123456'u, 0xF1234567'u, 0x12345678'u]
var expect = 0'u
for item in values:
expect = expect xor item
check test1() == expect
test "`Asynchronous dead end` #7193 test":
var timers, callbacks: int
test2(timers, callbacks)
check:
timers == CallSoonTests
callbacks > CallSoonTests * 2
test "`callSoon() is not working prior getGlobalDispatcher()` #7192 test":
check test3() == true

File diff suppressed because it is too large Load Diff

View File

@ -5,214 +5,212 @@
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import unittest
import ../chronos
var testLockResult = ""
var testEventResult = ""
var testQueue1Result = 0
var testQueue2Result = 0
var testQueue3Result = 0
suite "Asynchronous sync primitives test suite":
var testLockResult = ""
var testEventResult = ""
var testQueue1Result = 0
var testQueue2Result = 0
var testQueue3Result = 0
proc testLock(n: int, lock: AsyncLock) {.async.} =
await lock.acquire()
testLockResult = testLockResult & $n
lock.release()
proc testLock(n: int, lock: AsyncLock) {.async.} =
await lock.acquire()
testLockResult = testLockResult & $n
lock.release()
proc test1(): string =
var lock = newAsyncLock()
lock.own()
discard testLock(0, lock)
discard testLock(1, lock)
discard testLock(2, lock)
discard testLock(3, lock)
discard testLock(4, lock)
discard testLock(5, lock)
discard testLock(6, lock)
discard testLock(7, lock)
discard testLock(8, lock)
discard testLock(9, lock)
lock.release()
## There must be exactly 20 poll() calls
for i in 0..<20:
proc test1(): string =
var lock = newAsyncLock()
lock.own()
discard testLock(0, lock)
discard testLock(1, lock)
discard testLock(2, lock)
discard testLock(3, lock)
discard testLock(4, lock)
discard testLock(5, lock)
discard testLock(6, lock)
discard testLock(7, lock)
discard testLock(8, lock)
discard testLock(9, lock)
lock.release()
## There must be exactly 20 poll() calls
for i in 0..<20:
poll()
result = testLockResult
proc testEvent(n: int, ev: AsyncEvent) {.async.} =
await ev.wait()
testEventResult = testEventResult & $n
proc test2(): string =
var event = newAsyncEvent()
event.clear()
discard testEvent(0, event)
discard testEvent(1, event)
discard testEvent(2, event)
discard testEvent(3, event)
discard testEvent(4, event)
discard testEvent(5, event)
discard testEvent(6, event)
discard testEvent(7, event)
discard testEvent(8, event)
discard testEvent(9, event)
event.fire()
## There must be exactly 2 poll() calls
poll()
result = testLockResult
poll()
result = testEventResult
proc testEvent(n: int, ev: AsyncEvent) {.async.} =
await ev.wait()
testEventResult = testEventResult & $n
proc task1(aq: AsyncQueue[int]) {.async.} =
var item1 = await aq.get()
var item2 = await aq.get()
testQueue1Result = item1 + item2
proc test2(): string =
var event = newAsyncEvent()
event.clear()
discard testEvent(0, event)
discard testEvent(1, event)
discard testEvent(2, event)
discard testEvent(3, event)
discard testEvent(4, event)
discard testEvent(5, event)
discard testEvent(6, event)
discard testEvent(7, event)
discard testEvent(8, event)
discard testEvent(9, event)
event.fire()
## There must be exactly 2 poll() calls
poll()
poll()
result = testEventResult
proc task2(aq: AsyncQueue[int]) {.async.} =
await aq.put(1000)
await aq.put(2000)
proc task1(aq: AsyncQueue[int]) {.async.} =
var item1 = await aq.get()
var item2 = await aq.get()
testQueue1Result = item1 + item2
proc test3(): int =
var queue = newAsyncQueue[int](1)
discard task1(queue)
discard task2(queue)
## There must be exactly 2 poll() calls
poll()
poll()
result = testQueue1Result
proc task2(aq: AsyncQueue[int]) {.async.} =
await aq.put(1000)
await aq.put(2000)
const testsCount = 1000
const queueSize = 10
proc test3(): int =
var queue = newAsyncQueue[int](1)
discard task1(queue)
discard task2(queue)
## There must be exactly 2 poll() calls
poll()
poll()
result = testQueue1Result
proc task3(aq: AsyncQueue[int]) {.async.} =
for i in 1..testsCount:
var item = await aq.get()
testQueue2Result -= item
const testsCount = 1000
const queueSize = 10
proc task4(aq: AsyncQueue[int]) {.async.} =
for i in 1..testsCount:
await aq.put(i)
testQueue2Result += i
proc task3(aq: AsyncQueue[int]) {.async.} =
for i in 1..testsCount:
var item = await aq.get()
testQueue2Result -= item
proc test4(): int =
var queue = newAsyncQueue[int](queueSize)
waitFor(task3(queue) and task4(queue))
result = testQueue2Result
proc task4(aq: AsyncQueue[int]) {.async.} =
for i in 1..testsCount:
await aq.put(i)
testQueue2Result += i
proc task51(aq: AsyncQueue[int]) {.async.} =
var item1 = await aq.popFirst()
var item2 = await aq.popLast()
var item3 = await aq.get()
testQueue3Result = item1 - item2 + item3
proc test4(): int =
var queue = newAsyncQueue[int](queueSize)
waitFor(task3(queue) and task4(queue))
result = testQueue2Result
proc task52(aq: AsyncQueue[int]) {.async.} =
await aq.put(100)
await aq.addLast(1000)
await aq.addFirst(2000)
proc task51(aq: AsyncQueue[int]) {.async.} =
var item1 = await aq.popFirst()
var item2 = await aq.popLast()
var item3 = await aq.get()
testQueue3Result = item1 - item2 + item3
proc test5(): int =
var queue = newAsyncQueue[int](3)
discard task51(queue)
discard task52(queue)
poll()
poll()
result = testQueue3Result
proc task52(aq: AsyncQueue[int]) {.async.} =
await aq.put(100)
await aq.addLast(1000)
await aq.addFirst(2000)
proc test6(): bool =
var queue = newAsyncQueue[int]()
queue.putNoWait(1)
queue.putNoWait(2)
queue.putNoWait(3)
queue.putNoWait(4)
queue.putNoWait(5)
queue.clear()
result = (len(queue) == 0)
proc test5(): int =
var queue = newAsyncQueue[int](3)
discard task51(queue)
discard task52(queue)
poll()
poll()
result = testQueue3Result
proc test7(): bool =
var queue = newAsyncQueue[int]()
var arr1 = @[1, 2, 3, 4, 5]
var arr2 = @[2, 2, 2, 2, 2]
var arr3 = @[1, 2, 3, 4, 5]
queue.putNoWait(1)
queue.putNoWait(2)
queue.putNoWait(3)
queue.putNoWait(4)
queue.putNoWait(5)
var index = 0
for item in queue.items():
result = (item == arr1[index])
inc(index)
proc test6(): bool =
var queue = newAsyncQueue[int]()
queue.putNoWait(1)
queue.putNoWait(2)
queue.putNoWait(3)
queue.putNoWait(4)
queue.putNoWait(5)
queue.clear()
result = (len(queue) == 0)
if not result: return
proc test7(): bool =
var queue = newAsyncQueue[int]()
var arr1 = @[1, 2, 3, 4, 5]
var arr2 = @[2, 2, 2, 2, 2]
var arr3 = @[1, 2, 3, 4, 5]
queue.putNoWait(1)
queue.putNoWait(2)
queue.putNoWait(3)
queue.putNoWait(4)
queue.putNoWait(5)
var index = 0
for item in queue.items():
result = (item == arr1[index])
inc(index)
queue[0] = 2
if not result: return
result = (queue[0] == 2)
queue[0] = 2
if not result: return
result = (queue[0] == 2)
for item in queue.mitems():
item = 2
if not result: return
index = 0
for item in queue.items():
result = (item == arr2[index])
inc(index)
for item in queue.mitems():
item = 2
if not result: return
index = 0
for item in queue.items():
result = (item == arr2[index])
inc(index)
queue[0] = 1
queue[1] = 2
queue[2] = 3
queue[3] = 4
queue[^1] = 5
if not result: return
for i, item in queue.pairs():
result = (item == arr3[i])
queue[0] = 1
queue[1] = 2
queue[2] = 3
queue[3] = 4
queue[^1] = 5
proc test8(): bool =
var q0 = newAsyncQueue[int]()
q0.putNoWait(1)
q0.putNoWait(2)
q0.putNoWait(3)
q0.putNoWait(4)
q0.putNoWait(5)
result = ($q0 == "[1, 2, 3, 4, 5]")
if not result: return
for i, item in queue.pairs():
result = (item == arr3[i])
var q1 = newAsyncQueue[string]()
q1.putNoWait("1")
q1.putNoWait("2")
q1.putNoWait("3")
q1.putNoWait("4")
q1.putNoWait("5")
result = ($q1 == "[\"1\", \"2\", \"3\", \"4\", \"5\"]")
proc test8(): bool =
var q0 = newAsyncQueue[int]()
q0.putNoWait(1)
q0.putNoWait(2)
q0.putNoWait(3)
q0.putNoWait(4)
q0.putNoWait(5)
result = ($q0 == "[1, 2, 3, 4, 5]")
if not result: return
proc test9(): bool =
var q = newAsyncQueue[int]()
q.putNoWait(1)
q.putNoWait(2)
q.putNoWait(3)
q.putNoWait(4)
q.putNoWait(5)
result = (5 in q and not(6 in q))
var q1 = newAsyncQueue[string]()
q1.putNoWait("1")
q1.putNoWait("2")
q1.putNoWait("3")
q1.putNoWait("4")
q1.putNoWait("5")
result = ($q1 == "[\"1\", \"2\", \"3\", \"4\", \"5\"]")
proc test9(): bool =
var q = newAsyncQueue[int]()
q.putNoWait(1)
q.putNoWait(2)
q.putNoWait(3)
q.putNoWait(4)
q.putNoWait(5)
result = (5 in q and not(6 in q))
when isMainModule:
suite "Asynchronous sync primitives test suite":
test "AsyncLock() behavior test":
check test1() == "0123456789"
test "AsyncEvent() behavior test":
check test2() == "0123456789"
test "AsyncQueue() behavior test":
check test3() == 3000
test "AsyncQueue() many iterations test":
check test4() == 0
test "AsyncQueue() addLast/addFirst/popLast/popFirst test":
check test5() == 1100
test "AsyncQueue() clear test":
check test6() == true
test "AsyncQueue() iterators/assignments test":
check test7() == true
test "AsyncQueue() representation test":
check test8() == true
test "AsyncQueue() contains test":
check test9() == true
test "AsyncLock() behavior test":
check test1() == "0123456789"
test "AsyncEvent() behavior test":
check test2() == "0123456789"
test "AsyncQueue() behavior test":
check test3() == 3000
test "AsyncQueue() many iterations test":
check test4() == 0
test "AsyncQueue() addLast/addFirst/popLast/popFirst test":
check test5() == 1100
test "AsyncQueue() clear test":
check test6() == true
test "AsyncQueue() iterators/assignments test":
check test7() == true
test "AsyncQueue() representation test":
check test8() == true
test "AsyncQueue() contains test":
check test9() == true

View File

@ -5,57 +5,55 @@
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import os, unittest
import ../chronos, ../chronos/timer
const TimersCount = 10
suite "Asynchronous timers test suite":
const TimersCount = 10
proc timeWorker(time: Duration): Future[Duration] {.async.} =
var st = Moment.now()
await sleepAsync(time)
var et = Moment.now()
result = et - st
proc timeWorker(time: Duration): Future[Duration] {.async.} =
var st = Moment.now()
await sleepAsync(time)
var et = Moment.now()
result = et - st
proc waitAll[T](futs: seq[Future[T]]): Future[void] =
var counter = len(futs)
var retFuture = newFuture[void]("waitAll")
proc cb(udata: pointer) =
dec(counter)
if counter == 0:
retFuture.complete()
for fut in futs:
fut.addCallback(cb)
return retFuture
proc waitAll[T](futs: seq[Future[T]]): Future[void] =
var counter = len(futs)
var retFuture = newFuture[void]("waitAll")
proc cb(udata: pointer) =
dec(counter)
if counter == 0:
retFuture.complete()
for fut in futs:
fut.addCallback(cb)
return retFuture
proc test(timeout: Duration): Future[Duration] {.async.} =
var workers = newSeq[Future[Duration]](TimersCount)
for i in 0..<TimersCount:
workers[i] = timeWorker(timeout)
await waitAll(workers)
var sum: Duration
for i in 0..<TimersCount:
var time = workers[i].read()
sum = sum + time
result = sum div 10'i64
proc test(timeout: Duration): Future[Duration] {.async.} =
var workers = newSeq[Future[Duration]](TimersCount)
for i in 0..<TimersCount:
workers[i] = timeWorker(timeout)
await waitAll(workers)
var sum: Duration
for i in 0..<TimersCount:
var time = workers[i].read()
sum = sum + time
result = sum div 10'i64
proc testTimer(): bool =
let a = Moment.now()
waitFor(sleepAsync(1000.milliseconds))
let b = Moment.now()
let d = b - a
result = (d >= 1000.milliseconds) and (d <= 2_000.milliseconds)
proc testTimer(): bool =
let a = Moment.now()
waitFor(sleepAsync(1000.milliseconds))
let b = Moment.now()
let d = b - a
result = (d >= 1000.milliseconds) and (d <= 2_000.milliseconds)
when isMainModule:
suite "Asynchronous timers test suite":
test "Timer reliability test [" & asyncTimer & "]":
check testTimer() == true
test $TimersCount & " timers with 10ms timeout":
var res = waitFor(test(10.milliseconds))
check (res >= 10.milliseconds) and (res <= 100.milliseconds)
test $TimersCount & " timers with 100ms timeout":
var res = waitFor(test(100.milliseconds))
check (res >= 100.milliseconds) and (res <= 1000.milliseconds)
test $TimersCount & " timers with 1000ms timeout":
var res = waitFor(test(1000.milliseconds))
check (res >= 1000.milliseconds) and (res <= 5000.milliseconds)
test "Timer reliability test [" & asyncTimer & "]":
check testTimer() == true
test $TimersCount & " timers with 10ms timeout":
var res = waitFor(test(10.milliseconds))
check (res >= 10.milliseconds) and (res <= 100.milliseconds)
test $TimersCount & " timers with 100ms timeout":
var res = waitFor(test(100.milliseconds))
check (res >= 100.milliseconds) and (res <= 1000.milliseconds)
test $TimersCount & " timers with 1000ms timeout":
var res = waitFor(test(1000.milliseconds))
check (res >= 1000.milliseconds) and (res <= 5000.milliseconds)