Asyncproc (Part 3/3) (#374)

* Initial commit.

* Some Linux fixes.

* Address review comments on Windows.

* Fix issues on Linux.

* Fix 1.2 issue and Windows warnings.

* Fix posix compilation issues.
This commit is contained in:
Eugene Kabanov 2023-05-23 13:39:35 +03:00 committed by GitHub
parent f748387462
commit 148ddf49c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 2119 additions and 50 deletions

View File

@ -5,5 +5,6 @@
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import chronos/[asyncloop, asyncsync, handles, transport, timer, debugutils]
export asyncloop, asyncsync, handles, transport, timer, debugutils
import chronos/[asyncloop, asyncsync, handles, transport, timer,
asyncproc, debugutils]
export asyncloop, asyncsync, handles, transport, timer, asyncproc, debugutils

View File

@ -14,7 +14,7 @@ else:
{.push raises: [].}
from nativesockets import Port
import std/[tables, strutils, heapqueue, options, deques]
import std/[tables, strutils, heapqueue, deques]
import stew/results
import "."/[config, osdefs, oserrno, osutils, timer]
@ -320,6 +320,20 @@ when defined(windows):
RefCustomOverlapped* = ref CustomOverlapped
PostCallbackData = object
ioPort: HANDLE
handleFd: AsyncFD
waitFd: HANDLE
udata: pointer
ovlref: RefCustomOverlapped
ovl: pointer
WaitableHandle* = ref PostCallbackData
ProcessHandle* = distinct WaitableHandle
WaitableResult* {.pure.} = enum
Ok, Timeout
AsyncFD* = distinct int
proc hash(x: AsyncFD): Hash {.borrow.}
@ -328,9 +342,9 @@ when defined(windows):
proc getFunc(s: SocketHandle, fun: var pointer, guid: GUID): bool =
var bytesRet: DWORD
fun = nil
result = wsaIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, unsafeAddr(guid),
sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD,
addr(bytesRet), nil, nil) == 0
wsaIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, unsafeAddr(guid),
DWORD(sizeof(GUID)), addr fun, DWORD(sizeof(pointer)),
addr(bytesRet), nil, nil) == 0
proc globalInit() =
var wsa = WSAData()
@ -428,6 +442,141 @@ when defined(windows):
## Unregisters ``fd``.
getThreadDispatcher().handles.excl(fd)
{.push stackTrace: off.}
proc waitableCallback(param: pointer, timerOrWaitFired: WINBOOL) {.
stdcall, gcsafe.} =
# This procedure will be executed in `wait thread`, so it must not use
# GC related objects.
# We going to ignore callbacks which was spawned when `isNil(param) == true`
# because we unable to indicate this error.
if isNil(param): return
var wh = cast[ptr PostCallbackData](param)
# We ignore result of postQueueCompletionStatus() call because we unable to
# indicate error.
discard postQueuedCompletionStatus(wh[].ioPort, DWORD(timerOrWaitFired),
ULONG_PTR(wh[].handleFd),
wh[].ovl)
{.pop.}
proc registerWaitable(
handle: HANDLE,
flags: ULONG,
timeout: Duration,
cb: CallbackFunc,
udata: pointer
): Result[WaitableHandle, OSErrorCode] =
## Register handle of (Change notification, Console input, Event,
## Memory resource notification, Mutex, Process, Semaphore, Thread,
## Waitable timer) for waiting, using specific Windows' ``flags`` and
## ``timeout`` value.
##
## Callback ``cb`` will be scheduled with ``udata`` parameter when
## ``handle`` become signaled.
##
## Result of this procedure call ``WaitableHandle`` should be closed using
## closeWaitable() call.
##
## NOTE: This is private procedure, not supposed to be publicly available,
## please use ``waitForSingleObject()``.
let loop = getThreadDispatcher()
var ovl = RefCustomOverlapped(data: CompletionData(cb: cb))
var whandle = (ref PostCallbackData)(
ioPort: loop.getIoHandler(),
handleFd: AsyncFD(handle),
udata: udata,
ovlref: ovl,
ovl: cast[pointer](ovl)
)
ovl.data.udata = cast[pointer](whandle)
let dwordTimeout =
if timeout == InfiniteDuration:
DWORD(INFINITE)
else:
DWORD(timeout.milliseconds)
if registerWaitForSingleObject(addr(whandle[].waitFd), handle,
cast[WAITORTIMERCALLBACK](waitableCallback),
cast[pointer](whandle),
dwordTimeout,
flags) == WINBOOL(0):
ovl.data.udata = nil
whandle.ovlref = nil
whandle.ovl = nil
return err(osLastError())
ok(WaitableHandle(whandle))
proc closeWaitable(wh: WaitableHandle): Result[void, OSErrorCode] =
## Close waitable handle ``wh`` and clear all the resources. It is safe
## to close this handle, even if wait operation is pending.
##
## NOTE: This is private procedure, not supposed to be publicly available,
## please use ``waitForSingleObject()``.
doAssert(not(isNil(wh)))
let pdata = (ref PostCallbackData)(wh)
# We are not going to clear `ref` fields in PostCallbackData object because
# it possible that callback is already scheduled.
if unregisterWait(pdata.waitFd) == 0:
let res = osLastError()
if res != ERROR_IO_PENDING:
return err(res)
ok()
proc addProcess2*(pid: int, cb: CallbackFunc,
udata: pointer = nil): Result[ProcessHandle, OSErrorCode] =
## Registers callback ``cb`` to be called when process with process
## identifier ``pid`` exited. Returns process identifier, which can be
## used to clear process callback via ``removeProcess``.
doAssert(pid > 0, "Process identifier must be positive integer")
let
hProcess = openProcess(SYNCHRONIZE, WINBOOL(0), DWORD(pid))
flags = WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE
var wh: WaitableHandle = nil
if hProcess == HANDLE(0):
return err(osLastError())
proc continuation(udata: pointer) {.gcsafe.} =
doAssert(not(isNil(udata)))
doAssert(not(isNil(wh)))
discard closeFd(hProcess)
cb(wh[].udata)
wh =
block:
let res = registerWaitable(hProcess, flags, InfiniteDuration,
continuation, udata)
if res.isErr():
discard closeFd(hProcess)
return err(res.error())
res.get()
ok(ProcessHandle(wh))
proc removeProcess2*(procHandle: ProcessHandle): Result[void, OSErrorCode] =
## Remove process' watching using process' descriptor ``procHandle``.
let waitableHandle = WaitableHandle(procHandle)
doAssert(not(isNil(waitableHandle)))
? closeWaitable(waitableHandle)
ok()
proc addProcess*(pid: int, cb: CallbackFunc,
udata: pointer = nil): ProcessHandle {.
raises: [Defect, OSError].} =
## Registers callback ``cb`` to be called when process with process
## identifier ``pid`` exited. Returns process identifier, which can be
## used to clear process callback via ``removeProcess``.
addProcess2(pid, cb, udata).tryGet()
proc removeProcess*(procHandle: ProcessHandle) {.
raises: [Defect, OSError].} =
## Remove process' watching using process' descriptor ``procHandle``.
removeProcess2(procHandle).tryGet()
proc poll*() =
## Perform single asynchronous step, processing timers and completing
## tasks. Blocks until at least one event has completed.
@ -772,8 +921,15 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
closeSocket(fd, aftercb)
when asyncEventEngine in ["epoll", "kqueue"]:
proc addSignal2*(signal: int, cb: CallbackFunc,
udata: pointer = nil): Result[int, OSErrorCode] =
type
ProcessHandle* = distinct int
SignalHandle* = distinct int
proc addSignal2*(
signal: int,
cb: CallbackFunc,
udata: pointer = nil
): Result[SignalHandle, OSErrorCode] =
## Start watching signal ``signal``, and when signal appears, call the
## callback ``cb`` with specified argument ``udata``. Returns signal
## identifier code, which can be used to remove signal callback
@ -785,10 +941,13 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
adata.reader = AsyncCallback(function: cb, udata: udata)
do:
return err(osdefs.EBADF)
ok(sigfd)
ok(SignalHandle(sigfd))
proc addProcess2*(pid: int, cb: CallbackFunc,
udata: pointer = nil): Result[int, OSErrorCode] =
proc addProcess2*(
pid: int,
cb: CallbackFunc,
udata: pointer = nil
): Result[ProcessHandle, OSErrorCode] =
## Registers callback ``cb`` to be called when process with process
## identifier ``pid`` exited. Returns process' descriptor, which can be
## used to clear process callback via ``removeProcess``.
@ -799,31 +958,42 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or
adata.reader = AsyncCallback(function: cb, udata: udata)
do:
return err(osdefs.EBADF)
ok(procfd)
ok(ProcessHandle(procfd))
proc removeSignal2*(sigfd: int): Result[void, OSErrorCode] =
proc removeSignal2*(signalHandle: SignalHandle): Result[void, OSErrorCode] =
## Remove watching signal ``signal``.
getThreadDispatcher().selector.unregister2(cint(sigfd))
getThreadDispatcher().selector.unregister2(cint(signalHandle))
proc removeProcess2*(procfd: int): Result[void, OSErrorCode] =
proc removeProcess2*(procHandle: ProcessHandle): Result[void, OSErrorCode] =
## Remove process' watching using process' descriptor ``procfd``.
getThreadDispatcher().selector.unregister2(cint(procfd))
getThreadDispatcher().selector.unregister2(cint(procHandle))
proc addSignal*(signal: int, cb: CallbackFunc,
udata: pointer = nil): int {.raises: [Defect, OSError].} =
udata: pointer = nil): SignalHandle {.
raises: [Defect, OSError].} =
## Start watching signal ``signal``, and when signal appears, call the
## callback ``cb`` with specified argument ``udata``. Returns signal
## identifier code, which can be used to remove signal callback
## via ``removeSignal``.
addSignal2(signal, cb, udata).tryGet()
proc removeSignal*(sigfd: int) {.raises: [Defect, OSError].} =
proc removeSignal*(signalHandle: SignalHandle) {.
raises: [Defect, OSError].} =
## Remove watching signal ``signal``.
removeSignal2(sigfd).tryGet()
removeSignal2(signalHandle).tryGet()
proc removeProcess*(procfd: int) {.raises: [Defect, OSError].} =
## Remove process' watching using process' descriptor ``procfd``.
removeProcess2(procfd).tryGet()
proc addProcess*(pid: int, cb: CallbackFunc,
udata: pointer = nil): ProcessHandle {.
raises: [Defect, OSError].} =
## Registers callback ``cb`` to be called when process with process
## identifier ``pid`` exited. Returns process identifier, which can be
## used to clear process callback via ``removeProcess``.
addProcess2(pid, cb, udata).tryGet()
proc removeProcess*(procHandle: ProcessHandle) {.
raises: [Defect, OSError].} =
## Remove process' watching using process' descriptor ``procHandle``.
removeProcess2(procHandle).tryGet()
proc poll*() {.gcsafe.} =
## Perform single asynchronous step.
@ -1002,7 +1172,7 @@ when not(defined(windows)):
when asyncEventEngine in ["epoll", "kqueue"]:
proc waitSignal*(signal: int): Future[void] {.raises: [Defect].} =
var retFuture = newFuture[void]("chronos.waitSignal()")
var sigfd: int = -1
var signalHandle: Opt[SignalHandle]
template getSignalException(e: OSErrorCode): untyped =
newException(AsyncError, "Could not manipulate signal handler, " &
@ -1010,8 +1180,8 @@ when not(defined(windows)):
proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
if sigfd != -1:
let res = removeSignal2(sigfd)
if signalHandle.isSome():
let res = removeSignal2(signalHandle.get())
if res.isErr():
retFuture.fail(getSignalException(res.error()))
else:
@ -1019,17 +1189,17 @@ when not(defined(windows)):
proc cancellation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
if sigfd != -1:
let res = removeSignal2(sigfd)
if signalHandle.isSome():
let res = removeSignal2(signalHandle.get())
if res.isErr():
retFuture.fail(getSignalException(res.error()))
sigfd =
signalHandle =
block:
let res = addSignal2(signal, continuation)
if res.isErr():
retFuture.fail(getSignalException(res.error()))
res.get()
Opt.some(res.get())
retFuture.cancelCallback = cancellation
retFuture
@ -1283,5 +1453,63 @@ when chronosFutureTracking:
## completed, cancelled or failed).
futureList.count
when defined(windows):
proc waitForSingleObject*(handle: HANDLE,
timeout: Duration): Future[WaitableResult] {.
raises: [Defect].} =
## Waits until the specified object is in the signaled state or the
## time-out interval elapses. WaitForSingleObject() for asynchronous world.
let flags = WT_EXECUTEONLYONCE
var
retFuture = newFuture[WaitableResult]("chronos.waitForSingleObject()")
waitHandle: WaitableHandle = nil
proc continuation(udata: pointer) {.gcsafe.} =
doAssert(not(isNil(waitHandle)))
if not(retFuture.finished()):
let
ovl = cast[PtrCustomOverlapped](udata)
returnFlag = WINBOOL(ovl.data.bytesCount)
res = closeWaitable(waitHandle)
if res.isErr():
retFuture.fail(newException(AsyncError, osErrorMsg(res.error())))
else:
if returnFlag == TRUE:
retFuture.complete(WaitableResult.Timeout)
else:
retFuture.complete(WaitableResult.Ok)
proc cancellation(udata: pointer) {.gcsafe.} =
doAssert(not(isNil(waitHandle)))
if not(retFuture.finished()):
discard closeWaitable(waitHandle)
let wres = uint32(waitForSingleObject(handle, DWORD(0)))
if wres == WAIT_OBJECT_0:
retFuture.complete(WaitableResult.Ok)
return retFuture
elif wres == WAIT_ABANDONED:
retFuture.fail(newException(AsyncError, "Handle was abandoned"))
return retFuture
elif wres == WAIT_FAILED:
retFuture.fail(newException(AsyncError, osErrorMsg(osLastError())))
return retFuture
if timeout == ZeroDuration:
retFuture.complete(WaitableResult.Timeout)
return retFuture
waitHandle =
block:
let res = registerWaitable(handle, flags, timeout, continuation, nil)
if res.isErr():
retFuture.fail(newException(AsyncError, osErrorMsg(res.error())))
return retFuture
res.get()
retFuture.cancelCallback = cancellation
return retFuture
# Perform global per-module initialization.
globalInit()

1311
chronos/asyncproc.nim Normal file

File diff suppressed because it is too large Load Diff

View File

@ -14,10 +14,10 @@
when (NimMajor, NimMinor) >= (1, 4):
const
chronosStrictException* {.booldefine.}: bool = defined(chronosPreviewV4)
## Require that `async` code raises only derivatives of `CatchableError` and
## not `Exception` - forward declarations, methods and `proc` types used
## from within `async` code may need to be be explicitly annotated with
## `raises: [CatchableError]` when this mode is enabled.
## Require that `async` code raises only derivatives of `CatchableError`
## and not `Exception` - forward declarations, methods and `proc` types
## used from within `async` code may need to be be explicitly annotated
## with `raises: [CatchableError]` when this mode is enabled.
chronosStackTrace* {.booldefine.}: bool = defined(chronosDebug)
## Include stack traces in futures for creation and completion points
@ -32,6 +32,21 @@ when (NimMajor, NimMinor) >= (1, 4):
chronosDumpAsync* {.booldefine.}: bool = defined(nimDumpAsync)
## Print code generated by {.async.} transformation
chronosProcShell* {.strdefine.}: string =
when defined(windows):
"cmd.exe"
else:
when defined(android):
"/system/bin/sh"
else:
"/bin/sh"
## Default shell binary path.
##
## The shell is used as command for command line when process started
## using `AsyncProcessOption.EvalCommand` and API calls such as
## ``execCommand(command)`` and ``execCommandEx(command)``.
else:
# 1.2 doesn't support `booldefine` in `when` properly
const
@ -42,6 +57,14 @@ else:
chronosFutureTracking*: bool =
defined(chronosDebug) or defined(chronosFutureTracking)
chronosDumpAsync*: bool = defined(nimDumpAsync)
chronosProcShell* {.strdefine.}: string =
when defined(windows):
"cmd.exe"
else:
when defined(android):
"/system/bin/sh"
else:
"/bin/sh"
when defined(debug) or defined(chronosConfig):
import std/macros
@ -55,3 +78,4 @@ when defined(debug) or defined(chronosConfig):
printOption("chronosFutureId", chronosFutureId)
printOption("chronosFutureTracking", chronosFutureTracking)
printOption("chronosDumpAsync", chronosDumpAsync)
printOption("chronosProcShell", chronosProcShell)

View File

@ -827,7 +827,8 @@ elif defined(macos) or defined(macosx):
unlink, listen, getaddrinfo, gai_strerror, getrlimit,
setrlimit, getpid, pthread_sigmask, sigprocmask, sigemptyset,
sigaddset, sigismember, fcntl, accept, pipe, write,
signal, read, setsockopt, getsockopt,
signal, read, setsockopt, getsockopt, getcwd, chdir,
waitpid, kill,
Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr,
SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6,
Sockaddr_un, SocketHandle, AddrInfo, RLimit,
@ -839,14 +840,16 @@ elif defined(macos) or defined(macosx):
SIG_BLOCK, SIG_UNBLOCK,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP,
SIGCONT
export close, shutdown, socket, getpeername, getsockname,
recvfrom, sendto, send, bindSocket, recv, connect,
unlink, listen, getaddrinfo, gai_strerror, getrlimit,
setrlimit, getpid, pthread_sigmask, sigprocmask, sigemptyset,
sigaddset, sigismember, fcntl, accept, pipe, write,
signal, read, setsockopt, getsockopt,
signal, read, setsockopt, getsockopt, getcwd, chdir,
waitpid, kill,
Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr,
SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6,
Sockaddr_un, SocketHandle, AddrInfo, RLimit,
@ -858,7 +861,8 @@ elif defined(macos) or defined(macosx):
SIG_BLOCK, SIG_UNBLOCK,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP,
SIGCONT
type
MachTimebaseInfo* {.importc: "struct mach_timebase_info",
@ -882,7 +886,8 @@ elif defined(linux):
getrlimit, setrlimit, getpeername, getsockname,
recvfrom, sendto, send, bindSocket, recv, connect,
unlink, listen, sendmsg, recvmsg, getpid, fcntl,
pthread_sigmask, clock_gettime, signal,
pthread_sigmask, clock_gettime, signal, getcwd, chdir,
waitpid, kill,
ClockId, Itimerspec, Timespec, Sigset, Time, Pid, Mode,
SigInfo, Id, Tmsghdr, IOVec, RLimit,
SockAddr, SockLen, Sockaddr_storage, Sockaddr_in,
@ -895,7 +900,8 @@ elif defined(linux):
SOCK_DGRAM,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP,
SIGCONT
export close, shutdown, sigemptyset, sigaddset, sigismember,
sigdelset, write, read, waitid, getaddrinfo,
@ -903,7 +909,8 @@ elif defined(linux):
getrlimit, setrlimit, getpeername, getsockname,
recvfrom, sendto, send, bindSocket, recv, connect,
unlink, listen, sendmsg, recvmsg, getpid, fcntl,
pthread_sigmask, clock_gettime, signal,
pthread_sigmask, clock_gettime, signal, getcwd, chdir,
waitpid, kill,
ClockId, Itimerspec, Timespec, Sigset, Time, Pid, Mode,
SigInfo, Id, Tmsghdr, IOVec, RLimit,
SockAddr, SockLen, Sockaddr_storage, Sockaddr_in,
@ -916,7 +923,8 @@ elif defined(linux):
SOCK_DGRAM,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP,
SIGCONT
when not defined(android) and defined(amd64):
const IP_MULTICAST_TTL*: cint = 33
@ -1012,6 +1020,7 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or
setrlimit, getpid, pthread_sigmask, sigemptyset,
sigaddset, sigismember, fcntl, accept, pipe, write,
signal, read, setsockopt, getsockopt, clock_gettime,
getcwd, chdir, waitpid, kill,
Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr,
SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6,
Sockaddr_un, SocketHandle, AddrInfo, RLimit,
@ -1023,7 +1032,8 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or
SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP,
SIGCONT
export close, shutdown, socket, getpeername, getsockname,
recvfrom, sendto, send, bindSocket, recv, connect,
@ -1042,7 +1052,8 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or
SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP,
SIGCONT
var IP_MULTICAST_TTL* {.importc: "IP_MULTICAST_TTL",
header: "<netinet/in.h>".}: cint
@ -1081,15 +1092,28 @@ elif defined(macos) or defined(macosx):
IPPROTO_TCP* = 6
when defined(linux):
const O_CLOEXEC* = 0x80000
const
O_CLOEXEC* = 0x80000
POSIX_SPAWN_USEVFORK* = 0x40
elif defined(freebsd):
const O_CLOEXEC* = 0x00100000
const
O_CLOEXEC* = 0x00100000
POSIX_SPAWN_USEVFORK* = 0x00
elif defined(openbsd):
const O_CLOEXEC* = 0x10000
const
O_CLOEXEC* = 0x10000
POSIX_SPAWN_USEVFORK* = 0x00
elif defined(netbsd):
const O_CLOEXEC* = 0x00400000
const
O_CLOEXEC* = 0x00400000
POSIX_SPAWN_USEVFORK* = 0x00
elif defined(dragonfly):
const O_CLOEXEC* = 0x00020000
const
O_CLOEXEC* = 0x00020000
POSIX_SPAWN_USEVFORK* = 0x00
elif defined(macos) or defined(macosx):
const
POSIX_SPAWN_USEVFORK* = 0x00
when defined(linux) or defined(macos) or defined(macosx) or defined(freebsd) or
defined(openbsd) or defined(netbsd) or defined(dragonfly):

View File

@ -8,7 +8,7 @@
import testmacro, testsync, testsoon, testtime, testfut, testsignal,
testaddress, testdatagram, teststream, testserver, testbugs, testnet,
testasyncstream, testhttpserver, testshttpserver, testhttpclient,
testratelimit
testproc, testratelimit
# Must be imported last to check for Pending futures
import testutils

36
tests/testproc.bat Normal file
View File

@ -0,0 +1,36 @@
@ECHO OFF
IF /I "%1" == "STDIN" (
GOTO :STDINTEST
) ELSE IF /I "%1" == "TIMEOUT2" (
GOTO :TIMEOUTTEST2
) ELSE IF /I "%1" == "TIMEOUT10" (
GOTO :TIMEOUTTEST10
) ELSE IF /I "%1" == "BIGDATA" (
GOTO :BIGDATA
) ELSE IF /I "%1" == "ENVTEST" (
GOTO :ENVTEST
)
EXIT 0
:STDINTEST
SET /P "INPUTDATA="
ECHO STDIN DATA: %INPUTDATA%
EXIT 0
:TIMEOUTTEST2
ping -n 2 127.0.0.1 > NUL
EXIT 2
:TIMEOUTTEST10
ping -n 10 127.0.0.1 > NUL
EXIT 0
:BIGDATA
FOR /L %%G IN (1, 1, 400000) DO ECHO ALICEWASBEGINNINGTOGETVERYTIREDOFSITTINGBYHERSISTERONTHEBANKANDO
EXIT 0
:ENVTEST
ECHO %CHRONOSASYNC%
EXIT 0

425
tests/testproc.nim Normal file
View File

@ -0,0 +1,425 @@
# Chronos Test Suite
# (c) Copyright 2022-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import std/os
import unittest2, stew/[base10, byteutils]
import ".."/chronos/unittest2/asynctests
when defined(posix):
from ".."/chronos/osdefs import SIGKILL
when defined(nimHasUsed): {.used.}
suite "Asynchronous process management test suite":
const OutputTests =
when defined(windows):
[
("ECHO TESTOUT", "TESTOUT\r\n", ""),
("ECHO TESTERR 1>&2", "", "TESTERR \r\n"),
("ECHO TESTBOTH && ECHO TESTBOTH 1>&2", "TESTBOTH \r\n",
"TESTBOTH \r\n")
]
else:
[
("echo TESTOUT", "TESTOUT\n", ""),
("echo TESTERR 1>&2", "", "TESTERR\n"),
("echo TESTBOTH && echo TESTBOTH 1>&2", "TESTBOTH\n", "TESTBOTH\n")
]
const ExitCodes = [5, 13, 64, 100, 126, 127, 128, 130, 255]
proc createBigMessage(size: int): seq[byte] =
var message = "MESSAGE"
result = newSeq[byte](size)
for i in 0 ..< len(result):
result[i] = byte(message[i mod len(message)])
when not(defined(windows)):
proc getCurrentFD(): int =
let local = initTAddress("127.0.0.1:34334")
let sock = createAsyncSocket(local.getDomain(), SockType.SOCK_DGRAM,
Protocol.IPPROTO_UDP)
closeSocket(sock)
return int(sock)
var markFD = getCurrentFD()
asyncTest "execCommand() exit codes test":
for item in ExitCodes:
let command = "exit " & Base10.toString(uint64(item))
let res = await execCommand(command)
check res == item
asyncTest "execCommandEx() exit codes and outputs test":
for test in OutputTests:
let response = await execCommandEx(test[0])
check:
response.stdOutput == test[1]
response.stdError == test[2]
response.status == 0
asyncTest "waitForExit() & peekExitCode() exit codes test":
let options = {AsyncProcessOption.EvalCommand}
for item in ExitCodes:
let command = "exit " & Base10.toString(uint64(item))
let process = await startProcess(command, options = options)
try:
let res = await process.waitForExit(InfiniteDuration)
check:
res == item
process.peekExitCode().tryGet() == item
process.running().tryGet() == false
finally:
await process.closeWait()
asyncTest "addProcess() test":
var
handlerFut = newFuture[void]("process.handler.future")
pidFd: ProcessHandle
processCounter = 0
processExitCode = 0
process: AsyncProcessRef
proc processHandler(udata: pointer) {.gcsafe.} =
processCounter = cast[int](udata)
processExitCode = process.peekExitCode().valueOr:
handlerFut.fail(newException(ValueError, osErrorMsg(error)))
return
let res = removeProcess2(pidFd)
if res.isErr():
handlerFut.fail(newException(ValueError, osErrorMsg(res.error())))
else:
handlerFut.complete()
let
options = {AsyncProcessOption.EvalCommand}
command = "exit 1"
process = await startProcess(command, options = options)
try:
pidFd =
block:
let res = addProcess2(process.pid(), processHandler,
cast[pointer](31337))
if res.isErr():
raiseAssert osErrorMsg(res.error())
res.get()
await handlerFut.wait(5.seconds)
check:
processExitCode == 1
processCounter == 31337
finally:
await process.closeWait()
asyncTest "STDIN stream test":
let
command =
when defined(windows):
"tests\\testproc.bat stdin"
else:
"tests/testproc.sh stdin"
options = {AsyncProcessOption.EvalCommand}
shellHeader = "STDIN DATA: ".toBytes()
smallTest =
when defined(windows):
"SMALL AMOUNT\r\n".toBytes()
else:
"SMALL AMOUNT\n".toBytes()
let bigTest =
when defined(windows):
var res = createBigMessage(256)
res.add(byte(0x0D))
res.add(byte(0x0A))
res
else:
var res = createBigMessage(256)
res.add(byte(0x0A))
res
for item in [smallTest, bigTest]:
let process = await startProcess(command, options = options,
stdinHandle = AsyncProcess.Pipe,
stdoutHandle = AsyncProcess.Pipe)
try:
await process.stdinStream.write(item)
let stdoutDataFut = process.stdoutStream.read()
let res = await process.waitForExit(InfiniteDuration)
await allFutures(stdoutDataFut)
check:
res == 0
stdoutDataFut.read() == shellHeader & item
finally:
await process.closeWait()
asyncTest "STDOUT and STDERR streams test":
let options = {AsyncProcessOption.EvalCommand}
for test in OutputTests:
let process = await startProcess(test[0], options = options,
stdoutHandle = AsyncProcess.Pipe,
stderrHandle = AsyncProcess.Pipe)
try:
let outBytesFut = process.stdoutStream.read()
let errBytesFut = process.stderrStream.read()
let res = await process.waitForExit(InfiniteDuration)
await allFutures(outBytesFut, errBytesFut)
check:
string.fromBytes(outBytesFut.read()) == test[1]
string.fromBytes(errBytesFut.read()) == test[2]
res == 0
finally:
await process.closeWait()
asyncTest "STDERR to STDOUT streams test":
let options = {AsyncProcessOption.EvalCommand,
AsyncProcessOption.StdErrToStdOut}
let command =
when defined(windows):
"ECHO TESTSTDOUT && ECHO TESTSTDERR 1>&2"
else:
"echo TESTSTDOUT && echo TESTSTDERR 1>&2"
let expect =
when defined(windows):
"TESTSTDOUT \r\nTESTSTDERR \r\n"
else:
"TESTSTDOUT\nTESTSTDERR\n"
let process = await startProcess(command, options = options,
stdoutHandle = AsyncProcess.Pipe)
try:
let outBytesFut = process.stdoutStream.read()
let res = await process.waitForExit(InfiniteDuration)
await allFutures(outBytesFut)
check:
string.fromBytes(outBytesFut.read()) == expect
res == 0
finally:
await process.closeWait()
asyncTest "Capture big amount of bytes from STDOUT stream test":
let options = {AsyncProcessOption.EvalCommand}
let command =
when defined(windows):
"tests\\testproc.bat bigdata"
else:
"tests/testproc.sh bigdata"
let expect =
when defined(windows):
400_000 * (64 + 2)
else:
400_000 * (64 + 1)
let process = await startProcess(command, options = options,
stdoutHandle = AsyncProcess.Pipe,
stderrHandle = AsyncProcess.Pipe)
try:
let outBytesFut = process.stdoutStream.read()
let errBytesFut = process.stderrStream.read()
let res = await process.waitForExit(InfiniteDuration)
await allFutures(outBytesFut, errBytesFut)
check:
res == 0
len(outBytesFut.read()) == expect
len(errBytesFut.read()) == 0
finally:
await process.closeWait()
asyncTest "Long-waiting waitForExit() test":
let command =
when defined(windows):
("tests\\testproc.bat", "timeout2")
else:
("tests/testproc.sh", "timeout2")
let process = await startProcess(command[0], arguments = @[command[1]])
try:
let res = await process.waitForExit(InfiniteDuration)
check res == 2
finally:
await process.closeWait()
asyncTest "waitForExit(duration) test":
let command =
when defined(windows):
("tests\\testproc.bat", "timeout10")
else:
("tests/testproc.sh", "timeout10")
let expect =
when defined(windows):
0
else:
128 + int(SIGKILL)
let process = await startProcess(command[0], arguments = @[command[1]])
try:
let res = await process.waitForExit(1.seconds)
check res == expect
finally:
await process.closeWait()
asyncTest "Child process environment test":
let command =
when defined(windows):
("tests\\testproc.bat", "envtest", 0, "CHILDPROCESSTEST\r\n")
else:
("tests/testproc.sh", "envtest", 0, "CHILDPROCESSTEST\n")
let env = getProcessEnvironment()
env["CHRONOSASYNC"] = "CHILDPROCESSTEST"
let process = await startProcess(command[0], arguments = @[command[1]],
environment = env,
stdoutHandle = AsyncProcess.Pipe)
try:
let outBytesFut = process.stdoutStream.read()
let res = await process.waitForExit(InfiniteDuration)
let outBytes = await outBytesFut
check:
res == command[2]
string.fromBytes(outBytes) == command[3]
finally:
await process.closeWait()
test "getProcessEnvironment() test":
let env = getProcessEnvironment()
when defined(windows):
check len(env["SYSTEMROOT"]) > 0
else:
check len(env["USER"]) > 0
asyncTest "Multiple processes waiting test":
const ProcessesCount = 50
let command =
when defined(windows):
("tests\\testproc.bat", "timeout2", 2)
else:
("tests/testproc.sh", "timeout2", 2)
var processes: seq[AsyncProcessRef]
for n in 0 ..< ProcessesCount:
let process = await startProcess(command[0], arguments = @[command[1]])
processes.add(process)
try:
var pending: seq[Future[int]]
for process in processes:
pending.add(process.waitForExit(10.seconds))
await allFutures(pending)
for index in 0 ..< ProcessesCount:
check pending[index].read() == command[2]
finally:
var pending: seq[Future[void]]
for process in processes:
pending.add(process.closeWait())
await allFutures(pending)
asyncTest "Multiple processes exit codes test":
const ProcessesCount = 50
let options = {AsyncProcessOption.EvalCommand}
var processes: seq[AsyncProcessRef]
for n in 0 ..< ProcessesCount:
let
command = "exit " & Base10.toString(uint64(n))
process = await startProcess(command, options = options)
processes.add(process)
try:
var pending: seq[Future[int]]
for process in processes:
pending.add(process.waitForExit(10.seconds))
await allFutures(pending)
for index in 0 ..< ProcessesCount:
check pending[index].read() == index
finally:
var pending: seq[Future[void]]
for process in processes:
pending.add(process.closeWait())
await allFutures(pending)
asyncTest "Multiple processes data capture test":
const ProcessesCount = 50
let options = {AsyncProcessOption.EvalCommand}
var processes: seq[AsyncProcessRef]
for n in 0 ..< ProcessesCount:
let command =
when defined(windows):
"ECHO TEST" & $n
else:
"echo TEST" & $n
let process = await startProcess(command, options = options,
stdoutHandle = AsyncProcess.Pipe)
processes.add(process)
try:
var pendingReaders: seq[Future[seq[byte]]]
var pendingWaiters: seq[Future[int]]
for process in processes:
pendingReaders.add(process.stdoutStream.read())
pendingWaiters.add(process.waitForExit(10.seconds))
await allFutures(pendingReaders)
await allFutures(pendingWaiters)
for index in 0 ..< ProcessesCount:
let expect =
when defined(windows):
"TEST" & $index & "\r\n"
else:
"TEST" & $index & "\n"
check string.fromBytes(pendingReaders[index].read()) == expect
check pendingWaiters[index].read() == 0
finally:
var pending: seq[Future[void]]
for process in processes:
pending.add(process.closeWait())
await allFutures(pending)
asyncTest "terminate() test":
let command =
when defined(windows):
("tests\\testproc.bat", "timeout10", 0)
else:
("tests/testproc.sh", "timeout10", 143) # 128 + SIGTERM
let process = await startProcess(command[0], arguments = @[command[1]])
try:
let resFut = process.waitForExit(InfiniteDuration)
check process.terminate().isOk()
let res = await resFut
check res == command[2]
finally:
await process.closeWait()
asyncTest "kill() test":
let command =
when defined(windows):
("tests\\testproc.bat", "timeout10", 0)
else:
("tests/testproc.sh", "timeout10", 137) # 128 + SIGKILL
let process = await startProcess(command[0], arguments = @[command[1]])
try:
let resFut = process.waitForExit(InfiniteDuration)
check process.kill().isOk()
let res = await resFut
check res == command[2]
finally:
await process.closeWait()
test "File descriptors leaks test":
when defined(windows):
skip()
else:
check getCurrentFD() == markFD
test "Leaks test":
proc getTrackerLeaks(tracker: string): bool =
let tracker = getTracker(tracker)
if isNil(tracker): false else: tracker.isLeaked()
check:
getTrackerLeaks("async.process") == false
getTrackerLeaks("async.stream.reader") == false
getTrackerLeaks("async.stream.writer") == false
getTrackerLeaks("stream.transport") == false

20
tests/testproc.sh Executable file
View File

@ -0,0 +1,20 @@
#!/bin/bash
if [ "$1" == "stdin" ]; then
read -r inputdata
echo "STDIN DATA: $inputdata"
elif [ "$1" == "timeout2" ]; then
sleep 2
exit 2
elif [ "$1" == "timeout10" ]; then
sleep 10
elif [ "$1" == "bigdata" ]; then
for i in {1..400000}
do
echo "ALICEWASBEGINNINGTOGETVERYTIREDOFSITTINGBYHERSISTERONTHEBANKANDO"
done
elif [ "$1" == "envtest" ]; then
echo "$CHRONOSASYNC"
else
echo "arguments missing"
fi

View File

@ -17,7 +17,7 @@ suite "Signal handling test suite":
when not defined(windows):
var
signalCounter = 0
sigfd = -1
sigfd: SignalHandle
proc signalProc(udata: pointer) =
signalCounter = cast[int](udata)