nim-chronos/chronos/asyncproc.nim
Eugene Kabanov 2e8551b0d9
Cancellation fixes and tests. (#445)
* Add callTick and stream cancellation tests.

* Fix stepsAsync() test.

* Cancellation changes.

* Update and add more cancellation tests.

* Fix Posix shutdown call to handle ENOTCONN error.

* With new changes to to cancellation its now possible.

* Refactor testsoon.nim to not produce artifacts after tests are finished.

* Debugging MacOS issue.

* Adjust flaky test times.

* Fix issue.

* Add test for issue #334 which was also addressed in this PR.
Avoid `break` in problematic test.

* Add noCancelWait() call which prohibits cancellation.
Fix closeWait() calls to use noCancelWait() predicate.
Adding sleep to flaky MacOS test.

* Remove all debugging echoes.

* Fix cancelAndWait() which now could perform multiple attempts to cancel target Future (mustCancel behavior).

* Fix issues revealed by switch to different cancelAndWait().

* Address review comments.

* Fix testutils compilation warning.

* Rename callTick() to internalCallTick().

* Add some documentation comments.

* Disable flaky ratelimit test.

* Rename noCancelWait() to noCancel().
Address review comments.
2023-09-15 19:38:39 +03:00

1348 lines
47 KiB
Nim

#
# Chronos' asynchronous process management
#
# (c) Copyright 2022-Present Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
{.push raises: [].}
{.pragma: apforward, gcsafe, raises: [].}
import std/strtabs
import "."/[config, asyncloop, handles, osdefs, osutils, oserrno],
streams/asyncstream
import stew/[results, byteutils]
from std/os import quoteShell, quoteShellWindows, quoteShellPosix, envPairs
export strtabs, results
export quoteShell, quoteShellWindows, quoteShellPosix, envPairs
const
AsyncProcessTrackerName* = "async.process"
## AsyncProcess leaks tracker name
type
AsyncProcessError* = object of AsyncError
AsyncProcessTimeoutError* = object of AsyncProcessError
AsyncProcessResult*[T] = Result[T, OSErrorCode]
AsyncProcessOption* {.pure.} = enum
UsePath,
EvalCommand,
StdErrToStdOut,
ProcessGroup
StandardKind {.pure.} = enum
Stdin, Stdout, Stderr
ProcessFlag {.pure.} = enum
UserStdin, UserStdout, UserStderr,
AutoStdin, AutoStdout, AutoStderr,
NoStdin, NoStdout, NoStderr,
CopyStdout
ProcessStreamHandleKind {.pure.} = enum
None, Auto, ProcHandle, Transport, StreamReader, StreamWriter
ProcessStreamHandle* = object
case kind: ProcessStreamHandleKind
of ProcessStreamHandleKind.None:
discard
of ProcessStreamHandleKind.Auto:
discard
of ProcessStreamHandleKind.ProcHandle:
handle: AsyncFD
of ProcessStreamHandleKind.Transport:
transp: StreamTransport
of ProcessStreamHandleKind.StreamReader:
reader: AsyncStreamReader
of ProcessStreamHandleKind.StreamWriter:
writer: AsyncStreamWriter
StreamHolderFlag {.pure.} = enum
Transport, Stream
StreamKind {.pure.} = enum
None, Reader, Writer
AsyncStreamHolder = object
case kind: StreamKind
of StreamKind.Reader:
reader: AsyncStreamReader
of StreamKind.Writer:
writer: AsyncStreamWriter
of StreamKind.None:
discard
flags: set[StreamHolderFlag]
AsyncProcessPipes = object
flags: set[ProcessFlag]
stdinHolder: AsyncStreamHolder
stdoutHolder: AsyncStreamHolder
stderrHolder: AsyncStreamHolder
stdinHandle: AsyncFD
stdoutHandle: AsyncFD
stderrHandle: AsyncFD
AsyncProcess* = object
AsyncProcessImpl = object of RootObj
when defined(windows):
processHandle: HANDLE
threadHandle: HANDLE
processId: DWORD
else:
processId: Pid
pipes: AsyncProcessPipes
exitStatus: Opt[int]
flags: set[ProcessFlag]
options: set[AsyncProcessOption]
AsyncProcessRef* = ref AsyncProcessImpl
CommandExResponse* = object
stdOutput*: string
stdError*: string
status*: int
WaitOperation {.pure.} = enum
Kill, Terminate
template Pipe*(t: typedesc[AsyncProcess]): ProcessStreamHandle =
ProcessStreamHandle(kind: ProcessStreamHandleKind.Auto)
proc init*(t: typedesc[AsyncFD], handle: ProcessStreamHandle): AsyncFD =
case handle.kind
of ProcessStreamHandleKind.ProcHandle:
handle.handle
of ProcessStreamHandleKind.Transport:
handle.transp.fd
of ProcessStreamHandleKind.StreamReader:
doAssert(not(isNil(handle.reader.tsource)))
handle.reader.tsource.fd
of ProcessStreamHandleKind.StreamWriter:
doAssert(not(isNil(handle.writer.tsource)))
handle.writer.tsource.fd
of ProcessStreamHandleKind.Auto:
raiseAssert "ProcessStreamHandle could not be auto at this moment"
of ProcessStreamHandleKind.None:
raiseAssert "ProcessStreamHandle could not be empty at this moment"
proc init*(t: typedesc[AsyncStreamHolder], handle: AsyncStreamReader,
baseFlags: set[StreamHolderFlag] = {}): AsyncStreamHolder =
AsyncStreamHolder(kind: StreamKind.Reader, reader: handle, flags: baseFlags)
proc init*(t: typedesc[AsyncStreamHolder], handle: AsyncStreamWriter,
baseFlags: set[StreamHolderFlag] = {}): AsyncStreamHolder =
AsyncStreamHolder(kind: StreamKind.Writer, writer: handle, flags: baseFlags)
proc init*(t: typedesc[AsyncStreamHolder]): AsyncStreamHolder =
AsyncStreamHolder(kind: StreamKind.None)
proc init*(t: typedesc[AsyncStreamHolder], handle: ProcessStreamHandle,
kind: StreamKind, baseFlags: set[StreamHolderFlag] = {}
): AsyncProcessResult[AsyncStreamHolder] =
case handle.kind
of ProcessStreamHandleKind.ProcHandle:
case kind
of StreamKind.Reader:
let
transp = ? fromPipe2(handle.handle)
reader = newAsyncStreamReader(transp)
flags = baseFlags + {StreamHolderFlag.Stream,
StreamHolderFlag.Transport}
ok(AsyncStreamHolder(kind: StreamKind.Reader, reader: reader,
flags: flags))
of StreamKind.Writer:
let
transp = ? fromPipe2(handle.handle)
writer = newAsyncStreamWriter(transp)
flags = baseFlags + {StreamHolderFlag.Stream,
StreamHolderFlag.Transport}
ok(AsyncStreamHolder(kind: StreamKind.Writer, writer: writer,
flags: flags))
of StreamKind.None:
ok(AsyncStreamHolder(kind: StreamKind.None))
of ProcessStreamHandleKind.Transport:
case kind
of StreamKind.Reader:
let
reader = newAsyncStreamReader(handle.transp)
flags = baseFlags + {StreamHolderFlag.Stream}
ok(AsyncStreamHolder(kind: StreamKind.Reader, reader: reader,
flags: flags))
of StreamKind.Writer:
let
writer = newAsyncStreamWriter(handle.transp)
flags = baseFlags + {StreamHolderFlag.Stream}
ok(AsyncStreamHolder(kind: StreamKind.Writer, writer: writer,
flags: flags))
of StreamKind.None:
ok(AsyncStreamHolder(kind: StreamKind.None))
of ProcessStreamHandleKind.StreamReader:
ok(AsyncStreamHolder(kind: StreamKind.Reader, reader: handle.reader,
flags: baseFlags))
of ProcessStreamHandleKind.StreamWriter:
ok(AsyncStreamHolder(kind: StreamKind.Writer, writer: handle.writer,
flags: baseFlags))
of ProcessStreamHandleKind.None, ProcessStreamHandleKind.Auto:
ok(AsyncStreamHolder(kind: StreamKind.None))
proc init*(t: typedesc[ProcessStreamHandle]): ProcessStreamHandle =
ProcessStreamHandle(kind: ProcessStreamHandleKind.None)
proc init*(t: typedesc[ProcessStreamHandle],
handle: AsyncFD): ProcessStreamHandle =
ProcessStreamHandle(kind: ProcessStreamHandleKind.ProcHandle, handle: handle)
proc init*(t: typedesc[ProcessStreamHandle],
transp: StreamTransport): ProcessStreamHandle =
doAssert(transp.kind == TransportKind.Pipe,
"Only pipe transports can be used as process streams")
ProcessStreamHandle(kind: ProcessStreamHandleKind.ProcHandle, transp: transp)
proc init*(t: typedesc[ProcessStreamHandle],
reader: AsyncStreamReader): ProcessStreamHandle =
ProcessStreamHandle(kind: ProcessStreamHandleKind.StreamReader,
reader: reader)
proc init*(t: typedesc[ProcessStreamHandle],
writer: AsyncStreamWriter): ProcessStreamHandle =
ProcessStreamHandle(kind: ProcessStreamHandleKind.StreamWriter,
writer: writer)
proc isEmpty*(handle: ProcessStreamHandle): bool =
handle.kind == ProcessStreamHandleKind.None
proc suspend*(p: AsyncProcessRef): AsyncProcessResult[void] {.apforward.}
proc resume*(p: AsyncProcessRef): AsyncProcessResult[void] {.apforward.}
proc terminate*(p: AsyncProcessRef): AsyncProcessResult[void] {.apforward.}
proc kill*(p: AsyncProcessRef): AsyncProcessResult[void] {.apforward.}
proc running*(p: AsyncProcessRef): AsyncProcessResult[bool] {.apforward.}
proc peekExitCode*(p: AsyncProcessRef): AsyncProcessResult[int] {.apforward.}
proc preparePipes(options: set[AsyncProcessOption],
stdinHandle, stdoutHandle, stderrHandle: ProcessStreamHandle
): AsyncProcessResult[AsyncProcessPipes] {.apforward.}
proc closeProcessHandles(pipes: var AsyncProcessPipes,
options: set[AsyncProcessOption],
lastError: OSErrorCode): OSErrorCode {.apforward.}
proc closeProcessStreams(pipes: AsyncProcessPipes,
options: set[AsyncProcessOption]): Future[void] {.
apforward.}
proc closeWait(holder: AsyncStreamHolder): Future[void] {.apforward.}
template isOk(code: OSErrorCode): bool =
when defined(windows):
code == ERROR_SUCCESS
else:
code == OSErrorCode(0)
template closePipe(handle: AsyncFD): bool =
let fd =
when defined(windows):
osdefs.HANDLE(handle)
else:
cint(handle)
closeFd(fd) != -1
proc closeProcessHandles(pipes: var AsyncProcessPipes,
options: set[AsyncProcessOption],
lastError: OSErrorCode): OSErrorCode =
# We trying to preserve error code of last failed operation.
var currentError = lastError
if ProcessFlag.AutoStdin in pipes.flags:
if pipes.stdinHandle != asyncInvalidPipe:
if currentError.isOk():
if not(closePipe(pipes.stdinHandle)):
currentError = osLastError()
else:
discard closePipe(pipes.stdinHandle)
pipes.stdinHandle = asyncInvalidPipe
if ProcessFlag.AutoStdout in pipes.flags:
if pipes.stdoutHandle != asyncInvalidPipe:
if currentError.isOk():
if not(closePipe(pipes.stdoutHandle)):
currentError = osLastError()
else:
discard closePipe(pipes.stdoutHandle)
pipes.stdoutHandle = asyncInvalidPipe
if ProcessFlag.AutoStderr in pipes.flags:
if pipes.stderrHandle != asyncInvalidPipe:
if currentError.isOk():
if not(closePipe(pipes.stderrHandle)):
currentError = osLastError()
else:
discard closePipe(pipes.stderrHandle)
pipes.stderrHandle = asyncInvalidPipe
currentError
template pipesPresent*(pipes: AsyncProcessPipes): bool =
let mask = {ProcessFlag.AutoStdin, ProcessFlag.AutoStdout,
ProcessFlag.AutoStderr,ProcessFlag.UserStdin,
ProcessFlag.UserStdout, ProcessFlag.UserStderr}
pipes.flags * mask != {}
proc raiseAsyncProcessError(msg: string, exc: ref CatchableError = nil) {.
noreturn, noinit, noinline, raises: [AsyncProcessError].} =
let message =
if isNil(exc):
msg
else:
msg & " ([" & $exc.name & "]: " & $exc.msg & ")"
raise newException(AsyncProcessError, message)
proc raiseAsyncProcessTimeoutError() {.
noreturn, noinit, noinline, raises: [AsyncProcessTimeoutError].} =
let message = "Operation timed out"
raise newException(AsyncProcessTimeoutError, message)
proc raiseAsyncProcessError(msg: string, error: OSErrorCode|cint) {.
noreturn, noinit, noinline, raises: [AsyncProcessError].} =
when error is OSErrorCode:
let message = msg & " ([OSError]: " & osErrorMsg(error) & ")"
else:
let message = msg & " ([OSError]: " & osErrorMsg(OSErrorCode(error)) & ")"
raise newException(AsyncProcessError, message)
when defined(windows):
proc getStdinHandle(pipes: AsyncProcessPipes): HANDLE =
if pipes.flags * {ProcessFlag.AutoStdin, ProcessFlag.UserStdin} != {}:
HANDLE(pipes.stdinHandle)
else:
getStdHandle(STD_INPUT_HANDLE)
proc getStdoutHandle(pipes: AsyncProcessPipes): HANDLE =
if pipes.flags * {ProcessFlag.AutoStdout, ProcessFlag.UserStdout} != {}:
HANDLE(pipes.stdoutHandle)
else:
getStdHandle(STD_OUTPUT_HANDLE)
proc getStderrHandle(pipes: AsyncProcessPipes): HANDLE =
if pipes.flags * {ProcessFlag.AutoStderr, ProcessFlag.UserStderr,
ProcessFlag.CopyStdout} != {}:
HANDLE(pipes.stderrHandle)
else:
getStdHandle(STD_ERROR_HANDLE)
proc getProcessEnvironment*(): StringTableRef =
var res = newStringTable(modeCaseInsensitive)
var env = getEnvironmentStringsW()
if isNil(env):
return res
var slider = env
while int(slider[]) != 0:
let pos = wcschr(slider, WCHAR(0x0000))
let line = slider.toString().valueOr("")
slider = cast[LPWSTR](cast[uint](pos) + uint(sizeof(WCHAR)))
if len(line) > 0:
let delim = line.find('=')
if delim > 0:
res[substr(line, 0, delim - 1)] = substr(line, delim + 1)
discard freeEnvironmentStringsW(env)
res
proc buildCommandLine(a: string, args: openArray[string]): string =
# TODO: Procedures quoteShell/(Windows, Posix)() needs security and bug review
# or reimplementation, for example quoteShellWindows() do not handle `\`
# properly.
# https://docs.microsoft.com/en-us/cpp/cpp/main-function-command-line-args?redirectedfrom=MSDN&view=msvc-170#parsing-c-command-line-arguments
var res = quoteShell(a)
for i in 0 ..< len(args):
res.add(' ')
res.add(quoteShell(args[i]))
res
proc buildEnvironment(env: StringTableRef): Result[LPWSTR, OSErrorCode] =
var str: string
for key, value in pairs(env):
doAssert('=' notin key, "`=` must not be present in key name")
str.add(key)
str.add('=')
str.add(value)
str.add('\x00')
str.add("\x00\x00")
toWideString(str)
proc closeThreadAndProcessHandle(p: AsyncProcessRef
): AsyncProcessResult[void] =
if p.threadHandle != HANDLE(0):
if closeHandle(p.threadHandle) == FALSE:
discard closeHandle(p.processHandle)
return err(osLastError())
p.threadHandle = HANDLE(0)
if p.processHandle != HANDLE(0):
if closeHandle(p.processHandle) == FALSE:
return err(osLastError())
p.processHandle = HANDLE(0)
proc startProcess*(command: string, workingDir: string = "",
arguments: seq[string] = @[],
environment: StringTableRef = nil,
options: set[AsyncProcessOption] = {},
stdinHandle = ProcessStreamHandle(),
stdoutHandle = ProcessStreamHandle(),
stderrHandle = ProcessStreamHandle(),
): Future[AsyncProcessRef] {.async.} =
var
pipes = preparePipes(options, stdinHandle, stdoutHandle,
stderrHandle).valueOr:
raiseAsyncProcessError("Unable to initialze process pipes", error)
let
commandLine =
if AsyncProcessOption.EvalCommand in options:
chronosProcShell & " /C " & command
else:
buildCommandLine(command, arguments)
workingDirectory =
if len(workingDir) > 0:
workingDir.toWideString().valueOr:
raiseAsyncProcessError("Unable to proceed working directory path",
error)
else:
nil
environment =
if not(isNil(environment)):
buildEnvironment(environment).valueOr:
raiseAsyncProcessError("Unable to build child process environment",
error)
else:
nil
flags = CREATE_UNICODE_ENVIRONMENT
var
psa = getSecurityAttributes(false)
tsa = getSecurityAttributes(false)
startupInfo =
block:
var res = STARTUPINFO(cb: DWORD(sizeof(STARTUPINFO)))
if pipes.pipesPresent():
res.dwFlags = STARTF_USESTDHANDLES
res.hStdInput = pipes.getStdinHandle()
res.hStdOutput = pipes.getStdoutHandle()
res.hStdError = pipes.getStderrHandle()
res
procInfo = PROCESS_INFORMATION()
let wideCommandLine = commandLine.toWideString().valueOr:
raiseAsyncProcessError("Unable to proceed command line", error)
let res = createProcess(
nil,
wideCommandLine,
addr psa, addr tsa,
TRUE, # NOTE: This is very important flag and MUST not be modified.
# All overloaded pipe handles will not work if this flag will be
# set to FALSE.
flags,
environment,
workingDirectory,
startupInfo, procInfo
)
if(not(isNil(environment))):
free(environment)
free(wideCommandLine)
var currentError = osLastError()
if res == FALSE:
await pipes.closeProcessStreams(options)
currentError = closeProcessHandles(pipes, options, currentError)
if res == FALSE:
raiseAsyncProcessError("Unable to spawn process", currentError)
let process = AsyncProcessRef(
processHandle: procInfo.hProcess,
threadHandle: procInfo.hThread,
processId: procInfo.dwProcessId,
pipes: pipes,
options: options,
flags: pipes.flags
)
trackCounter(AsyncProcessTrackerName)
return process
proc peekProcessExitCode(p: AsyncProcessRef): AsyncProcessResult[int] =
var wstatus: DWORD = 0
if p.exitStatus.isSome():
return ok(p.exitStatus.get())
let res = getExitCodeProcess(p.processHandle, wstatus)
if res == TRUE:
if wstatus != STILL_ACTIVE:
let status = int(wstatus)
p.exitStatus = Opt.some(status)
ok(status)
else:
ok(-1)
else:
err(osLastError())
proc suspend(p: AsyncProcessRef): AsyncProcessResult[void] =
if suspendThread(p.threadHandle) != 0xFFFF_FFFF'u32:
ok()
else:
err(osLastError())
proc resume(p: AsyncProcessRef): AsyncProcessResult[void] =
if resumeThread(p.threadHandle) != 0xFFFF_FFFF'u32:
ok()
else:
err(osLastError())
proc terminate(p: AsyncProcessRef): AsyncProcessResult[void] =
if terminateProcess(p.processHandle, 0) != 0'u32:
ok()
else:
err(osLastError())
proc kill(p: AsyncProcessRef): AsyncProcessResult[void] =
p.terminate()
proc running(p: AsyncProcessRef): AsyncProcessResult[bool] =
let res = ? p.peekExitCode()
if res == -1:
ok(true)
else:
ok(false)
proc waitForExit*(p: AsyncProcessRef,
timeout = InfiniteDuration): Future[int] {.async.} =
if p.exitStatus.isSome():
return p.exitStatus.get()
let wres =
try:
await waitForSingleObject(p.processHandle, timeout)
except ValueError as exc:
raiseAsyncProcessError("Unable to wait for process handle", exc)
if wres == WaitableResult.Timeout:
let res = p.kill()
if res.isErr():
raiseAsyncProcessError("Unable to terminate process", res.error())
let exitCode = p.peekProcessExitCode().valueOr:
raiseAsyncProcessError("Unable to peek process exit code", error)
if exitCode >= 0:
p.exitStatus = Opt.some(exitCode)
return exitCode
proc peekExitCode(p: AsyncProcessRef): AsyncProcessResult[int] =
if p.exitStatus.isSome():
return ok(p.exitStatus.get())
let res = waitForSingleObject(p.processHandle, DWORD(0))
if res != WAIT_TIMEOUT:
let exitCode = ? p.peekProcessExitCode()
ok(exitCode)
else:
ok(-1)
else:
import std/strutils
type
SpawnAttr = object
attrs: PosixSpawnAttr
actions: PosixSpawnFileActions
proc fd(h: AsyncStreamHolder): cint =
case h.kind
of StreamKind.Reader:
cint(h.reader.tsource.fd)
of StreamKind.Writer:
cint(h.writer.tsource.fd)
of StreamKind.None:
raiseAssert "Incorrect stream holder"
proc isEmpty(h: AsyncStreamHolder): bool =
h.kind == StreamKind.None
proc initSpawn(pipes: AsyncProcessPipes, options: set[AsyncProcessOption]
): Result[SpawnAttr, OSErrorCode] =
template doCheck(body: untyped): untyped =
let res = body
if res != 0:
return err(OSErrorCode(res))
var
attrs =
block:
var value: PosixSpawnAttr
let res = posixSpawnAttrInit(value)
if res != 0:
return err(OSErrorCode(res))
value
actions =
block:
var value: PosixSpawnFileActions
let res = posixSpawnFileActionsInit(value)
if res != 0:
discard posixSpawnAttrDestroy(attrs)
return err(OSErrorCode(res))
value
mask =
block:
var res: Sigset
discard sigemptyset(res)
res
doCheck(posixSpawnAttrSetSigMask(attrs, mask))
if AsyncProcessOption.ProcessGroup in options:
doCheck(posixSpawnAttrSetPgroup(attrs, 0))
doCheck(posixSpawnAttrSetFlags(attrs, osdefs.POSIX_SPAWN_USEVFORK or
osdefs.POSIX_SPAWN_SETSIGMASK or
osdefs.POSIX_SPAWN_SETPGROUP))
else:
doCheck(posixSpawnAttrSetFlags(attrs, osdefs.POSIX_SPAWN_USEVFORK or
osdefs.POSIX_SPAWN_SETSIGMASK))
if pipes.flags * {ProcessFlag.AutoStdin, ProcessFlag.UserStdin} != {}:
# Close child process STDIN.
doCheck(posixSpawnFileActionsAddClose(actions, cint(0)))
# Make a duplicate of `stdinHandle` as child process STDIN.
doCheck(posixSpawnFileActionsAddDup2(actions, cint(pipes.stdinHandle),
cint(0)))
# Close child process side of `stdinHandle`.
doCheck(posixSpawnFileActionsAddClose(actions,
cint(pipes.stdinHandle)))
# Close parent process side of `stdinHandle`.
if not(pipes.stdinHolder.isEmpty()):
let fd = cint(pipes.stdinHolder.fd())
doCheck(posixSpawnFileActionsAddClose(actions, fd))
if pipes.flags * {ProcessFlag.AutoStdout, ProcessFlag.UserStdout} != {}:
# Close child process STDOUT.
doCheck(posixSpawnFileActionsAddClose(actions, cint(1)))
# Make a duplicate of `stdoutHandle` as child process STDOUT.
doCheck(posixSpawnFileActionsAddDup2(actions, cint(pipes.stdoutHandle),
cint(1)))
if AsyncProcessOption.StdErrToStdOut notin options:
# Close child process side of `stdoutHandle`.
doCheck(posixSpawnFileActionsAddClose(actions,
cint(pipes.stdoutHandle)))
# Close parent process side of `stdoutHandle`.
if not(pipes.stdoutHolder.isEmpty()):
let fd = cint(pipes.stdoutHolder.fd())
doCheck(posixSpawnFileActionsAddClose(actions, fd))
if pipes.flags * {ProcessFlag.AutoStderr, ProcessFlag.UserStderr} != {}:
# Close child process STDERR.
doCheck(posixSpawnFileActionsAddClose(actions, cint(2)))
# Make a duplicate of `stderrHandle` as child process STDERR.
doCheck(posixSpawnFileActionsAddDup2(actions, cint(pipes.stderrHandle),
cint(2)))
# Close child process side of `stderrHandle`.
doCheck(posixSpawnFileActionsAddClose(actions,
cint(pipes.stderrHandle)))
# Close parent process side of `stderrHandle`.
if not(pipes.stderrHolder.isEmpty()):
let fd = cint(pipes.stderrHolder.fd())
doCheck(posixSpawnFileActionsAddClose(actions, fd))
else:
if AsyncProcessOption.StdErrToStdOut in options:
# Close child process STDERR.
doCheck(posixSpawnFileActionsAddClose(actions, cint(2)))
# Make a duplicate of `stdoutHandle` as child process STDERR.
doCheck(posixSpawnFileActionsAddDup2(actions, cint(pipes.stdoutHandle),
cint(2)))
# Close child process side of `stdoutHandle`.
doCheck(posixSpawnFileActionsAddClose(actions,
cint(pipes.stdoutHandle)))
# Close parent process side of `stdoutHandle`.
if not(pipes.stdoutHolder.isEmpty()):
let fd = cint(pipes.stdoutHolder.fd())
doCheck(posixSpawnFileActionsAddClose(actions, fd))
ok(SpawnAttr(attrs: attrs, actions: actions))
proc free(v: var SpawnAttr): Result[void, OSErrorCode] =
block:
let res = posixSpawnAttrDestroy(v.attrs)
if res != 0:
discard posixSpawnFileActionsDestroy(v.actions)
return err(OSErrorCode(res))
block:
let res = posixSpawnFileActionsDestroy(v.actions)
if res != 0:
return err(OSErrorCode(res))
ok()
proc getKeyValueItem(key: string, value: string): cstring =
var p = cast[cstring](alloc(len(key) + len(value) + 1 + 1))
var offset = 0
if len(key) > 0:
copyMem(addr p[offset], unsafeAddr(key[0]), len(key))
inc(offset, len(key))
p[offset] = '='
inc(offset)
if len(value) > 0:
copyMem(addr p[offset], unsafeAddr(value[0]), len(value))
inc(offset, len(value))
p[offset] = '\x00'
p
proc envToCStringArray(t: StringTableRef): cstringArray =
let itemsCount = len(t)
var
res = cast[cstringArray](alloc((itemsCount + 1) * sizeof(cstring)))
i = 0
for key, value in pairs(t):
res[i] = getKeyValueItem(key, value)
inc(i)
res[i] = nil # Last item in CStringArray should be `nil`.
res
proc envToCStringArray(): cstringArray =
let itemsCount =
block:
var res = 0
for key, value in envPairs(): inc(res)
res
var
res = cast[cstringArray](alloc((itemsCount + 1) * sizeof(cstring)))
i = 0
for key, value in envPairs():
res[i] = getKeyValueItem(key, value)
inc(i)
res[i] = nil # Last item in CStringArray should be `nil`.
res
when defined(macosx) or defined(macos) or defined(ios):
proc getEnvironment(): ptr cstringArray {.
importc: "_NSGetEnviron", header: "<crt_externs.h>".}
else:
var globalEnv {.importc: "environ", header: "<unistd.h>".}: cstringArray
proc getProcessEnvironment*(): StringTableRef =
var res = newStringTable(modeCaseInsensitive)
let env =
when defined(macosx) or defined(macos) or defined(ios):
getEnvironment()[]
else:
globalEnv
var i = 0
while not(isNil(env[i])):
let line = $env[i]
if len(line) > 0:
let delim = line.find('=')
if delim > 0:
res[substr(line, 0, delim - 1)] = substr(line, delim + 1)
inc(i)
res
func exitStatusLikeShell(status: int): int =
if WAITIFSIGNALED(cint(status)):
# like the shell!
128 + WAITTERMSIG(cint(status))
else:
WAITEXITSTATUS(cint(status))
proc getCurrentDirectory(): AsyncProcessResult[string] =
var bufsize = 1024
var res = newString(bufsize)
proc strLength(a: string): int {.nimcall.} =
for i in 0 ..< len(a):
if a[i] == '\x00':
return i
len(a)
while true:
if osdefs.getcwd(cstring(res), bufsize) != nil:
setLen(res, strLength(res))
return ok(res)
else:
let errorCode = osLastError()
if errorCode == oserrno.ERANGE:
bufsize = bufsize shl 1
doAssert(bufsize >= 0)
res = newString(bufsize)
else:
return err(errorCode)
proc setCurrentDirectory(dir: string): AsyncProcessResult[void] =
let res = osdefs.chdir(cstring(dir))
if res == -1:
return err(osLastError())
ok()
proc closeThreadAndProcessHandle(p: AsyncProcessRef
): AsyncProcessResult[void] =
discard
proc startProcess*(command: string, workingDir: string = "",
arguments: seq[string] = @[],
environment: StringTableRef = nil,
options: set[AsyncProcessOption] = {},
stdinHandle = ProcessStreamHandle(),
stdoutHandle = ProcessStreamHandle(),
stderrHandle = ProcessStreamHandle(),
): Future[AsyncProcessRef] {.async.} =
var
pid: Pid
pipes = preparePipes(options, stdinHandle, stdoutHandle,
stderrHandle).valueOr:
raiseAsyncProcessError("Unable to initialze process pipes",
error)
sa = pipes.initSpawn(options).valueOr:
discard closeProcessHandles(pipes, options, OSErrorCode(0))
await pipes.closeProcessStreams(options)
raiseAsyncProcessError("Unable to initalize spawn attributes", 0)
let
(commandLine, commandArguments) =
if AsyncProcessOption.EvalCommand in options:
let args = @[chronosProcShell, "-c", command]
(chronosProcShell, allocCStringArray(args))
else:
var res = @[command]
for arg in arguments.items():
res.add(arg)
(command, allocCStringArray(res))
commandEnv =
if isNil(environment):
envToCStringArray()
else:
envToCStringArray(environment)
var currentError: OSErrorCode
var currentDir: string
try:
currentDir =
if len(workingDir) > 0:
# Save current working directory and change it to `workingDir`.
let cres = getCurrentDirectory()
if cres.isErr():
raiseAsyncProcessError("Unable to obtain current directory",
cres.error())
let sres = setCurrentDirectory(workingDir)
if sres.isErr():
raiseAsyncProcessError("Unable to change current directory",
sres.error())
cres.get()
else:
""
let res =
if AsyncProcessOption.UsePath in options:
posixSpawnp(pid, cstring(commandLine), sa.actions, sa.attrs,
commandArguments, commandEnv)
else:
posixSpawn(pid, cstring(commandLine), sa.actions, sa.attrs,
commandArguments, commandEnv)
if res != 0:
await pipes.closeProcessStreams(options)
currentError = closeProcessHandles(pipes, options, OSErrorCode(res))
finally:
# Restore working directory
if (len(workingDir) > 0) and (len(currentDir) > 0):
# Restore working directory.
let cres = getCurrentDirectory()
if cres.isErr():
# On error we still try to restore original working directory.
if currentError.isOk():
currentError = cres.error()
discard setCurrentDirectory(currentDir)
else:
if cres.get() != currentDir:
let sres = setCurrentDirectory(currentDir)
if sres.isErr():
if currentError.isOk():
currentError = sres.error()
# Cleanup allocated memory
deallocCStringArray(commandArguments)
deallocCStringArray(commandEnv)
# Cleanup posix_spawn attributes and file operations
if not(currentError.isOk()):
discard sa.free()
else:
let res = sa.free()
if res.isErr():
currentError = res.error()
# If currentError has been set, raising an exception.
if not(currentError.isOk()):
raiseAsyncProcessError("Unable to spawn process", currentError)
let process = AsyncProcessRef(
processId: pid,
pipes: pipes,
options: options,
flags: pipes.flags
)
trackCounter(AsyncProcessTrackerName)
return process
proc peekProcessExitCode(p: AsyncProcessRef,
reap = false): AsyncProcessResult[int] =
var wstatus: cint = 0
if p.exitStatus.isSome():
return ok(p.exitStatus.get())
let
flags = if reap: cint(0) else: osdefs.WNOHANG
waitRes =
block:
var res: cint = 0
while true:
res = osdefs.waitpid(p.processId, wstatus, flags)
if not((res == -1) and (osLastError() == oserrno.EINTR)):
break
res
if waitRes == p.processId:
if WAITIFEXITED(wstatus) or WAITIFSIGNALED(wstatus):
let status = int(wstatus)
p.exitStatus = Opt.some(status)
ok(status)
else:
ok(-1)
elif waitRes == 0:
ok(-1)
else:
err(osLastError())
proc suspend(p: AsyncProcessRef): AsyncProcessResult[void] =
if osdefs.kill(p.processId, osdefs.SIGSTOP) == 0:
ok()
else:
err(osLastError())
proc resume(p: AsyncProcessRef): AsyncProcessResult[void] =
if osdefs.kill(p.processId, osdefs.SIGCONT) == 0:
ok()
else:
err(osLastError())
proc terminate(p: AsyncProcessRef): AsyncProcessResult[void] =
if osdefs.kill(p.processId, osdefs.SIGTERM) == 0:
ok()
else:
err(osLastError())
proc kill(p: AsyncProcessRef): AsyncProcessResult[void] =
if osdefs.kill(p.processId, osdefs.SIGKILL) == 0:
ok()
else:
err(osLastError())
proc running(p: AsyncProcessRef): AsyncProcessResult[bool] =
let res = ? p.peekProcessExitCode()
if res == -1:
ok(true)
else:
ok(false)
proc waitForExit*(p: AsyncProcessRef,
timeout = InfiniteDuration): Future[int] =
var
retFuture = newFuture[int]("chronos.waitForExit()")
processHandle: ProcessHandle
timer: TimerCallback = nil
if p.exitStatus.isSome():
retFuture.complete(p.exitStatus.get())
return retFuture
if timeout == ZeroDuration:
let res = p.kill()
if res.isErr():
retFuture.fail(newException(AsyncProcessError, osErrorMsg(res.error())))
return retFuture
block:
let exitCode = p.peekProcessExitCode().valueOr:
retFuture.fail(newException(AsyncProcessError, osErrorMsg(error)))
return retFuture
if exitCode != -1:
retFuture.complete(exitStatusLikeShell(exitCode))
return retFuture
if timeout == ZeroDuration:
retFuture.complete(-1)
return retFuture
proc continuation(udata: pointer) {.gcsafe.} =
let source = cast[int](udata)
if not(retFuture.finished()):
if source == 1:
# Process exited.
let res = removeProcess2(processHandle)
if res.isErr():
retFuture.fail(newException(AsyncProcessError,
osErrorMsg(res.error())))
return
if not(isNil(timer)):
clearTimer(timer)
let exitCode = p.peekProcessExitCode().valueOr:
retFuture.fail(newException(AsyncProcessError, osErrorMsg(error)))
return
if exitCode == -1:
retFuture.complete(-1)
else:
retFuture.complete(exitStatusLikeShell(exitCode))
else:
# Timeout exceeded.
let res = p.kill()
if res.isErr():
retFuture.fail(newException(AsyncProcessError,
osErrorMsg(res.error())))
proc cancellation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
if not(isNil(timer)):
clearTimer(timer)
# Ignore any errors because of cancellation.
discard removeProcess2(processHandle)
if timeout != InfiniteDuration:
timer = setTimer(Moment.fromNow(timeout), continuation, cast[pointer](2))
processHandle = addProcess2(int(p.processId), continuation,
cast[pointer](1)).valueOr:
if error == oserrno.ESRCH:
# "zombie death race" problem.
# If process exited right after `waitpid()` - `kqueue` call
# could return ESRCH error. So we need to handle it properly and
# try to reap process code from exiting process.
let exitCode = p.peekProcessExitCode(true).valueOr:
retFuture.fail(newException(AsyncProcessError, osErrorMsg(error)))
return retFuture
if exitCode == -1:
# This should not be happens one more time, so we just report
# original error.
retFuture.fail(newException(AsyncProcessError,
osErrorMsg(oserrno.ESRCH)))
else:
retFuture.complete(exitStatusLikeShell(exitCode))
else:
retFuture.fail(newException(AsyncProcessError, osErrorMsg(error)))
return retFuture
# addProcess2() has race condition problem inside. Its possible that child
# process (we going to wait) sends SIGCHLD right after addProcess2() blocks
# signals and before it starts monitoring for signal (`signalfd` or
# `kqueue`). To avoid this problem we going to check process for completion
# one more time.
block:
let exitCode = p.peekProcessExitCode().valueOr:
discard removeProcess2(processHandle)
retFuture.fail(newException(AsyncProcessError, osErrorMsg(error)))
return retFuture
if exitCode != -1:
discard removeProcess2(processHandle)
retFuture.complete(exitStatusLikeShell(exitCode))
return retFuture
# Process is still running, so we going to wait for SIGCHLD.
retFuture.cancelCallback = cancellation
return retFuture
proc peekExitCode(p: AsyncProcessRef): AsyncProcessResult[int] =
let res = ? p.peekProcessExitCode()
ok(exitStatusLikeShell(res))
proc createPipe(kind: StandardKind
): Result[tuple[read: AsyncFD, write: AsyncFD], OSErrorCode] =
case kind
of StandardKind.Stdin:
let pipes =
when defined(windows):
let
readFlags: set[DescriptorFlag] = {DescriptorFlag.NonBlock}
writeFlags: set[DescriptorFlag] = {DescriptorFlag.NonBlock}
? createOsPipe(readFlags, writeFlags)
else:
let
readFlags: set[DescriptorFlag] = {}
writeFlags: set[DescriptorFlag] = {DescriptorFlag.NonBlock}
? createOsPipe(readFlags, writeFlags)
ok((read: AsyncFD(pipes.read), write: AsyncFD(pipes.write)))
of StandardKind.Stdout, StandardKind.Stderr:
let pipes =
when defined(windows):
let
readFlags: set[DescriptorFlag] = {DescriptorFlag.NonBlock}
writeFlags: set[DescriptorFlag] = {DescriptorFlag.NonBlock}
? createOsPipe(readFlags, writeFlags)
else:
let
readFlags: set[DescriptorFlag] = {DescriptorFlag.NonBlock}
writeFlags: set[DescriptorFlag] = {}
? createOsPipe(readFlags, writeFlags)
ok((read: AsyncFD(pipes.read), write: AsyncFD(pipes.write)))
proc preparePipes(options: set[AsyncProcessOption],
stdinHandle, stdoutHandle,
stderrHandle: ProcessStreamHandle
): AsyncProcessResult[AsyncProcessPipes] =
let
(stdinFlags, localStdin, remoteStdin) =
case stdinHandle.kind
of ProcessStreamHandleKind.None:
({ProcessFlag.NoStdin}, AsyncStreamHolder.init(),
asyncInvalidPipe)
of ProcessStreamHandleKind.Auto:
let (pipeIn, pipeOut) = ? createPipe(StandardKind.Stdin)
let holder = ? AsyncStreamHolder.init(
ProcessStreamHandle.init(pipeOut), StreamKind.Writer, {})
({ProcessFlag.AutoStdin}, holder, pipeIn)
else:
({ProcessFlag.UserStdin},
AsyncStreamHolder.init(), AsyncFD.init(stdinHandle))
(stdoutFlags, localStdout, remoteStdout) =
case stdoutHandle.kind
of ProcessStreamHandleKind.None:
({ProcessFlag.NoStdout}, AsyncStreamHolder.init(),
asyncInvalidPipe)
of ProcessStreamHandleKind.Auto:
let (pipeIn, pipeOut) = ? createPipe(StandardKind.Stdout)
let holder = ? AsyncStreamHolder.init(
ProcessStreamHandle.init(pipeIn), StreamKind.Reader, {})
({ProcessFlag.AutoStdout}, holder, pipeOut)
else:
({ProcessFlag.UserStdout},
AsyncStreamHolder.init(), AsyncFD.init(stdoutHandle))
(stderrFlags, localStderr, remoteStderr) =
if AsyncProcessOption.StdErrToStdOut in options:
doAssert(stderrHandle.isEmpty(),
"`stderrHandle` argument must not be set, when" &
"`AsyncProcessOption.StdErrToStdOut` flag is used")
case stdoutHandle.kind
of ProcessStreamHandleKind.None:
raiseAssert "`stdoutHandle` argument must be present, when " &
"`AsyncProcessOption.StdErrToStdOut` flag is used"
of ProcessStreamHandleKind.Auto:
({ProcessFlag.CopyStdout}, localStdout, remoteStdout)
else:
({ProcessFlag.CopyStdout}, localStdout, remoteStdout)
else:
case stderrHandle.kind
of ProcessStreamHandleKind.None:
({ProcessFlag.NoStderr}, AsyncStreamHolder.init(),
asyncInvalidPipe)
of ProcessStreamHandleKind.Auto:
let (pipeIn, pipeOut) = ? createPipe(StandardKind.Stderr)
let holder = ? AsyncStreamHolder.init(
ProcessStreamHandle.init(pipeIn), StreamKind.Reader, {})
({ProcessFlag.AutoStderr}, holder, pipeOut)
else:
({ProcessFlag.UserStderr},
AsyncStreamHolder.init(), AsyncFD.init(stderrHandle))
ok(AsyncProcessPipes(
flags: stdinFlags + stdoutFlags + stderrFlags,
stdinHolder: localStdin,
stdoutHolder: localStdout,
stderrHolder: localStderr,
stdinHandle: remoteStdin,
stdoutHandle: remoteStdout,
stderrHandle: remoteStderr
))
proc closeWait(holder: AsyncStreamHolder) {.async.} =
let (future, transp) =
case holder.kind
of StreamKind.None:
(nil, nil)
of StreamKind.Reader:
if StreamHolderFlag.Stream in holder.flags:
(holder.reader.closeWait(), holder.reader.tsource)
else:
(nil, holder.reader.tsource)
of StreamKind.Writer:
if StreamHolderFlag.Stream in holder.flags:
(holder.writer.closeWait(), holder.writer.tsource)
else:
(nil, holder.writer.tsource)
let pending =
block:
var res: seq[Future[void]]
if not(isNil(future)):
res.add(future)
if not(isNil(transp)):
if StreamHolderFlag.Transport in holder.flags:
res.add(transp.closeWait())
res
if len(pending) > 0:
await allFutures(pending)
proc closeProcessStreams(pipes: AsyncProcessPipes,
options: set[AsyncProcessOption]): Future[void] =
let pending =
block:
var res: seq[Future[void]]
if ProcessFlag.AutoStdin in pipes.flags:
res.add(pipes.stdinHolder.closeWait())
if ProcessFlag.AutoStdout in pipes.flags:
res.add(pipes.stdoutHolder.closeWait())
if ProcessFlag.AutoStderr in pipes.flags:
res.add(pipes.stderrHolder.closeWait())
res
allFutures(pending)
proc opAndWaitForExit(p: AsyncProcessRef, op: WaitOperation,
timeout = InfiniteDuration): Future[int] {.async.} =
let timerFut =
if timeout == InfiniteDuration:
newFuture[void]("chronos.killAndwaitForExit")
else:
sleepAsync(timeout)
while true:
if p.running().get(true):
# We ignore operation errors because we going to repeat calling
# operation until process will not exit.
case op
of WaitOperation.Kill:
discard p.kill()
of WaitOperation.Terminate:
discard p.terminate()
else:
let exitCode = p.peekExitCode().valueOr:
raiseAsyncProcessError("Unable to peek process exit code", error)
if not(timerFut.finished()):
await cancelAndWait(timerFut)
return exitCode
let waitFut = p.waitForExit().wait(100.milliseconds)
discard await race(FutureBase(waitFut), FutureBase(timerFut))
if waitFut.finished() and not(waitFut.failed()):
let res = p.peekExitCode()
if res.isOk():
if not(timerFut.finished()):
await cancelAndWait(timerFut)
return res.get()
if timerFut.finished():
if not(waitFut.finished()):
await waitFut.cancelAndWait()
raiseAsyncProcessTimeoutError()
proc closeWait*(p: AsyncProcessRef) {.async.} =
# Here we ignore all possible errrors, because we do not want to raise
# exceptions.
discard closeProcessHandles(p.pipes, p.options, OSErrorCode(0))
await noCancel(p.pipes.closeProcessStreams(p.options))
discard p.closeThreadAndProcessHandle()
untrackCounter(AsyncProcessTrackerName)
proc stdinStream*(p: AsyncProcessRef): AsyncStreamWriter =
doAssert(p.pipes.stdinHolder.kind == StreamKind.Writer,
"StdinStreamWriter is not available")
p.pipes.stdinHolder.writer
proc stdoutStream*(p: AsyncProcessRef): AsyncStreamReader =
doAssert(p.pipes.stdoutHolder.kind == StreamKind.Reader,
"StdoutStreamReader is not available")
p.pipes.stdoutHolder.reader
proc stderrStream*(p: AsyncProcessRef): AsyncStreamReader =
doAssert(p.pipes.stderrHolder.kind == StreamKind.Reader,
"StderrStreamReader is not available")
p.pipes.stderrHolder.reader
proc execCommand*(command: string,
options = {AsyncProcessOption.EvalCommand},
timeout = InfiniteDuration
): Future[int] {.async.} =
let
poptions = options + {AsyncProcessOption.EvalCommand}
process = await startProcess(command, options = poptions)
res =
try:
await process.waitForExit(timeout)
finally:
await process.closeWait()
res
proc execCommandEx*(command: string,
options = {AsyncProcessOption.EvalCommand},
timeout = InfiniteDuration
): Future[CommandExResponse] {.async.} =
let
process = await startProcess(command, options = options,
stdoutHandle = AsyncProcess.Pipe,
stderrHandle = AsyncProcess.Pipe)
outputReader = process.stdoutStream.read()
errorReader = process.stderrStream.read()
res =
try:
await allFutures(outputReader, errorReader)
let
status = await process.waitForExit(timeout)
output =
try:
string.fromBytes(outputReader.read())
except AsyncStreamError as exc:
raiseAsyncProcessError("Unable to read process' stdout channel",
exc)
error =
try:
string.fromBytes(errorReader.read())
except AsyncStreamError as exc:
raiseAsyncProcessError("Unable to read process' stderr channel",
exc)
CommandExResponse(status: status, stdOutput: output, stdError: error)
finally:
await process.closeWait()
res
proc pid*(p: AsyncProcessRef): int =
## Returns process ``p`` identifier.
int(p.processId)
template processId*(p: AsyncProcessRef): int = pid(p)
proc killAndWaitForExit*(p: AsyncProcessRef,
timeout = InfiniteDuration): Future[int] =
## Perform continuous attempts to kill the ``p`` process for specified period
## of time ``timeout``.
##
## On Posix systems, killing means sending ``SIGKILL`` to the process ``p``,
## On Windows, it uses ``TerminateProcess`` to kill the process ``p``.
##
## If the process ``p`` fails to be killed within the ``timeout`` time, it
## will raise ``AsyncProcessTimeoutError``.
##
## In case of error this it will raise ``AsyncProcessError``.
##
## Returns process ``p`` exit code.
opAndWaitForExit(p, WaitOperation.Kill, timeout)
proc terminateAndWaitForExit*(p: AsyncProcessRef,
timeout = InfiniteDuration): Future[int] =
## Perform continuous attempts to terminate the ``p`` process for specified
## period of time ``timeout``.
##
## On Posix systems, terminating means sending ``SIGTERM`` to the process
## ``p``, on Windows, it uses ``TerminateProcess`` to terminate the process
## ``p``.
##
## If the process ``p`` fails to be terminated within the ``timeout`` time, it
## will raise ``AsyncProcessTimeoutError``.
##
## In case of error this it will raise ``AsyncProcessError``.
##
## Returns process ``p`` exit code.
opAndWaitForExit(p, WaitOperation.Terminate, timeout)