From 148ddf49c2ff51a223ca0df237893afe5dbaec07 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Tue, 23 May 2023 13:39:35 +0300 Subject: [PATCH] Asyncproc (Part 3/3) (#374) * Initial commit. * Some Linux fixes. * Address review comments on Windows. * Fix issues on Linux. * Fix 1.2 issue and Windows warnings. * Fix posix compilation issues. --- chronos.nim | 5 +- chronos/asyncloop.nim | 282 ++++++++- chronos/asyncproc.nim | 1311 +++++++++++++++++++++++++++++++++++++++++ chronos/config.nim | 32 +- chronos/osdefs.nim | 54 +- tests/testall.nim | 2 +- tests/testproc.bat | 36 ++ tests/testproc.nim | 425 +++++++++++++ tests/testproc.sh | 20 + tests/testsignal.nim | 2 +- 10 files changed, 2119 insertions(+), 50 deletions(-) create mode 100644 chronos/asyncproc.nim create mode 100644 tests/testproc.bat create mode 100644 tests/testproc.nim create mode 100755 tests/testproc.sh diff --git a/chronos.nim b/chronos.nim index 8295924..6801b28 100644 --- a/chronos.nim +++ b/chronos.nim @@ -5,5 +5,6 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import chronos/[asyncloop, asyncsync, handles, transport, timer, debugutils] -export asyncloop, asyncsync, handles, transport, timer, debugutils +import chronos/[asyncloop, asyncsync, handles, transport, timer, + asyncproc, debugutils] +export asyncloop, asyncsync, handles, transport, timer, asyncproc, debugutils diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index 5314933..ff2f079 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -14,7 +14,7 @@ else: {.push raises: [].} from nativesockets import Port -import std/[tables, strutils, heapqueue, options, deques] +import std/[tables, strutils, heapqueue, deques] import stew/results import "."/[config, osdefs, oserrno, osutils, timer] @@ -320,6 +320,20 @@ when defined(windows): RefCustomOverlapped* = ref CustomOverlapped + PostCallbackData = object + ioPort: HANDLE + handleFd: AsyncFD + waitFd: HANDLE + udata: pointer + ovlref: RefCustomOverlapped + ovl: pointer + + WaitableHandle* = ref PostCallbackData + ProcessHandle* = distinct WaitableHandle + + WaitableResult* {.pure.} = enum + Ok, Timeout + AsyncFD* = distinct int proc hash(x: AsyncFD): Hash {.borrow.} @@ -328,9 +342,9 @@ when defined(windows): proc getFunc(s: SocketHandle, fun: var pointer, guid: GUID): bool = var bytesRet: DWORD fun = nil - result = wsaIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, unsafeAddr(guid), - sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD, - addr(bytesRet), nil, nil) == 0 + wsaIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, unsafeAddr(guid), + DWORD(sizeof(GUID)), addr fun, DWORD(sizeof(pointer)), + addr(bytesRet), nil, nil) == 0 proc globalInit() = var wsa = WSAData() @@ -428,6 +442,141 @@ when defined(windows): ## Unregisters ``fd``. getThreadDispatcher().handles.excl(fd) + {.push stackTrace: off.} + proc waitableCallback(param: pointer, timerOrWaitFired: WINBOOL) {. + stdcall, gcsafe.} = + # This procedure will be executed in `wait thread`, so it must not use + # GC related objects. + # We going to ignore callbacks which was spawned when `isNil(param) == true` + # because we unable to indicate this error. + if isNil(param): return + var wh = cast[ptr PostCallbackData](param) + # We ignore result of postQueueCompletionStatus() call because we unable to + # indicate error. + discard postQueuedCompletionStatus(wh[].ioPort, DWORD(timerOrWaitFired), + ULONG_PTR(wh[].handleFd), + wh[].ovl) + {.pop.} + + proc registerWaitable( + handle: HANDLE, + flags: ULONG, + timeout: Duration, + cb: CallbackFunc, + udata: pointer + ): Result[WaitableHandle, OSErrorCode] = + ## Register handle of (Change notification, Console input, Event, + ## Memory resource notification, Mutex, Process, Semaphore, Thread, + ## Waitable timer) for waiting, using specific Windows' ``flags`` and + ## ``timeout`` value. + ## + ## Callback ``cb`` will be scheduled with ``udata`` parameter when + ## ``handle`` become signaled. + ## + ## Result of this procedure call ``WaitableHandle`` should be closed using + ## closeWaitable() call. + ## + ## NOTE: This is private procedure, not supposed to be publicly available, + ## please use ``waitForSingleObject()``. + let loop = getThreadDispatcher() + var ovl = RefCustomOverlapped(data: CompletionData(cb: cb)) + + var whandle = (ref PostCallbackData)( + ioPort: loop.getIoHandler(), + handleFd: AsyncFD(handle), + udata: udata, + ovlref: ovl, + ovl: cast[pointer](ovl) + ) + + ovl.data.udata = cast[pointer](whandle) + + let dwordTimeout = + if timeout == InfiniteDuration: + DWORD(INFINITE) + else: + DWORD(timeout.milliseconds) + + if registerWaitForSingleObject(addr(whandle[].waitFd), handle, + cast[WAITORTIMERCALLBACK](waitableCallback), + cast[pointer](whandle), + dwordTimeout, + flags) == WINBOOL(0): + ovl.data.udata = nil + whandle.ovlref = nil + whandle.ovl = nil + return err(osLastError()) + + ok(WaitableHandle(whandle)) + + proc closeWaitable(wh: WaitableHandle): Result[void, OSErrorCode] = + ## Close waitable handle ``wh`` and clear all the resources. It is safe + ## to close this handle, even if wait operation is pending. + ## + ## NOTE: This is private procedure, not supposed to be publicly available, + ## please use ``waitForSingleObject()``. + doAssert(not(isNil(wh))) + + let pdata = (ref PostCallbackData)(wh) + # We are not going to clear `ref` fields in PostCallbackData object because + # it possible that callback is already scheduled. + if unregisterWait(pdata.waitFd) == 0: + let res = osLastError() + if res != ERROR_IO_PENDING: + return err(res) + ok() + + proc addProcess2*(pid: int, cb: CallbackFunc, + udata: pointer = nil): Result[ProcessHandle, OSErrorCode] = + ## Registers callback ``cb`` to be called when process with process + ## identifier ``pid`` exited. Returns process identifier, which can be + ## used to clear process callback via ``removeProcess``. + doAssert(pid > 0, "Process identifier must be positive integer") + let + hProcess = openProcess(SYNCHRONIZE, WINBOOL(0), DWORD(pid)) + flags = WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE + + var wh: WaitableHandle = nil + + if hProcess == HANDLE(0): + return err(osLastError()) + + proc continuation(udata: pointer) {.gcsafe.} = + doAssert(not(isNil(udata))) + doAssert(not(isNil(wh))) + discard closeFd(hProcess) + cb(wh[].udata) + + wh = + block: + let res = registerWaitable(hProcess, flags, InfiniteDuration, + continuation, udata) + if res.isErr(): + discard closeFd(hProcess) + return err(res.error()) + res.get() + ok(ProcessHandle(wh)) + + proc removeProcess2*(procHandle: ProcessHandle): Result[void, OSErrorCode] = + ## Remove process' watching using process' descriptor ``procHandle``. + let waitableHandle = WaitableHandle(procHandle) + doAssert(not(isNil(waitableHandle))) + ? closeWaitable(waitableHandle) + ok() + + proc addProcess*(pid: int, cb: CallbackFunc, + udata: pointer = nil): ProcessHandle {. + raises: [Defect, OSError].} = + ## Registers callback ``cb`` to be called when process with process + ## identifier ``pid`` exited. Returns process identifier, which can be + ## used to clear process callback via ``removeProcess``. + addProcess2(pid, cb, udata).tryGet() + + proc removeProcess*(procHandle: ProcessHandle) {. + raises: [Defect, OSError].} = + ## Remove process' watching using process' descriptor ``procHandle``. + removeProcess2(procHandle).tryGet() + proc poll*() = ## Perform single asynchronous step, processing timers and completing ## tasks. Blocks until at least one event has completed. @@ -772,8 +921,15 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or closeSocket(fd, aftercb) when asyncEventEngine in ["epoll", "kqueue"]: - proc addSignal2*(signal: int, cb: CallbackFunc, - udata: pointer = nil): Result[int, OSErrorCode] = + type + ProcessHandle* = distinct int + SignalHandle* = distinct int + + proc addSignal2*( + signal: int, + cb: CallbackFunc, + udata: pointer = nil + ): Result[SignalHandle, OSErrorCode] = ## Start watching signal ``signal``, and when signal appears, call the ## callback ``cb`` with specified argument ``udata``. Returns signal ## identifier code, which can be used to remove signal callback @@ -785,10 +941,13 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or adata.reader = AsyncCallback(function: cb, udata: udata) do: return err(osdefs.EBADF) - ok(sigfd) + ok(SignalHandle(sigfd)) - proc addProcess2*(pid: int, cb: CallbackFunc, - udata: pointer = nil): Result[int, OSErrorCode] = + proc addProcess2*( + pid: int, + cb: CallbackFunc, + udata: pointer = nil + ): Result[ProcessHandle, OSErrorCode] = ## Registers callback ``cb`` to be called when process with process ## identifier ``pid`` exited. Returns process' descriptor, which can be ## used to clear process callback via ``removeProcess``. @@ -799,31 +958,42 @@ elif defined(macosx) or defined(freebsd) or defined(netbsd) or adata.reader = AsyncCallback(function: cb, udata: udata) do: return err(osdefs.EBADF) - ok(procfd) + ok(ProcessHandle(procfd)) - proc removeSignal2*(sigfd: int): Result[void, OSErrorCode] = + proc removeSignal2*(signalHandle: SignalHandle): Result[void, OSErrorCode] = ## Remove watching signal ``signal``. - getThreadDispatcher().selector.unregister2(cint(sigfd)) + getThreadDispatcher().selector.unregister2(cint(signalHandle)) - proc removeProcess2*(procfd: int): Result[void, OSErrorCode] = + proc removeProcess2*(procHandle: ProcessHandle): Result[void, OSErrorCode] = ## Remove process' watching using process' descriptor ``procfd``. - getThreadDispatcher().selector.unregister2(cint(procfd)) + getThreadDispatcher().selector.unregister2(cint(procHandle)) proc addSignal*(signal: int, cb: CallbackFunc, - udata: pointer = nil): int {.raises: [Defect, OSError].} = + udata: pointer = nil): SignalHandle {. + raises: [Defect, OSError].} = ## Start watching signal ``signal``, and when signal appears, call the ## callback ``cb`` with specified argument ``udata``. Returns signal ## identifier code, which can be used to remove signal callback ## via ``removeSignal``. addSignal2(signal, cb, udata).tryGet() - proc removeSignal*(sigfd: int) {.raises: [Defect, OSError].} = + proc removeSignal*(signalHandle: SignalHandle) {. + raises: [Defect, OSError].} = ## Remove watching signal ``signal``. - removeSignal2(sigfd).tryGet() + removeSignal2(signalHandle).tryGet() - proc removeProcess*(procfd: int) {.raises: [Defect, OSError].} = - ## Remove process' watching using process' descriptor ``procfd``. - removeProcess2(procfd).tryGet() + proc addProcess*(pid: int, cb: CallbackFunc, + udata: pointer = nil): ProcessHandle {. + raises: [Defect, OSError].} = + ## Registers callback ``cb`` to be called when process with process + ## identifier ``pid`` exited. Returns process identifier, which can be + ## used to clear process callback via ``removeProcess``. + addProcess2(pid, cb, udata).tryGet() + + proc removeProcess*(procHandle: ProcessHandle) {. + raises: [Defect, OSError].} = + ## Remove process' watching using process' descriptor ``procHandle``. + removeProcess2(procHandle).tryGet() proc poll*() {.gcsafe.} = ## Perform single asynchronous step. @@ -1002,7 +1172,7 @@ when not(defined(windows)): when asyncEventEngine in ["epoll", "kqueue"]: proc waitSignal*(signal: int): Future[void] {.raises: [Defect].} = var retFuture = newFuture[void]("chronos.waitSignal()") - var sigfd: int = -1 + var signalHandle: Opt[SignalHandle] template getSignalException(e: OSErrorCode): untyped = newException(AsyncError, "Could not manipulate signal handler, " & @@ -1010,8 +1180,8 @@ when not(defined(windows)): proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): - if sigfd != -1: - let res = removeSignal2(sigfd) + if signalHandle.isSome(): + let res = removeSignal2(signalHandle.get()) if res.isErr(): retFuture.fail(getSignalException(res.error())) else: @@ -1019,17 +1189,17 @@ when not(defined(windows)): proc cancellation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): - if sigfd != -1: - let res = removeSignal2(sigfd) + if signalHandle.isSome(): + let res = removeSignal2(signalHandle.get()) if res.isErr(): retFuture.fail(getSignalException(res.error())) - sigfd = + signalHandle = block: let res = addSignal2(signal, continuation) if res.isErr(): retFuture.fail(getSignalException(res.error())) - res.get() + Opt.some(res.get()) retFuture.cancelCallback = cancellation retFuture @@ -1283,5 +1453,63 @@ when chronosFutureTracking: ## completed, cancelled or failed). futureList.count +when defined(windows): + proc waitForSingleObject*(handle: HANDLE, + timeout: Duration): Future[WaitableResult] {. + raises: [Defect].} = + ## Waits until the specified object is in the signaled state or the + ## time-out interval elapses. WaitForSingleObject() for asynchronous world. + let flags = WT_EXECUTEONLYONCE + + var + retFuture = newFuture[WaitableResult]("chronos.waitForSingleObject()") + waitHandle: WaitableHandle = nil + + proc continuation(udata: pointer) {.gcsafe.} = + doAssert(not(isNil(waitHandle))) + if not(retFuture.finished()): + let + ovl = cast[PtrCustomOverlapped](udata) + returnFlag = WINBOOL(ovl.data.bytesCount) + res = closeWaitable(waitHandle) + if res.isErr(): + retFuture.fail(newException(AsyncError, osErrorMsg(res.error()))) + else: + if returnFlag == TRUE: + retFuture.complete(WaitableResult.Timeout) + else: + retFuture.complete(WaitableResult.Ok) + + proc cancellation(udata: pointer) {.gcsafe.} = + doAssert(not(isNil(waitHandle))) + if not(retFuture.finished()): + discard closeWaitable(waitHandle) + + let wres = uint32(waitForSingleObject(handle, DWORD(0))) + if wres == WAIT_OBJECT_0: + retFuture.complete(WaitableResult.Ok) + return retFuture + elif wres == WAIT_ABANDONED: + retFuture.fail(newException(AsyncError, "Handle was abandoned")) + return retFuture + elif wres == WAIT_FAILED: + retFuture.fail(newException(AsyncError, osErrorMsg(osLastError()))) + return retFuture + + if timeout == ZeroDuration: + retFuture.complete(WaitableResult.Timeout) + return retFuture + + waitHandle = + block: + let res = registerWaitable(handle, flags, timeout, continuation, nil) + if res.isErr(): + retFuture.fail(newException(AsyncError, osErrorMsg(res.error()))) + return retFuture + res.get() + + retFuture.cancelCallback = cancellation + return retFuture + # Perform global per-module initialization. globalInit() diff --git a/chronos/asyncproc.nim b/chronos/asyncproc.nim new file mode 100644 index 0000000..6e9858f --- /dev/null +++ b/chronos/asyncproc.nim @@ -0,0 +1,1311 @@ +# +# Chronos' asynchronous process management +# +# (c) Copyright 2022-Present Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} + {.pragma: apforward, gcsafe, raises: [Defect].} +else: + {.push raises: [].} + {.pragma: apforward, gcsafe, raises: [].} + +import std/strtabs +import "."/[config, asyncloop, handles, osdefs, osutils, oserrno], + streams/asyncstream +import stew/[results, byteutils] +from std/os import quoteShell, quoteShellWindows, quoteShellPosix, envPairs + +export strtabs, results +export quoteShell, quoteShellWindows, quoteShellPosix, envPairs + +const + AsyncProcessTrackerName* = "async.process" + ## AsyncProcess leaks tracker name + + + +type + AsyncProcessError* = object of CatchableError + + AsyncProcessResult*[T] = Result[T, OSErrorCode] + + AsyncProcessOption* {.pure.} = enum + UsePath, + EvalCommand, + StdErrToStdOut, + ProcessGroup + + StandardKind {.pure.} = enum + Stdin, Stdout, Stderr + + ProcessFlag {.pure.} = enum + UserStdin, UserStdout, UserStderr, + AutoStdin, AutoStdout, AutoStderr, + NoStdin, NoStdout, NoStderr, + CopyStdout + + ProcessStreamHandleKind {.pure.} = enum + None, Auto, ProcHandle, Transport, StreamReader, StreamWriter + + ProcessStreamHandle* = object + case kind: ProcessStreamHandleKind + of ProcessStreamHandleKind.None: + discard + of ProcessStreamHandleKind.Auto: + discard + of ProcessStreamHandleKind.ProcHandle: + handle: AsyncFD + of ProcessStreamHandleKind.Transport: + transp: StreamTransport + of ProcessStreamHandleKind.StreamReader: + reader: AsyncStreamReader + of ProcessStreamHandleKind.StreamWriter: + writer: AsyncStreamWriter + + StreamHolderFlag {.pure.} = enum + Transport, Stream + + StreamKind {.pure.} = enum + None, Reader, Writer + + AsyncStreamHolder = object + case kind: StreamKind + of StreamKind.Reader: + reader: AsyncStreamReader + of StreamKind.Writer: + writer: AsyncStreamWriter + of StreamKind.None: + discard + flags: set[StreamHolderFlag] + + AsyncProcessPipes = object + flags: set[ProcessFlag] + stdinHolder: AsyncStreamHolder + stdoutHolder: AsyncStreamHolder + stderrHolder: AsyncStreamHolder + stdinHandle: AsyncFD + stdoutHandle: AsyncFD + stderrHandle: AsyncFD + + AsyncProcess* = object + + AsyncProcessImpl = object of RootObj + when defined(windows): + processHandle: HANDLE + threadHandle: HANDLE + processId: DWORD + else: + processId: Pid + pipes: AsyncProcessPipes + exitStatus: Opt[int] + flags: set[ProcessFlag] + options: set[AsyncProcessOption] + + AsyncProcessRef* = ref AsyncProcessImpl + + CommandExResponse* = object + stdOutput*: string + stdError*: string + status*: int + + AsyncProcessTracker* = ref object of TrackerBase + opened*: int64 + closed*: int64 + +template Pipe*(t: typedesc[AsyncProcess]): ProcessStreamHandle = + ProcessStreamHandle(kind: ProcessStreamHandleKind.Auto) + +proc setupAsyncProcessTracker(): AsyncProcessTracker {.gcsafe.} + +proc getAsyncProcessTracker(): AsyncProcessTracker {.inline.} = + var res = cast[AsyncProcessTracker](getTracker(AsyncProcessTrackerName)) + if isNil(res): + res = setupAsyncProcessTracker() + res + +proc dumpAsyncProcessTracking(): string {.gcsafe.} = + var tracker = getAsyncProcessTracker() + let res = "Started async processes: " & $tracker.opened & "\n" & + "Closed async processes: " & $tracker.closed + res + +proc leakAsyncProccessTracker(): bool {.gcsafe.} = + var tracker = getAsyncProcessTracker() + tracker.opened != tracker.closed + +proc trackAsyncProccess(t: AsyncProcessRef) {.inline.} = + var tracker = getAsyncProcessTracker() + inc(tracker.opened) + +proc untrackAsyncProcess(t: AsyncProcessRef) {.inline.} = + var tracker = getAsyncProcessTracker() + inc(tracker.closed) + +proc setupAsyncProcessTracker(): AsyncProcessTracker {.gcsafe.} = + var res = AsyncProcessTracker( + opened: 0, + closed: 0, + dump: dumpAsyncProcessTracking, + isLeaked: leakAsyncProccessTracker + ) + addTracker(AsyncProcessTrackerName, res) + res + +proc init*(t: typedesc[AsyncFD], handle: ProcessStreamHandle): AsyncFD = + case handle.kind + of ProcessStreamHandleKind.ProcHandle: + handle.handle + of ProcessStreamHandleKind.Transport: + handle.transp.fd + of ProcessStreamHandleKind.StreamReader: + doAssert(not(isNil(handle.reader.tsource))) + handle.reader.tsource.fd + of ProcessStreamHandleKind.StreamWriter: + doAssert(not(isNil(handle.writer.tsource))) + handle.writer.tsource.fd + of ProcessStreamHandleKind.Auto: + raiseAssert "ProcessStreamHandle could not be auto at this moment" + of ProcessStreamHandleKind.None: + raiseAssert "ProcessStreamHandle could not be empty at this moment" + +proc init*(t: typedesc[AsyncStreamHolder], handle: AsyncStreamReader, + baseFlags: set[StreamHolderFlag] = {}): AsyncStreamHolder = + AsyncStreamHolder(kind: StreamKind.Reader, reader: handle, flags: baseFlags) + +proc init*(t: typedesc[AsyncStreamHolder], handle: AsyncStreamWriter, + baseFlags: set[StreamHolderFlag] = {}): AsyncStreamHolder = + AsyncStreamHolder(kind: StreamKind.Writer, writer: handle, flags: baseFlags) + +proc init*(t: typedesc[AsyncStreamHolder]): AsyncStreamHolder = + AsyncStreamHolder(kind: StreamKind.None) + +proc init*(t: typedesc[AsyncStreamHolder], handle: ProcessStreamHandle, + kind: StreamKind, baseFlags: set[StreamHolderFlag] = {} + ): AsyncProcessResult[AsyncStreamHolder] = + case handle.kind + of ProcessStreamHandleKind.ProcHandle: + case kind + of StreamKind.Reader: + let + transp = ? fromPipe2(handle.handle) + reader = newAsyncStreamReader(transp) + flags = baseFlags + {StreamHolderFlag.Stream, + StreamHolderFlag.Transport} + ok(AsyncStreamHolder(kind: StreamKind.Reader, reader: reader, + flags: flags)) + of StreamKind.Writer: + let + transp = ? fromPipe2(handle.handle) + writer = newAsyncStreamWriter(transp) + flags = baseFlags + {StreamHolderFlag.Stream, + StreamHolderFlag.Transport} + ok(AsyncStreamHolder(kind: StreamKind.Writer, writer: writer, + flags: flags)) + of StreamKind.None: + ok(AsyncStreamHolder(kind: StreamKind.None)) + of ProcessStreamHandleKind.Transport: + case kind + of StreamKind.Reader: + let + reader = newAsyncStreamReader(handle.transp) + flags = baseFlags + {StreamHolderFlag.Stream} + ok(AsyncStreamHolder(kind: StreamKind.Reader, reader: reader, + flags: flags)) + of StreamKind.Writer: + let + writer = newAsyncStreamWriter(handle.transp) + flags = baseFlags + {StreamHolderFlag.Stream} + ok(AsyncStreamHolder(kind: StreamKind.Writer, writer: writer, + flags: flags)) + of StreamKind.None: + ok(AsyncStreamHolder(kind: StreamKind.None)) + of ProcessStreamHandleKind.StreamReader: + ok(AsyncStreamHolder(kind: StreamKind.Reader, reader: handle.reader, + flags: baseFlags)) + of ProcessStreamHandleKind.StreamWriter: + ok(AsyncStreamHolder(kind: StreamKind.Writer, writer: handle.writer, + flags: baseFlags)) + of ProcessStreamHandleKind.None, ProcessStreamHandleKind.Auto: + ok(AsyncStreamHolder(kind: StreamKind.None)) + +proc init*(t: typedesc[ProcessStreamHandle]): ProcessStreamHandle = + ProcessStreamHandle(kind: ProcessStreamHandleKind.None) + +proc init*(t: typedesc[ProcessStreamHandle], + handle: AsyncFD): ProcessStreamHandle = + ProcessStreamHandle(kind: ProcessStreamHandleKind.ProcHandle, handle: handle) + +proc init*(t: typedesc[ProcessStreamHandle], + transp: StreamTransport): ProcessStreamHandle = + doAssert(transp.kind == TransportKind.Pipe, + "Only pipe transports can be used as process streams") + ProcessStreamHandle(kind: ProcessStreamHandleKind.ProcHandle, transp: transp) + +proc init*(t: typedesc[ProcessStreamHandle], + reader: AsyncStreamReader): ProcessStreamHandle = + ProcessStreamHandle(kind: ProcessStreamHandleKind.StreamReader, + reader: reader) + +proc init*(t: typedesc[ProcessStreamHandle], + writer: AsyncStreamWriter): ProcessStreamHandle = + ProcessStreamHandle(kind: ProcessStreamHandleKind.StreamWriter, + writer: writer) + +proc isEmpty*(handle: ProcessStreamHandle): bool = + handle.kind == ProcessStreamHandleKind.None + +proc suspend*(p: AsyncProcessRef): AsyncProcessResult[void] {.apforward.} +proc resume*(p: AsyncProcessRef): AsyncProcessResult[void] {.apforward.} +proc terminate*(p: AsyncProcessRef): AsyncProcessResult[void] {.apforward.} +proc kill*(p: AsyncProcessRef): AsyncProcessResult[void] {.apforward.} +proc running*(p: AsyncProcessRef): AsyncProcessResult[bool] {.apforward.} +proc peekExitCode*(p: AsyncProcessRef): AsyncProcessResult[int] {.apforward.} +proc preparePipes(options: set[AsyncProcessOption], + stdinHandle, stdoutHandle, stderrHandle: ProcessStreamHandle + ): AsyncProcessResult[AsyncProcessPipes] {.apforward.} +proc closeProcessHandles(pipes: var AsyncProcessPipes, + options: set[AsyncProcessOption], + lastError: OSErrorCode): OSErrorCode {.apforward.} +proc closeProcessStreams(pipes: AsyncProcessPipes, + options: set[AsyncProcessOption]): Future[void] {. + apforward.} +proc closeWait(holder: AsyncStreamHolder): Future[void] {.apforward.} + +template isOk(code: OSErrorCode): bool = + when defined(windows): + code == ERROR_SUCCESS + else: + code == OSErrorCode(0) + +template closePipe(handle: AsyncFD): bool = + let fd = + when defined(windows): + osdefs.HANDLE(handle) + else: + cint(handle) + closeFd(fd) != -1 + +proc closeProcessHandles(pipes: var AsyncProcessPipes, + options: set[AsyncProcessOption], + lastError: OSErrorCode): OSErrorCode = + # We trying to preserve error code of last failed operation. + var currentError = lastError + + if ProcessFlag.AutoStdin in pipes.flags: + if pipes.stdinHandle != asyncInvalidPipe: + if currentError.isOk(): + if not(closePipe(pipes.stdinHandle)): + currentError = osLastError() + else: + discard closePipe(pipes.stdinHandle) + pipes.stdinHandle = asyncInvalidPipe + + if ProcessFlag.AutoStdout in pipes.flags: + if pipes.stdoutHandle != asyncInvalidPipe: + if currentError.isOk(): + if not(closePipe(pipes.stdoutHandle)): + currentError = osLastError() + else: + discard closePipe(pipes.stdoutHandle) + pipes.stdoutHandle = asyncInvalidPipe + + if ProcessFlag.AutoStderr in pipes.flags: + if pipes.stderrHandle != asyncInvalidPipe: + if currentError.isOk(): + if not(closePipe(pipes.stderrHandle)): + currentError = osLastError() + else: + discard closePipe(pipes.stderrHandle) + pipes.stderrHandle = asyncInvalidPipe + + currentError + +template pipesPresent*(pipes: AsyncProcessPipes): bool = + let mask = {ProcessFlag.AutoStdin, ProcessFlag.AutoStdout, + ProcessFlag.AutoStderr,ProcessFlag.UserStdin, + ProcessFlag.UserStdout, ProcessFlag.UserStderr} + pipes.flags * mask != {} + +proc raiseAsyncProcessError(msg: string, exc: ref CatchableError = nil) {. + noreturn, noinit, noinline, raises: [AsyncProcessError].} = + let message = + if isNil(exc): + msg + else: + msg & " ([" & $exc.name & "]: " & $exc.msg & ")" + raise newException(AsyncProcessError, message) + +proc raiseAsyncProcessError(msg: string, error: OSErrorCode|cint) {. + noreturn, noinit, noinline, raises: [AsyncProcessError].} = + when error is OSErrorCode: + let message = msg & " ([OSError]: " & osErrorMsg(error) & ")" + else: + let message = msg & " ([OSError]: " & osErrorMsg(OSErrorCode(error)) & ")" + raise newException(AsyncProcessError, message) + +when defined(windows): + + proc getStdinHandle(pipes: AsyncProcessPipes): HANDLE = + if pipes.flags * {ProcessFlag.AutoStdin, ProcessFlag.UserStdin} != {}: + HANDLE(pipes.stdinHandle) + else: + getStdHandle(STD_INPUT_HANDLE) + + proc getStdoutHandle(pipes: AsyncProcessPipes): HANDLE = + if pipes.flags * {ProcessFlag.AutoStdout, ProcessFlag.UserStdout} != {}: + HANDLE(pipes.stdoutHandle) + else: + getStdHandle(STD_OUTPUT_HANDLE) + + proc getStderrHandle(pipes: AsyncProcessPipes): HANDLE = + if pipes.flags * {ProcessFlag.AutoStderr, ProcessFlag.UserStderr, + ProcessFlag.CopyStdout} != {}: + HANDLE(pipes.stderrHandle) + else: + getStdHandle(STD_ERROR_HANDLE) + + proc getProcessEnvironment*(): StringTableRef = + var res = newStringTable(modeCaseInsensitive) + var env = getEnvironmentStringsW() + if isNil(env): + return res + var slider = env + while int(slider[]) != 0: + let pos = wcschr(slider, WCHAR(0x0000)) + let line = slider.toString().valueOr("") + slider = cast[LPWSTR](cast[uint](pos) + uint(sizeof(WCHAR))) + if len(line) > 0: + let delim = line.find('=') + if delim > 0: + res[substr(line, 0, delim - 1)] = substr(line, delim + 1) + discard freeEnvironmentStringsW(env) + res + + proc buildCommandLine(a: string, args: openArray[string]): string = + # TODO: Procedures quoteShell/(Windows, Posix)() needs security and bug review + # or reimplementation, for example quoteShellWindows() do not handle `\` + # properly. + # https://docs.microsoft.com/en-us/cpp/cpp/main-function-command-line-args?redirectedfrom=MSDN&view=msvc-170#parsing-c-command-line-arguments + var res = quoteShell(a) + for i in 0 ..< len(args): + res.add(' ') + res.add(quoteShell(args[i])) + res + + proc buildEnvironment(env: StringTableRef): Result[LPWSTR, OSErrorCode] = + var str: string + for key, value in pairs(env): + doAssert('=' notin key, "`=` must not be present in key name") + str.add(key) + str.add('=') + str.add(value) + str.add('\x00') + str.add("\x00\x00") + toWideString(str) + + proc closeThreadAndProcessHandle(p: AsyncProcessRef + ): AsyncProcessResult[void] = + if p.threadHandle != HANDLE(0): + if closeHandle(p.threadHandle) == FALSE: + discard closeHandle(p.processHandle) + return err(osLastError()) + p.threadHandle = HANDLE(0) + + if p.processHandle != HANDLE(0): + if closeHandle(p.processHandle) == FALSE: + return err(osLastError()) + p.processHandle = HANDLE(0) + + proc startProcess*(command: string, workingDir: string = "", + arguments: seq[string] = @[], + environment: StringTableRef = nil, + options: set[AsyncProcessOption] = {}, + stdinHandle = ProcessStreamHandle(), + stdoutHandle = ProcessStreamHandle(), + stderrHandle = ProcessStreamHandle(), + ): Future[AsyncProcessRef] {.async.} = + var + pipes = preparePipes(options, stdinHandle, stdoutHandle, + stderrHandle).valueOr: + raiseAsyncProcessError("Unable to initialze process pipes", error) + + let + commandLine = + if AsyncProcessOption.EvalCommand in options: + chronosProcShell & " /C " & command + else: + buildCommandLine(command, arguments) + workingDirectory = + if len(workingDir) > 0: + workingDir.toWideString().valueOr: + raiseAsyncProcessError("Unable to proceed working directory path", + error) + else: + nil + environment = + if not(isNil(environment)): + buildEnvironment(environment).valueOr: + raiseAsyncProcessError("Unable to build child process environment", + error) + else: + nil + flags = CREATE_UNICODE_ENVIRONMENT + var + psa = getSecurityAttributes(false) + tsa = getSecurityAttributes(false) + startupInfo = + block: + var res = STARTUPINFO(cb: DWORD(sizeof(STARTUPINFO))) + if pipes.pipesPresent(): + res.dwFlags = STARTF_USESTDHANDLES + res.hStdInput = pipes.getStdinHandle() + res.hStdOutput = pipes.getStdoutHandle() + res.hStdError = pipes.getStderrHandle() + res + procInfo = PROCESS_INFORMATION() + + let wideCommandLine = commandLine.toWideString().valueOr: + raiseAsyncProcessError("Unable to proceed command line", error) + + let res = createProcess( + nil, + wideCommandLine, + addr psa, addr tsa, + TRUE, # NOTE: This is very important flag and MUST not be modified. + # All overloaded pipe handles will not work if this flag will be + # set to FALSE. + flags, + environment, + workingDirectory, + startupInfo, procInfo + ) + + if(not(isNil(environment))): + free(environment) + free(wideCommandLine) + + var currentError = osLastError() + if res == FALSE: + await pipes.closeProcessStreams(options) + currentError = closeProcessHandles(pipes, options, currentError) + + if res == FALSE: + raiseAsyncProcessError("Unable to spawn process", currentError) + + let process = AsyncProcessRef( + processHandle: procInfo.hProcess, + threadHandle: procInfo.hThread, + processId: procInfo.dwProcessId, + pipes: pipes, + options: options, + flags: pipes.flags + ) + + trackAsyncProccess(process) + return process + + proc peekProcessExitCode(p: AsyncProcessRef): AsyncProcessResult[int] = + var wstatus: DWORD = 0 + if p.exitStatus.isSome(): + return ok(p.exitStatus.get()) + + let res = getExitCodeProcess(p.processHandle, wstatus) + if res == TRUE: + if wstatus != STILL_ACTIVE: + let status = int(wstatus) + p.exitStatus = Opt.some(status) + ok(status) + else: + ok(-1) + else: + err(osLastError()) + + proc suspend(p: AsyncProcessRef): AsyncProcessResult[void] = + if suspendThread(p.threadHandle) != 0xFFFF_FFFF'u32: + ok() + else: + err(osLastError()) + + proc resume(p: AsyncProcessRef): AsyncProcessResult[void] = + if resumeThread(p.threadHandle) != 0xFFFF_FFFF'u32: + ok() + else: + err(osLastError()) + + proc terminate(p: AsyncProcessRef): AsyncProcessResult[void] = + if terminateProcess(p.processHandle, 0) != 0'u32: + ok() + else: + err(osLastError()) + + proc kill(p: AsyncProcessRef): AsyncProcessResult[void] = + p.terminate() + + proc running(p: AsyncProcessRef): AsyncProcessResult[bool] = + let res = ? p.peekExitCode() + if res == -1: + ok(true) + else: + ok(false) + + proc waitForExit*(p: AsyncProcessRef, + timeout = InfiniteDuration): Future[int] {.async.} = + if p.exitStatus.isSome(): + return p.exitStatus.get() + + let wres = + try: + await waitForSingleObject(p.processHandle, timeout) + except ValueError as exc: + raiseAsyncProcessError("Unable to wait for process handle", exc) + + if wres == WaitableResult.Timeout: + let res = p.kill() + if res.isErr(): + raiseAsyncProcessError("Unable to terminate process", res.error()) + + let exitCode = p.peekProcessExitCode().valueOr: + raiseAsyncProcessError("Unable to peek process exit code", error) + + if exitCode >= 0: + p.exitStatus = Opt.some(exitCode) + return exitCode + + proc peekExitCode(p: AsyncProcessRef): AsyncProcessResult[int] = + if p.exitStatus.isSome(): + return ok(p.exitStatus.get()) + let res = waitForSingleObject(p.processHandle, DWORD(0)) + if res != WAIT_TIMEOUT: + let exitCode = ? p.peekProcessExitCode() + ok(exitCode) + else: + ok(-1) +else: + import std/strutils + + type + SpawnAttr = object + attrs: PosixSpawnAttr + actions: PosixSpawnFileActions + + proc fd(h: AsyncStreamHolder): cint = + case h.kind + of StreamKind.Reader: + cint(h.reader.tsource.fd) + of StreamKind.Writer: + cint(h.writer.tsource.fd) + of StreamKind.None: + raiseAssert "Incorrect stream holder" + + proc isEmpty(h: AsyncStreamHolder): bool = + h.kind == StreamKind.None + + proc initSpawn(pipes: AsyncProcessPipes, options: set[AsyncProcessOption] + ): Result[SpawnAttr, OSErrorCode] = + template doCheck(body: untyped): untyped = + let res = body + if res != 0: + return err(OSErrorCode(res)) + + var + attrs = + block: + var value: PosixSpawnAttr + let res = posixSpawnAttrInit(value) + if res != 0: + return err(OSErrorCode(res)) + value + actions = + block: + var value: PosixSpawnFileActions + let res = posixSpawnFileActionsInit(value) + if res != 0: + discard posixSpawnAttrDestroy(attrs) + return err(OSErrorCode(res)) + value + mask = + block: + var res: Sigset + discard sigemptyset(res) + res + + doCheck(posixSpawnAttrSetSigMask(attrs, mask)) + if AsyncProcessOption.ProcessGroup in options: + doCheck(posixSpawnAttrSetPgroup(attrs, 0)) + doCheck(posixSpawnAttrSetFlags(attrs, osdefs.POSIX_SPAWN_USEVFORK or + osdefs.POSIX_SPAWN_SETSIGMASK or + osdefs.POSIX_SPAWN_SETPGROUP)) + else: + doCheck(posixSpawnAttrSetFlags(attrs, osdefs.POSIX_SPAWN_USEVFORK or + osdefs.POSIX_SPAWN_SETSIGMASK)) + + if pipes.flags * {ProcessFlag.AutoStdin, ProcessFlag.UserStdin} != {}: + # Close child process STDIN. + doCheck(posixSpawnFileActionsAddClose(actions, cint(0))) + # Make a duplicate of `stdinHandle` as child process STDIN. + doCheck(posixSpawnFileActionsAddDup2(actions, cint(pipes.stdinHandle), + cint(0))) + # Close child process side of `stdinHandle`. + doCheck(posixSpawnFileActionsAddClose(actions, + cint(pipes.stdinHandle))) + # Close parent process side of `stdinHandle`. + if not(pipes.stdinHolder.isEmpty()): + let fd = cint(pipes.stdinHolder.fd()) + doCheck(posixSpawnFileActionsAddClose(actions, fd)) + + if pipes.flags * {ProcessFlag.AutoStdout, ProcessFlag.UserStdout} != {}: + # Close child process STDOUT. + doCheck(posixSpawnFileActionsAddClose(actions, cint(1))) + # Make a duplicate of `stdoutHandle` as child process STDOUT. + doCheck(posixSpawnFileActionsAddDup2(actions, cint(pipes.stdoutHandle), + cint(1))) + if AsyncProcessOption.StdErrToStdOut notin options: + # Close child process side of `stdoutHandle`. + doCheck(posixSpawnFileActionsAddClose(actions, + cint(pipes.stdoutHandle))) + # Close parent process side of `stdoutHandle`. + if not(pipes.stdoutHolder.isEmpty()): + let fd = cint(pipes.stdoutHolder.fd()) + doCheck(posixSpawnFileActionsAddClose(actions, fd)) + + if pipes.flags * {ProcessFlag.AutoStderr, ProcessFlag.UserStderr} != {}: + # Close child process STDERR. + doCheck(posixSpawnFileActionsAddClose(actions, cint(2))) + # Make a duplicate of `stderrHandle` as child process STDERR. + doCheck(posixSpawnFileActionsAddDup2(actions, cint(pipes.stderrHandle), + cint(2))) + # Close child process side of `stderrHandle`. + doCheck(posixSpawnFileActionsAddClose(actions, + cint(pipes.stderrHandle))) + # Close parent process side of `stderrHandle`. + if not(pipes.stderrHolder.isEmpty()): + let fd = cint(pipes.stderrHolder.fd()) + doCheck(posixSpawnFileActionsAddClose(actions, fd)) + else: + if AsyncProcessOption.StdErrToStdOut in options: + # Close child process STDERR. + doCheck(posixSpawnFileActionsAddClose(actions, cint(2))) + # Make a duplicate of `stdoutHandle` as child process STDERR. + doCheck(posixSpawnFileActionsAddDup2(actions, cint(pipes.stdoutHandle), + cint(2))) + # Close child process side of `stdoutHandle`. + doCheck(posixSpawnFileActionsAddClose(actions, + cint(pipes.stdoutHandle))) + # Close parent process side of `stdoutHandle`. + if not(pipes.stdoutHolder.isEmpty()): + let fd = cint(pipes.stdoutHolder.fd()) + doCheck(posixSpawnFileActionsAddClose(actions, fd)) + ok(SpawnAttr(attrs: attrs, actions: actions)) + + proc free(v: var SpawnAttr): Result[void, OSErrorCode] = + block: + let res = posixSpawnAttrDestroy(v.attrs) + if res != 0: + discard posixSpawnFileActionsDestroy(v.actions) + return err(OSErrorCode(res)) + block: + let res = posixSpawnFileActionsDestroy(v.actions) + if res != 0: + return err(OSErrorCode(res)) + ok() + + proc getKeyValueItem(key: string, value: string): cstring = + var p = cast[cstring](alloc(len(key) + len(value) + 1 + 1)) + var offset = 0 + if len(key) > 0: + copyMem(addr p[offset], unsafeAddr(key[0]), len(key)) + inc(offset, len(key)) + p[offset] = '=' + inc(offset) + if len(value) > 0: + copyMem(addr p[offset], unsafeAddr(value[0]), len(value)) + inc(offset, len(value)) + p[offset] = '\x00' + p + + proc envToCStringArray(t: StringTableRef): cstringArray = + let itemsCount = len(t) + var + res = cast[cstringArray](alloc((itemsCount + 1) * sizeof(cstring))) + i = 0 + for key, value in pairs(t): + res[i] = getKeyValueItem(key, value) + inc(i) + res[i] = nil # Last item in CStringArray should be `nil`. + res + + proc envToCStringArray(): cstringArray = + let itemsCount = + block: + var res = 0 + for key, value in envPairs(): inc(res) + res + var + res = cast[cstringArray](alloc((itemsCount + 1) * sizeof(cstring))) + i = 0 + for key, value in envPairs(): + res[i] = getKeyValueItem(key, value) + inc(i) + res[i] = nil # Last item in CStringArray should be `nil`. + res + + when defined(macosx) or defined(macos) or defined(ios): + proc getEnvironment(): ptr cstringArray {. + importc: "_NSGetEnviron", header: "".} + else: + var globalEnv {.importc: "environ", header: "".}: cstringArray + + proc getProcessEnvironment*(): StringTableRef = + var res = newStringTable(modeCaseInsensitive) + let env = + when defined(macosx) or defined(macos) or defined(ios): + getEnvironment()[] + else: + globalEnv + var i = 0 + while not(isNil(env[i])): + let line = $env[i] + if len(line) > 0: + let delim = line.find('=') + if delim > 0: + res[substr(line, 0, delim - 1)] = substr(line, delim + 1) + inc(i) + res + + func exitStatusLikeShell(status: int): int = + if WAITIFSIGNALED(cint(status)): + # like the shell! + 128 + WAITTERMSIG(cint(status)) + else: + WAITEXITSTATUS(cint(status)) + + proc getCurrentDirectory(): AsyncProcessResult[string] = + var bufsize = 1024 + var res = newString(bufsize) + + proc strLength(a: string): int {.nimcall.} = + for i in 0 ..< len(a): + if a[i] == '\x00': + return i + len(a) + + while true: + if osdefs.getcwd(cstring(res), bufsize) != nil: + setLen(res, strLength(res)) + return ok(res) + else: + let errorCode = osLastError() + if errorCode == oserrno.ERANGE: + bufsize = bufsize shl 1 + doAssert(bufsize >= 0) + res = newString(bufsize) + else: + return err(errorCode) + + proc setCurrentDirectory(dir: string): AsyncProcessResult[void] = + let res = osdefs.chdir(cstring(dir)) + if res == -1: + return err(osLastError()) + ok() + + proc closeThreadAndProcessHandle(p: AsyncProcessRef + ): AsyncProcessResult[void] = + discard + + proc startProcess*(command: string, workingDir: string = "", + arguments: seq[string] = @[], + environment: StringTableRef = nil, + options: set[AsyncProcessOption] = {}, + stdinHandle = ProcessStreamHandle(), + stdoutHandle = ProcessStreamHandle(), + stderrHandle = ProcessStreamHandle(), + ): Future[AsyncProcessRef] {.async.} = + var + pid: Pid + pipes = preparePipes(options, stdinHandle, stdoutHandle, + stderrHandle).valueOr: + raiseAsyncProcessError("Unable to initialze process pipes", + error) + sa = pipes.initSpawn(options).valueOr: + discard closeProcessHandles(pipes, options, OSErrorCode(0)) + await pipes.closeProcessStreams(options) + raiseAsyncProcessError("Unable to initalize spawn attributes", 0) + + let + (commandLine, commandArguments) = + if AsyncProcessOption.EvalCommand in options: + let args = @[chronosProcShell, "-c", command] + (chronosProcShell, allocCStringArray(args)) + else: + var res = @[command] + for arg in arguments.items(): + res.add(arg) + (command, allocCStringArray(res)) + commandEnv = + if isNil(environment): + envToCStringArray() + else: + envToCStringArray(environment) + + var currentError: OSErrorCode + var currentDir: string + + try: + currentDir = + if len(workingDir) > 0: + # Save current working directory and change it to `workingDir`. + let cres = getCurrentDirectory() + if cres.isErr(): + raiseAsyncProcessError("Unable to obtain current directory", + cres.error()) + let sres = setCurrentDirectory(workingDir) + if sres.isErr(): + raiseAsyncProcessError("Unable to change current directory", + sres.error()) + cres.get() + else: + "" + + let res = + if AsyncProcessOption.UsePath in options: + posixSpawnp(pid, cstring(commandLine), sa.actions, sa.attrs, + commandArguments, commandEnv) + else: + posixSpawn(pid, cstring(commandLine), sa.actions, sa.attrs, + commandArguments, commandEnv) + + if res != 0: + await pipes.closeProcessStreams(options) + currentError = closeProcessHandles(pipes, options, OSErrorCode(res)) + + finally: + # Restore working directory + if (len(workingDir) > 0) and (len(currentDir) > 0): + # Restore working directory. + let cres = getCurrentDirectory() + if cres.isErr(): + # On error we still try to restore original working directory. + if currentError.isOk(): + currentError = cres.error() + discard setCurrentDirectory(currentDir) + else: + if cres.get() != currentDir: + let sres = setCurrentDirectory(currentDir) + if sres.isErr(): + if currentError.isOk(): + currentError = sres.error() + + # Cleanup allocated memory + deallocCStringArray(commandArguments) + deallocCStringArray(commandEnv) + + # Cleanup posix_spawn attributes and file operations + if not(currentError.isOk()): + discard sa.free() + else: + let res = sa.free() + if res.isErr(): + currentError = res.error() + + # If currentError has been set, raising an exception. + if not(currentError.isOk()): + raiseAsyncProcessError("Unable to spawn process", currentError) + + let process = AsyncProcessRef( + processId: pid, + pipes: pipes, + options: options, + flags: pipes.flags + ) + + trackAsyncProccess(process) + return process + + proc peekProcessExitCode(p: AsyncProcessRef, + reap = false): AsyncProcessResult[int] = + var wstatus: cint = 0 + if p.exitStatus.isSome(): + return ok(p.exitStatus.get()) + let + flags = if reap: cint(0) else: osdefs.WNOHANG + waitRes = + block: + var res: cint = 0 + while true: + res = osdefs.waitpid(p.processId, wstatus, flags) + if not((res == -1) and (osLastError() == oserrno.EINTR)): + break + res + if waitRes == p.processId: + if WAITIFEXITED(wstatus) or WAITIFSIGNALED(wstatus): + let status = int(wstatus) + p.exitStatus = Opt.some(status) + ok(status) + else: + ok(-1) + elif waitRes == 0: + ok(-1) + else: + err(osLastError()) + + proc suspend(p: AsyncProcessRef): AsyncProcessResult[void] = + if osdefs.kill(p.processId, osdefs.SIGSTOP) == 0: + ok() + else: + err(osLastError()) + + proc resume(p: AsyncProcessRef): AsyncProcessResult[void] = + if osdefs.kill(p.processId, osdefs.SIGCONT) == 0: + ok() + else: + err(osLastError()) + + proc terminate(p: AsyncProcessRef): AsyncProcessResult[void] = + if osdefs.kill(p.processId, osdefs.SIGTERM) == 0: + ok() + else: + err(osLastError()) + + proc kill(p: AsyncProcessRef): AsyncProcessResult[void] = + if osdefs.kill(p.processId, osdefs.SIGKILL) == 0: + ok() + else: + err(osLastError()) + + proc running(p: AsyncProcessRef): AsyncProcessResult[bool] = + let res = ? p.peekProcessExitCode() + if res == -1: + ok(true) + else: + ok(false) + + proc waitForExit*(p: AsyncProcessRef, + timeout = InfiniteDuration): Future[int] = + var + retFuture = newFuture[int]("chronos.waitForExit()") + processHandle: ProcessHandle + timer: TimerCallback = nil + + if p.exitStatus.isSome(): + retFuture.complete(p.exitStatus.get()) + return retFuture + + if timeout == ZeroDuration: + let res = p.kill() + if res.isErr(): + retFuture.fail(newException(AsyncProcessError, osErrorMsg(res.error()))) + return retFuture + + block: + let exitCode = p.peekProcessExitCode().valueOr: + retFuture.fail(newException(AsyncProcessError, osErrorMsg(error))) + return retFuture + if exitCode != -1: + retFuture.complete(exitStatusLikeShell(exitCode)) + return retFuture + + if timeout == ZeroDuration: + retFuture.complete(-1) + return retFuture + + proc continuation(udata: pointer) {.gcsafe.} = + let source = cast[int](udata) + if not(retFuture.finished()): + if source == 1: + # Process exited. + let res = removeProcess2(processHandle) + if res.isErr(): + retFuture.fail(newException(AsyncProcessError, + osErrorMsg(res.error()))) + return + if not(isNil(timer)): + clearTimer(timer) + let exitCode = p.peekProcessExitCode().valueOr: + retFuture.fail(newException(AsyncProcessError, osErrorMsg(error))) + return + if exitCode == -1: + retFuture.complete(-1) + else: + retFuture.complete(exitStatusLikeShell(exitCode)) + else: + # Timeout exceeded. + let res = p.kill() + if res.isErr(): + retFuture.fail(newException(AsyncProcessError, + osErrorMsg(res.error()))) + + proc cancellation(udata: pointer) {.gcsafe.} = + if not(retFuture.finished()): + if not(isNil(timer)): + clearTimer(timer) + # Ignore any errors because of cancellation. + discard removeProcess2(processHandle) + + if timeout != InfiniteDuration: + timer = setTimer(Moment.fromNow(timeout), continuation, cast[pointer](2)) + + processHandle = addProcess2(int(p.processId), continuation, + cast[pointer](1)).valueOr: + if error == oserrno.ESRCH: + # "zombie death race" problem. + # If process exited right after `waitpid()` - `kqueue` call + # could return ESRCH error. So we need to handle it properly and + # try to reap process code from exiting process. + let exitCode = p.peekProcessExitCode(true).valueOr: + retFuture.fail(newException(AsyncProcessError, osErrorMsg(error))) + return retFuture + if exitCode == -1: + # This should not be happens one more time, so we just report + # original error. + retFuture.fail(newException(AsyncProcessError, + osErrorMsg(oserrno.ESRCH))) + else: + retFuture.complete(exitStatusLikeShell(exitCode)) + else: + retFuture.fail(newException(AsyncProcessError, osErrorMsg(error))) + return retFuture + + # addProcess2() has race condition problem inside. Its possible that child + # process (we going to wait) sends SIGCHLD right after addProcess2() blocks + # signals and before it starts monitoring for signal (`signalfd` or + # `kqueue`). To avoid this problem we going to check process for completion + # one more time. + block: + let exitCode = p.peekProcessExitCode().valueOr: + discard removeProcess2(processHandle) + retFuture.fail(newException(AsyncProcessError, osErrorMsg(error))) + return retFuture + if exitCode != -1: + discard removeProcess2(processHandle) + retFuture.complete(exitStatusLikeShell(exitCode)) + return retFuture + + # Process is still running, so we going to wait for SIGCHLD. + retFuture.cancelCallback = cancellation + return retFuture + + proc peekExitCode(p: AsyncProcessRef): AsyncProcessResult[int] = + let res = ? p.peekProcessExitCode() + ok(exitStatusLikeShell(res)) + +proc createPipe(kind: StandardKind + ): Result[tuple[read: AsyncFD, write: AsyncFD], OSErrorCode] = + case kind + of StandardKind.Stdin: + let pipes = + when defined(windows): + let + readFlags: set[DescriptorFlag] = {DescriptorFlag.NonBlock} + writeFlags: set[DescriptorFlag] = {DescriptorFlag.NonBlock} + ? createOsPipe(readFlags, writeFlags) + else: + let + readFlags: set[DescriptorFlag] = {} + writeFlags: set[DescriptorFlag] = {DescriptorFlag.NonBlock} + ? createOsPipe(readFlags, writeFlags) + ok((read: AsyncFD(pipes.read), write: AsyncFD(pipes.write))) + of StandardKind.Stdout, StandardKind.Stderr: + let pipes = + when defined(windows): + let + readFlags: set[DescriptorFlag] = {DescriptorFlag.NonBlock} + writeFlags: set[DescriptorFlag] = {DescriptorFlag.NonBlock} + ? createOsPipe(readFlags, writeFlags) + else: + let + readFlags: set[DescriptorFlag] = {DescriptorFlag.NonBlock} + writeFlags: set[DescriptorFlag] = {} + ? createOsPipe(readFlags, writeFlags) + ok((read: AsyncFD(pipes.read), write: AsyncFD(pipes.write))) + +proc preparePipes(options: set[AsyncProcessOption], + stdinHandle, stdoutHandle, + stderrHandle: ProcessStreamHandle + ): AsyncProcessResult[AsyncProcessPipes] = + + let + (stdinFlags, localStdin, remoteStdin) = + case stdinHandle.kind + of ProcessStreamHandleKind.None: + ({ProcessFlag.NoStdin}, AsyncStreamHolder.init(), + asyncInvalidPipe) + of ProcessStreamHandleKind.Auto: + let (pipeIn, pipeOut) = ? createPipe(StandardKind.Stdin) + let holder = ? AsyncStreamHolder.init( + ProcessStreamHandle.init(pipeOut), StreamKind.Writer, {}) + ({ProcessFlag.AutoStdin}, holder, pipeIn) + else: + ({ProcessFlag.UserStdin}, + AsyncStreamHolder.init(), AsyncFD.init(stdinHandle)) + (stdoutFlags, localStdout, remoteStdout) = + case stdoutHandle.kind + of ProcessStreamHandleKind.None: + ({ProcessFlag.NoStdout}, AsyncStreamHolder.init(), + asyncInvalidPipe) + of ProcessStreamHandleKind.Auto: + let (pipeIn, pipeOut) = ? createPipe(StandardKind.Stdout) + let holder = ? AsyncStreamHolder.init( + ProcessStreamHandle.init(pipeIn), StreamKind.Reader, {}) + ({ProcessFlag.AutoStdout}, holder, pipeOut) + else: + ({ProcessFlag.UserStdout}, + AsyncStreamHolder.init(), AsyncFD.init(stdoutHandle)) + (stderrFlags, localStderr, remoteStderr) = + if AsyncProcessOption.StdErrToStdOut in options: + doAssert(stderrHandle.isEmpty(), + "`stderrHandle` argument must not be set, when" & + "`AsyncProcessOption.StdErrToStdOut` flag is used") + case stdoutHandle.kind + of ProcessStreamHandleKind.None: + raiseAssert "`stdoutHandle` argument must be present, when " & + "`AsyncProcessOption.StdErrToStdOut` flag is used" + of ProcessStreamHandleKind.Auto: + ({ProcessFlag.CopyStdout}, localStdout, remoteStdout) + else: + ({ProcessFlag.CopyStdout}, localStdout, remoteStdout) + else: + case stderrHandle.kind + of ProcessStreamHandleKind.None: + ({ProcessFlag.NoStderr}, AsyncStreamHolder.init(), + asyncInvalidPipe) + of ProcessStreamHandleKind.Auto: + let (pipeIn, pipeOut) = ? createPipe(StandardKind.Stderr) + let holder = ? AsyncStreamHolder.init( + ProcessStreamHandle.init(pipeIn), StreamKind.Reader, {}) + ({ProcessFlag.AutoStderr}, holder, pipeOut) + else: + ({ProcessFlag.UserStderr}, + AsyncStreamHolder.init(), AsyncFD.init(stderrHandle)) + + ok(AsyncProcessPipes( + flags: stdinFlags + stdoutFlags + stderrFlags, + stdinHolder: localStdin, + stdoutHolder: localStdout, + stderrHolder: localStderr, + stdinHandle: remoteStdin, + stdoutHandle: remoteStdout, + stderrHandle: remoteStderr + )) + +proc closeWait(holder: AsyncStreamHolder) {.async.} = + let (future, transp) = + case holder.kind + of StreamKind.None: + (nil, nil) + of StreamKind.Reader: + if StreamHolderFlag.Stream in holder.flags: + (holder.reader.closeWait(), holder.reader.tsource) + else: + (nil, holder.reader.tsource) + of StreamKind.Writer: + if StreamHolderFlag.Stream in holder.flags: + (holder.writer.closeWait(), holder.writer.tsource) + else: + (nil, holder.writer.tsource) + + let pending = + block: + var res: seq[Future[void]] + if not(isNil(future)): + res.add(future) + if not(isNil(transp)): + if StreamHolderFlag.Transport in holder.flags: + res.add(transp.closeWait()) + res + + if len(pending) > 0: + await allFutures(pending) + +proc closeProcessStreams(pipes: AsyncProcessPipes, + options: set[AsyncProcessOption]): Future[void] = + let pending = + block: + var res: seq[Future[void]] + if ProcessFlag.AutoStdin in pipes.flags: + res.add(pipes.stdinHolder.closeWait()) + if ProcessFlag.AutoStdout in pipes.flags: + res.add(pipes.stdoutHolder.closeWait()) + if ProcessFlag.AutoStderr in pipes.flags: + res.add(pipes.stderrHolder.closeWait()) + res + allFutures(pending) + +proc closeWait*(p: AsyncProcessRef) {.async.} = + # Here we ignore all possible errrors, because we do not want to raise + # exceptions. + discard closeProcessHandles(p.pipes, p.options, OSErrorCode(0)) + await p.pipes.closeProcessStreams(p.options) + discard p.closeThreadAndProcessHandle() + untrackAsyncProcess(p) + +proc stdinStream*(p: AsyncProcessRef): AsyncStreamWriter = + doAssert(p.pipes.stdinHolder.kind == StreamKind.Writer, + "StdinStreamWriter is not available") + p.pipes.stdinHolder.writer + +proc stdoutStream*(p: AsyncProcessRef): AsyncStreamReader = + doAssert(p.pipes.stdoutHolder.kind == StreamKind.Reader, + "StdoutStreamReader is not available") + p.pipes.stdoutHolder.reader + +proc stderrStream*(p: AsyncProcessRef): AsyncStreamReader = + doAssert(p.pipes.stderrHolder.kind == StreamKind.Reader, + "StderrStreamReader is not available") + p.pipes.stderrHolder.reader + +proc execCommand*(command: string, + options = {AsyncProcessOption.EvalCommand}, + timeout = InfiniteDuration + ): Future[int] {.async.} = + let poptions = options + {AsyncProcessOption.EvalCommand} + let process = await startProcess(command, options = poptions) + let res = + try: + await process.waitForExit(timeout) + finally: + await process.closeWait() + return res + +proc execCommandEx*(command: string, + options = {AsyncProcessOption.EvalCommand}, + timeout = InfiniteDuration + ): Future[CommandExResponse] {.async.} = + let + process = await startProcess(command, options = options, + stdoutHandle = AsyncProcess.Pipe, + stderrHandle = AsyncProcess.Pipe) + outputReader = process.stdoutStream.read() + errorReader = process.stderrStream.read() + res = + try: + await allFutures(outputReader, errorReader) + let + status = await process.waitForExit(timeout) + output = + try: + string.fromBytes(outputReader.read()) + except AsyncStreamError as exc: + raiseAsyncProcessError("Unable to read process' stdout channel", + exc) + error = + try: + string.fromBytes(errorReader.read()) + except AsyncStreamError as exc: + raiseAsyncProcessError("Unable to read process' stderr channel", + exc) + CommandExResponse(status: status, stdOutput: output, stdError: error) + finally: + await process.closeWait() + + return res + +proc pid*(p: AsyncProcessRef): int = + ## Returns process ``p`` identifier. + int(p.processId) + +template processId*(p: AsyncProcessRef): int = pid(p) diff --git a/chronos/config.nim b/chronos/config.nim index abc9c37..cef8a63 100644 --- a/chronos/config.nim +++ b/chronos/config.nim @@ -14,10 +14,10 @@ when (NimMajor, NimMinor) >= (1, 4): const chronosStrictException* {.booldefine.}: bool = defined(chronosPreviewV4) - ## Require that `async` code raises only derivatives of `CatchableError` and - ## not `Exception` - forward declarations, methods and `proc` types used - ## from within `async` code may need to be be explicitly annotated with - ## `raises: [CatchableError]` when this mode is enabled. + ## Require that `async` code raises only derivatives of `CatchableError` + ## and not `Exception` - forward declarations, methods and `proc` types + ## used from within `async` code may need to be be explicitly annotated + ## with `raises: [CatchableError]` when this mode is enabled. chronosStackTrace* {.booldefine.}: bool = defined(chronosDebug) ## Include stack traces in futures for creation and completion points @@ -32,6 +32,21 @@ when (NimMajor, NimMinor) >= (1, 4): chronosDumpAsync* {.booldefine.}: bool = defined(nimDumpAsync) ## Print code generated by {.async.} transformation + + chronosProcShell* {.strdefine.}: string = + when defined(windows): + "cmd.exe" + else: + when defined(android): + "/system/bin/sh" + else: + "/bin/sh" + ## Default shell binary path. + ## + ## The shell is used as command for command line when process started + ## using `AsyncProcessOption.EvalCommand` and API calls such as + ## ``execCommand(command)`` and ``execCommandEx(command)``. + else: # 1.2 doesn't support `booldefine` in `when` properly const @@ -42,6 +57,14 @@ else: chronosFutureTracking*: bool = defined(chronosDebug) or defined(chronosFutureTracking) chronosDumpAsync*: bool = defined(nimDumpAsync) + chronosProcShell* {.strdefine.}: string = + when defined(windows): + "cmd.exe" + else: + when defined(android): + "/system/bin/sh" + else: + "/bin/sh" when defined(debug) or defined(chronosConfig): import std/macros @@ -55,3 +78,4 @@ when defined(debug) or defined(chronosConfig): printOption("chronosFutureId", chronosFutureId) printOption("chronosFutureTracking", chronosFutureTracking) printOption("chronosDumpAsync", chronosDumpAsync) + printOption("chronosProcShell", chronosProcShell) diff --git a/chronos/osdefs.nim b/chronos/osdefs.nim index d7bb868..971a9a9 100644 --- a/chronos/osdefs.nim +++ b/chronos/osdefs.nim @@ -827,7 +827,8 @@ elif defined(macos) or defined(macosx): unlink, listen, getaddrinfo, gai_strerror, getrlimit, setrlimit, getpid, pthread_sigmask, sigprocmask, sigemptyset, sigaddset, sigismember, fcntl, accept, pipe, write, - signal, read, setsockopt, getsockopt, + signal, read, setsockopt, getsockopt, getcwd, chdir, + waitpid, kill, Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr, SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6, Sockaddr_un, SocketHandle, AddrInfo, RLimit, @@ -839,14 +840,16 @@ elif defined(macos) or defined(macosx): SIG_BLOCK, SIG_UNBLOCK, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, - SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD + SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, + SIGCONT export close, shutdown, socket, getpeername, getsockname, recvfrom, sendto, send, bindSocket, recv, connect, unlink, listen, getaddrinfo, gai_strerror, getrlimit, setrlimit, getpid, pthread_sigmask, sigprocmask, sigemptyset, sigaddset, sigismember, fcntl, accept, pipe, write, - signal, read, setsockopt, getsockopt, + signal, read, setsockopt, getsockopt, getcwd, chdir, + waitpid, kill, Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr, SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6, Sockaddr_un, SocketHandle, AddrInfo, RLimit, @@ -858,7 +861,8 @@ elif defined(macos) or defined(macosx): SIG_BLOCK, SIG_UNBLOCK, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, - SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD + SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, + SIGCONT type MachTimebaseInfo* {.importc: "struct mach_timebase_info", @@ -882,7 +886,8 @@ elif defined(linux): getrlimit, setrlimit, getpeername, getsockname, recvfrom, sendto, send, bindSocket, recv, connect, unlink, listen, sendmsg, recvmsg, getpid, fcntl, - pthread_sigmask, clock_gettime, signal, + pthread_sigmask, clock_gettime, signal, getcwd, chdir, + waitpid, kill, ClockId, Itimerspec, Timespec, Sigset, Time, Pid, Mode, SigInfo, Id, Tmsghdr, IOVec, RLimit, SockAddr, SockLen, Sockaddr_storage, Sockaddr_in, @@ -895,7 +900,8 @@ elif defined(linux): SOCK_DGRAM, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, - SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD + SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, + SIGCONT export close, shutdown, sigemptyset, sigaddset, sigismember, sigdelset, write, read, waitid, getaddrinfo, @@ -903,7 +909,8 @@ elif defined(linux): getrlimit, setrlimit, getpeername, getsockname, recvfrom, sendto, send, bindSocket, recv, connect, unlink, listen, sendmsg, recvmsg, getpid, fcntl, - pthread_sigmask, clock_gettime, signal, + pthread_sigmask, clock_gettime, signal, getcwd, chdir, + waitpid, kill, ClockId, Itimerspec, Timespec, Sigset, Time, Pid, Mode, SigInfo, Id, Tmsghdr, IOVec, RLimit, SockAddr, SockLen, Sockaddr_storage, Sockaddr_in, @@ -916,7 +923,8 @@ elif defined(linux): SOCK_DGRAM, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, - SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD + SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, + SIGCONT when not defined(android) and defined(amd64): const IP_MULTICAST_TTL*: cint = 33 @@ -1012,6 +1020,7 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or setrlimit, getpid, pthread_sigmask, sigemptyset, sigaddset, sigismember, fcntl, accept, pipe, write, signal, read, setsockopt, getsockopt, clock_gettime, + getcwd, chdir, waitpid, kill, Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr, SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6, Sockaddr_un, SocketHandle, AddrInfo, RLimit, @@ -1023,7 +1032,8 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, - SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD + SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, + SIGCONT export close, shutdown, socket, getpeername, getsockname, recvfrom, sendto, send, bindSocket, recv, connect, @@ -1042,7 +1052,8 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, - SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD + SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, + SIGCONT var IP_MULTICAST_TTL* {.importc: "IP_MULTICAST_TTL", header: "".}: cint @@ -1081,15 +1092,28 @@ elif defined(macos) or defined(macosx): IPPROTO_TCP* = 6 when defined(linux): - const O_CLOEXEC* = 0x80000 + const + O_CLOEXEC* = 0x80000 + POSIX_SPAWN_USEVFORK* = 0x40 elif defined(freebsd): - const O_CLOEXEC* = 0x00100000 + const + O_CLOEXEC* = 0x00100000 + POSIX_SPAWN_USEVFORK* = 0x00 elif defined(openbsd): - const O_CLOEXEC* = 0x10000 + const + O_CLOEXEC* = 0x10000 + POSIX_SPAWN_USEVFORK* = 0x00 elif defined(netbsd): - const O_CLOEXEC* = 0x00400000 + const + O_CLOEXEC* = 0x00400000 + POSIX_SPAWN_USEVFORK* = 0x00 elif defined(dragonfly): - const O_CLOEXEC* = 0x00020000 + const + O_CLOEXEC* = 0x00020000 + POSIX_SPAWN_USEVFORK* = 0x00 +elif defined(macos) or defined(macosx): + const + POSIX_SPAWN_USEVFORK* = 0x00 when defined(linux) or defined(macos) or defined(macosx) or defined(freebsd) or defined(openbsd) or defined(netbsd) or defined(dragonfly): diff --git a/tests/testall.nim b/tests/testall.nim index c0713bf..eabe0a5 100644 --- a/tests/testall.nim +++ b/tests/testall.nim @@ -8,7 +8,7 @@ import testmacro, testsync, testsoon, testtime, testfut, testsignal, testaddress, testdatagram, teststream, testserver, testbugs, testnet, testasyncstream, testhttpserver, testshttpserver, testhttpclient, - testratelimit + testproc, testratelimit # Must be imported last to check for Pending futures import testutils diff --git a/tests/testproc.bat b/tests/testproc.bat new file mode 100644 index 0000000..314bea7 --- /dev/null +++ b/tests/testproc.bat @@ -0,0 +1,36 @@ +@ECHO OFF + +IF /I "%1" == "STDIN" ( + GOTO :STDINTEST +) ELSE IF /I "%1" == "TIMEOUT2" ( + GOTO :TIMEOUTTEST2 +) ELSE IF /I "%1" == "TIMEOUT10" ( + GOTO :TIMEOUTTEST10 +) ELSE IF /I "%1" == "BIGDATA" ( + GOTO :BIGDATA +) ELSE IF /I "%1" == "ENVTEST" ( + GOTO :ENVTEST +) + +EXIT 0 + +:STDINTEST +SET /P "INPUTDATA=" +ECHO STDIN DATA: %INPUTDATA% +EXIT 0 + +:TIMEOUTTEST2 +ping -n 2 127.0.0.1 > NUL +EXIT 2 + +:TIMEOUTTEST10 +ping -n 10 127.0.0.1 > NUL +EXIT 0 + +:BIGDATA +FOR /L %%G IN (1, 1, 400000) DO ECHO ALICEWASBEGINNINGTOGETVERYTIREDOFSITTINGBYHERSISTERONTHEBANKANDO +EXIT 0 + +:ENVTEST +ECHO %CHRONOSASYNC% +EXIT 0 diff --git a/tests/testproc.nim b/tests/testproc.nim new file mode 100644 index 0000000..05f793d --- /dev/null +++ b/tests/testproc.nim @@ -0,0 +1,425 @@ +# Chronos Test Suite +# (c) Copyright 2022-Present +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +import std/os +import unittest2, stew/[base10, byteutils] +import ".."/chronos/unittest2/asynctests + +when defined(posix): + from ".."/chronos/osdefs import SIGKILL + +when defined(nimHasUsed): {.used.} + +suite "Asynchronous process management test suite": + const OutputTests = + when defined(windows): + [ + ("ECHO TESTOUT", "TESTOUT\r\n", ""), + ("ECHO TESTERR 1>&2", "", "TESTERR \r\n"), + ("ECHO TESTBOTH && ECHO TESTBOTH 1>&2", "TESTBOTH \r\n", + "TESTBOTH \r\n") + ] + else: + [ + ("echo TESTOUT", "TESTOUT\n", ""), + ("echo TESTERR 1>&2", "", "TESTERR\n"), + ("echo TESTBOTH && echo TESTBOTH 1>&2", "TESTBOTH\n", "TESTBOTH\n") + ] + + const ExitCodes = [5, 13, 64, 100, 126, 127, 128, 130, 255] + + proc createBigMessage(size: int): seq[byte] = + var message = "MESSAGE" + result = newSeq[byte](size) + for i in 0 ..< len(result): + result[i] = byte(message[i mod len(message)]) + + when not(defined(windows)): + proc getCurrentFD(): int = + let local = initTAddress("127.0.0.1:34334") + let sock = createAsyncSocket(local.getDomain(), SockType.SOCK_DGRAM, + Protocol.IPPROTO_UDP) + closeSocket(sock) + return int(sock) + var markFD = getCurrentFD() + + asyncTest "execCommand() exit codes test": + for item in ExitCodes: + let command = "exit " & Base10.toString(uint64(item)) + let res = await execCommand(command) + check res == item + + asyncTest "execCommandEx() exit codes and outputs test": + for test in OutputTests: + let response = await execCommandEx(test[0]) + check: + response.stdOutput == test[1] + response.stdError == test[2] + response.status == 0 + + asyncTest "waitForExit() & peekExitCode() exit codes test": + let options = {AsyncProcessOption.EvalCommand} + for item in ExitCodes: + let command = "exit " & Base10.toString(uint64(item)) + let process = await startProcess(command, options = options) + try: + let res = await process.waitForExit(InfiniteDuration) + check: + res == item + process.peekExitCode().tryGet() == item + process.running().tryGet() == false + finally: + await process.closeWait() + + asyncTest "addProcess() test": + var + handlerFut = newFuture[void]("process.handler.future") + pidFd: ProcessHandle + processCounter = 0 + processExitCode = 0 + process: AsyncProcessRef + + proc processHandler(udata: pointer) {.gcsafe.} = + processCounter = cast[int](udata) + processExitCode = process.peekExitCode().valueOr: + handlerFut.fail(newException(ValueError, osErrorMsg(error))) + return + let res = removeProcess2(pidFd) + if res.isErr(): + handlerFut.fail(newException(ValueError, osErrorMsg(res.error()))) + else: + handlerFut.complete() + + let + options = {AsyncProcessOption.EvalCommand} + command = "exit 1" + + process = await startProcess(command, options = options) + + try: + pidFd = + block: + let res = addProcess2(process.pid(), processHandler, + cast[pointer](31337)) + if res.isErr(): + raiseAssert osErrorMsg(res.error()) + res.get() + await handlerFut.wait(5.seconds) + check: + processExitCode == 1 + processCounter == 31337 + finally: + await process.closeWait() + + asyncTest "STDIN stream test": + let + command = + when defined(windows): + "tests\\testproc.bat stdin" + else: + "tests/testproc.sh stdin" + options = {AsyncProcessOption.EvalCommand} + shellHeader = "STDIN DATA: ".toBytes() + smallTest = + when defined(windows): + "SMALL AMOUNT\r\n".toBytes() + else: + "SMALL AMOUNT\n".toBytes() + + let bigTest = + when defined(windows): + var res = createBigMessage(256) + res.add(byte(0x0D)) + res.add(byte(0x0A)) + res + else: + var res = createBigMessage(256) + res.add(byte(0x0A)) + res + + for item in [smallTest, bigTest]: + let process = await startProcess(command, options = options, + stdinHandle = AsyncProcess.Pipe, + stdoutHandle = AsyncProcess.Pipe) + try: + await process.stdinStream.write(item) + let stdoutDataFut = process.stdoutStream.read() + let res = await process.waitForExit(InfiniteDuration) + await allFutures(stdoutDataFut) + check: + res == 0 + stdoutDataFut.read() == shellHeader & item + finally: + await process.closeWait() + + asyncTest "STDOUT and STDERR streams test": + let options = {AsyncProcessOption.EvalCommand} + + for test in OutputTests: + let process = await startProcess(test[0], options = options, + stdoutHandle = AsyncProcess.Pipe, + stderrHandle = AsyncProcess.Pipe) + try: + let outBytesFut = process.stdoutStream.read() + let errBytesFut = process.stderrStream.read() + let res = await process.waitForExit(InfiniteDuration) + await allFutures(outBytesFut, errBytesFut) + check: + string.fromBytes(outBytesFut.read()) == test[1] + string.fromBytes(errBytesFut.read()) == test[2] + res == 0 + finally: + await process.closeWait() + + asyncTest "STDERR to STDOUT streams test": + let options = {AsyncProcessOption.EvalCommand, + AsyncProcessOption.StdErrToStdOut} + let command = + when defined(windows): + "ECHO TESTSTDOUT && ECHO TESTSTDERR 1>&2" + else: + "echo TESTSTDOUT && echo TESTSTDERR 1>&2" + let expect = + when defined(windows): + "TESTSTDOUT \r\nTESTSTDERR \r\n" + else: + "TESTSTDOUT\nTESTSTDERR\n" + let process = await startProcess(command, options = options, + stdoutHandle = AsyncProcess.Pipe) + try: + let outBytesFut = process.stdoutStream.read() + let res = await process.waitForExit(InfiniteDuration) + await allFutures(outBytesFut) + check: + string.fromBytes(outBytesFut.read()) == expect + res == 0 + finally: + await process.closeWait() + + asyncTest "Capture big amount of bytes from STDOUT stream test": + let options = {AsyncProcessOption.EvalCommand} + let command = + when defined(windows): + "tests\\testproc.bat bigdata" + else: + "tests/testproc.sh bigdata" + let expect = + when defined(windows): + 400_000 * (64 + 2) + else: + 400_000 * (64 + 1) + let process = await startProcess(command, options = options, + stdoutHandle = AsyncProcess.Pipe, + stderrHandle = AsyncProcess.Pipe) + try: + let outBytesFut = process.stdoutStream.read() + let errBytesFut = process.stderrStream.read() + let res = await process.waitForExit(InfiniteDuration) + await allFutures(outBytesFut, errBytesFut) + check: + res == 0 + len(outBytesFut.read()) == expect + len(errBytesFut.read()) == 0 + finally: + await process.closeWait() + + asyncTest "Long-waiting waitForExit() test": + let command = + when defined(windows): + ("tests\\testproc.bat", "timeout2") + else: + ("tests/testproc.sh", "timeout2") + let process = await startProcess(command[0], arguments = @[command[1]]) + try: + let res = await process.waitForExit(InfiniteDuration) + check res == 2 + finally: + await process.closeWait() + + asyncTest "waitForExit(duration) test": + let command = + when defined(windows): + ("tests\\testproc.bat", "timeout10") + else: + ("tests/testproc.sh", "timeout10") + let expect = + when defined(windows): + 0 + else: + 128 + int(SIGKILL) + let process = await startProcess(command[0], arguments = @[command[1]]) + try: + let res = await process.waitForExit(1.seconds) + check res == expect + finally: + await process.closeWait() + + asyncTest "Child process environment test": + let command = + when defined(windows): + ("tests\\testproc.bat", "envtest", 0, "CHILDPROCESSTEST\r\n") + else: + ("tests/testproc.sh", "envtest", 0, "CHILDPROCESSTEST\n") + + let env = getProcessEnvironment() + env["CHRONOSASYNC"] = "CHILDPROCESSTEST" + let process = await startProcess(command[0], arguments = @[command[1]], + environment = env, + stdoutHandle = AsyncProcess.Pipe) + try: + let outBytesFut = process.stdoutStream.read() + let res = await process.waitForExit(InfiniteDuration) + let outBytes = await outBytesFut + check: + res == command[2] + string.fromBytes(outBytes) == command[3] + finally: + await process.closeWait() + + test "getProcessEnvironment() test": + let env = getProcessEnvironment() + when defined(windows): + check len(env["SYSTEMROOT"]) > 0 + else: + check len(env["USER"]) > 0 + + asyncTest "Multiple processes waiting test": + const ProcessesCount = 50 + + let command = + when defined(windows): + ("tests\\testproc.bat", "timeout2", 2) + else: + ("tests/testproc.sh", "timeout2", 2) + + var processes: seq[AsyncProcessRef] + for n in 0 ..< ProcessesCount: + let process = await startProcess(command[0], arguments = @[command[1]]) + processes.add(process) + try: + var pending: seq[Future[int]] + for process in processes: + pending.add(process.waitForExit(10.seconds)) + await allFutures(pending) + for index in 0 ..< ProcessesCount: + check pending[index].read() == command[2] + finally: + var pending: seq[Future[void]] + for process in processes: + pending.add(process.closeWait()) + await allFutures(pending) + + asyncTest "Multiple processes exit codes test": + const ProcessesCount = 50 + + let options = {AsyncProcessOption.EvalCommand} + + var processes: seq[AsyncProcessRef] + for n in 0 ..< ProcessesCount: + let + command = "exit " & Base10.toString(uint64(n)) + process = await startProcess(command, options = options) + processes.add(process) + try: + var pending: seq[Future[int]] + for process in processes: + pending.add(process.waitForExit(10.seconds)) + await allFutures(pending) + for index in 0 ..< ProcessesCount: + check pending[index].read() == index + finally: + var pending: seq[Future[void]] + for process in processes: + pending.add(process.closeWait()) + await allFutures(pending) + + asyncTest "Multiple processes data capture test": + const ProcessesCount = 50 + + let options = {AsyncProcessOption.EvalCommand} + + var processes: seq[AsyncProcessRef] + for n in 0 ..< ProcessesCount: + let command = + when defined(windows): + "ECHO TEST" & $n + else: + "echo TEST" & $n + + let process = await startProcess(command, options = options, + stdoutHandle = AsyncProcess.Pipe) + processes.add(process) + + try: + var pendingReaders: seq[Future[seq[byte]]] + var pendingWaiters: seq[Future[int]] + for process in processes: + pendingReaders.add(process.stdoutStream.read()) + pendingWaiters.add(process.waitForExit(10.seconds)) + await allFutures(pendingReaders) + await allFutures(pendingWaiters) + + for index in 0 ..< ProcessesCount: + let expect = + when defined(windows): + "TEST" & $index & "\r\n" + else: + "TEST" & $index & "\n" + check string.fromBytes(pendingReaders[index].read()) == expect + check pendingWaiters[index].read() == 0 + finally: + var pending: seq[Future[void]] + for process in processes: + pending.add(process.closeWait()) + await allFutures(pending) + + asyncTest "terminate() test": + let command = + when defined(windows): + ("tests\\testproc.bat", "timeout10", 0) + else: + ("tests/testproc.sh", "timeout10", 143) # 128 + SIGTERM + let process = await startProcess(command[0], arguments = @[command[1]]) + try: + let resFut = process.waitForExit(InfiniteDuration) + check process.terminate().isOk() + let res = await resFut + check res == command[2] + finally: + await process.closeWait() + + asyncTest "kill() test": + let command = + when defined(windows): + ("tests\\testproc.bat", "timeout10", 0) + else: + ("tests/testproc.sh", "timeout10", 137) # 128 + SIGKILL + let process = await startProcess(command[0], arguments = @[command[1]]) + try: + let resFut = process.waitForExit(InfiniteDuration) + check process.kill().isOk() + let res = await resFut + check res == command[2] + finally: + await process.closeWait() + + test "File descriptors leaks test": + when defined(windows): + skip() + else: + check getCurrentFD() == markFD + + test "Leaks test": + proc getTrackerLeaks(tracker: string): bool = + let tracker = getTracker(tracker) + if isNil(tracker): false else: tracker.isLeaked() + + check: + getTrackerLeaks("async.process") == false + getTrackerLeaks("async.stream.reader") == false + getTrackerLeaks("async.stream.writer") == false + getTrackerLeaks("stream.transport") == false diff --git a/tests/testproc.sh b/tests/testproc.sh new file mode 100755 index 0000000..1725d49 --- /dev/null +++ b/tests/testproc.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +if [ "$1" == "stdin" ]; then + read -r inputdata + echo "STDIN DATA: $inputdata" +elif [ "$1" == "timeout2" ]; then + sleep 2 + exit 2 +elif [ "$1" == "timeout10" ]; then + sleep 10 +elif [ "$1" == "bigdata" ]; then + for i in {1..400000} + do + echo "ALICEWASBEGINNINGTOGETVERYTIREDOFSITTINGBYHERSISTERONTHEBANKANDO" + done +elif [ "$1" == "envtest" ]; then + echo "$CHRONOSASYNC" +else + echo "arguments missing" +fi diff --git a/tests/testsignal.nim b/tests/testsignal.nim index 5eca5a9..0bcf793 100644 --- a/tests/testsignal.nim +++ b/tests/testsignal.nim @@ -17,7 +17,7 @@ suite "Signal handling test suite": when not defined(windows): var signalCounter = 0 - sigfd = -1 + sigfd: SignalHandle proc signalProc(udata: pointer) = signalCounter = cast[int](udata)