mirror of
https://github.com/status-im/nim-chronos.git
synced 2025-01-09 11:02:40 +00:00
e296ae30c8
* asyncraises for threadsync * missing bracket * missing exception
421 lines
13 KiB
Nim
421 lines
13 KiB
Nim
#
|
|
# Chronos multithreaded synchronization primitives
|
|
#
|
|
# (c) Copyright 2023-Present Status Research & Development GmbH
|
|
#
|
|
# Licensed under either of
|
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
|
# MIT license (LICENSE-MIT)
|
|
|
|
## This module implements some core async thread synchronization primitives.
|
|
import results
|
|
import "."/[timer, asyncloop]
|
|
|
|
export results
|
|
|
|
{.push raises: [].}
|
|
|
|
const hasThreadSupport* = compileOption("threads")
|
|
when not(hasThreadSupport):
|
|
{.fatal: "Compile this program with threads enabled!".}
|
|
|
|
import "."/[osdefs, osutils, oserrno]
|
|
|
|
type
|
|
ThreadSignal* = object
|
|
when defined(windows):
|
|
event: HANDLE
|
|
elif defined(linux):
|
|
efd: AsyncFD
|
|
else:
|
|
rfd, wfd: AsyncFD
|
|
|
|
ThreadSignalPtr* = ptr ThreadSignal
|
|
|
|
proc new*(t: typedesc[ThreadSignalPtr]): Result[ThreadSignalPtr, string] =
|
|
## Create new ThreadSignal object.
|
|
let res = cast[ptr ThreadSignal](allocShared0(sizeof(ThreadSignal)))
|
|
when defined(windows):
|
|
var sa = getSecurityAttributes()
|
|
let event = osdefs.createEvent(addr sa, DWORD(0), DWORD(0), nil)
|
|
if event == HANDLE(0):
|
|
deallocShared(res)
|
|
return err(osErrorMsg(osLastError()))
|
|
res[] = ThreadSignal(event: event)
|
|
elif defined(linux):
|
|
let efd = eventfd(0, EFD_CLOEXEC or EFD_NONBLOCK)
|
|
if efd == -1:
|
|
deallocShared(res)
|
|
return err(osErrorMsg(osLastError()))
|
|
res[] = ThreadSignal(efd: AsyncFD(efd))
|
|
else:
|
|
var sockets: array[2, cint]
|
|
block:
|
|
let sres = socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)
|
|
if sres < 0:
|
|
deallocShared(res)
|
|
return err(osErrorMsg(osLastError()))
|
|
# MacOS do not have SOCK_NONBLOCK and SOCK_CLOEXEC, so we forced to use
|
|
# setDescriptorFlags() for every socket.
|
|
block:
|
|
let sres = setDescriptorFlags(sockets[0], true, true)
|
|
if sres.isErr():
|
|
discard closeFd(sockets[0])
|
|
discard closeFd(sockets[1])
|
|
deallocShared(res)
|
|
return err(osErrorMsg(sres.error))
|
|
block:
|
|
let sres = setDescriptorFlags(sockets[1], true, true)
|
|
if sres.isErr():
|
|
discard closeFd(sockets[0])
|
|
discard closeFd(sockets[1])
|
|
deallocShared(res)
|
|
return err(osErrorMsg(sres.error))
|
|
res[] = ThreadSignal(rfd: AsyncFD(sockets[0]), wfd: AsyncFD(sockets[1]))
|
|
ok(ThreadSignalPtr(res))
|
|
|
|
when not(defined(windows)):
|
|
type
|
|
WaitKind {.pure.} = enum
|
|
Read, Write
|
|
|
|
when defined(linux):
|
|
proc checkBusy(fd: cint): bool = false
|
|
else:
|
|
proc checkBusy(fd: cint): bool =
|
|
var data = 0'u64
|
|
let res = handleEintr(recv(SocketHandle(fd),
|
|
addr data, sizeof(uint64), MSG_PEEK))
|
|
if res == sizeof(uint64):
|
|
true
|
|
else:
|
|
false
|
|
|
|
func toTimeval(a: Duration): Timeval =
|
|
## Convert Duration ``a`` to ``Timeval`` object.
|
|
let nanos = a.nanoseconds
|
|
let m = nanos mod Second.nanoseconds()
|
|
Timeval(
|
|
tv_sec: Time(nanos div Second.nanoseconds()),
|
|
tv_usec: Suseconds(m div Microsecond.nanoseconds())
|
|
)
|
|
|
|
proc waitReady(fd: cint, kind: WaitKind,
|
|
timeout: Duration): Result[bool, OSErrorCode] =
|
|
var
|
|
tv: Timeval
|
|
fdset =
|
|
block:
|
|
var res: TFdSet
|
|
FD_ZERO(res)
|
|
FD_SET(SocketHandle(fd), res)
|
|
res
|
|
|
|
let
|
|
ptv =
|
|
if not(timeout.isInfinite()):
|
|
tv = timeout.toTimeval()
|
|
addr tv
|
|
else:
|
|
nil
|
|
nfd = cint(fd) + 1
|
|
res =
|
|
case kind
|
|
of WaitKind.Read:
|
|
handleEintr(select(nfd, addr fdset, nil, nil, ptv))
|
|
of WaitKind.Write:
|
|
handleEintr(select(nfd, nil, addr fdset, nil, ptv))
|
|
|
|
if res > 0:
|
|
ok(true)
|
|
elif res == 0:
|
|
ok(false)
|
|
else:
|
|
err(osLastError())
|
|
|
|
proc safeUnregisterAndCloseFd(fd: AsyncFD): Result[void, OSErrorCode] =
|
|
let loop = getThreadDispatcher()
|
|
if loop.contains(fd):
|
|
? unregister2(fd)
|
|
if closeFd(cint(fd)) != 0:
|
|
err(osLastError())
|
|
else:
|
|
ok()
|
|
|
|
proc close*(signal: ThreadSignalPtr): Result[void, string] =
|
|
## Close ThreadSignal object and free all the resources.
|
|
defer: deallocShared(signal)
|
|
when defined(windows):
|
|
# We do not need to perform unregistering on Windows, we can only close it.
|
|
if closeHandle(signal[].event) == 0'u32:
|
|
return err(osErrorMsg(osLastError()))
|
|
elif defined(linux):
|
|
let res = safeUnregisterAndCloseFd(signal[].efd)
|
|
if res.isErr():
|
|
return err(osErrorMsg(res.error))
|
|
else:
|
|
let res1 = safeUnregisterAndCloseFd(signal[].rfd)
|
|
let res2 = safeUnregisterAndCloseFd(signal[].wfd)
|
|
if res1.isErr(): return err(osErrorMsg(res1.error))
|
|
if res2.isErr(): return err(osErrorMsg(res2.error))
|
|
ok()
|
|
|
|
proc fireSync*(signal: ThreadSignalPtr,
|
|
timeout = InfiniteDuration): Result[bool, string] =
|
|
## Set state of ``signal`` to signaled state in blocking way.
|
|
##
|
|
## Returns ``false`` if signal was not signalled in time, and ``true``
|
|
## if operation was successful.
|
|
when defined(windows):
|
|
if setEvent(signal[].event) == 0'u32:
|
|
return err(osErrorMsg(osLastError()))
|
|
ok(true)
|
|
else:
|
|
let
|
|
eventFd =
|
|
when defined(linux):
|
|
cint(signal[].efd)
|
|
else:
|
|
cint(signal[].wfd)
|
|
checkFd =
|
|
when defined(linux):
|
|
cint(signal[].efd)
|
|
else:
|
|
cint(signal[].rfd)
|
|
|
|
if checkBusy(checkFd):
|
|
# Signal is already in signalled state
|
|
return ok(true)
|
|
|
|
var data = 1'u64
|
|
while true:
|
|
let res =
|
|
when defined(linux):
|
|
handleEintr(write(eventFd, addr data, sizeof(uint64)))
|
|
else:
|
|
handleEintr(send(SocketHandle(eventFd), addr data, sizeof(uint64),
|
|
MSG_NOSIGNAL))
|
|
if res < 0:
|
|
let errorCode = osLastError()
|
|
case errorCode
|
|
of EAGAIN:
|
|
let wres = waitReady(eventFd, WaitKind.Write, timeout)
|
|
if wres.isErr():
|
|
return err(osErrorMsg(wres.error))
|
|
if not(wres.get()):
|
|
return ok(false)
|
|
else:
|
|
return err(osErrorMsg(errorCode))
|
|
elif res != sizeof(data):
|
|
return err(osErrorMsg(EINVAL))
|
|
else:
|
|
return ok(true)
|
|
|
|
proc waitSync*(signal: ThreadSignalPtr,
|
|
timeout = InfiniteDuration): Result[bool, string] =
|
|
## Wait until the signal become signaled. This procedure is ``NOT`` async,
|
|
## so it blocks execution flow, but this procedure do not need asynchronous
|
|
## event loop to be present.
|
|
when defined(windows):
|
|
let
|
|
timeoutWin =
|
|
if timeout.isInfinite():
|
|
INFINITE
|
|
else:
|
|
DWORD(timeout.milliseconds())
|
|
handle = signal[].event
|
|
res = waitForSingleObject(handle, timeoutWin)
|
|
if res == WAIT_OBJECT_0:
|
|
ok(true)
|
|
elif res == WAIT_TIMEOUT:
|
|
ok(false)
|
|
elif res == WAIT_ABANDONED:
|
|
err("The wait operation has been abandoned")
|
|
else:
|
|
err("The wait operation has been failed")
|
|
else:
|
|
let eventFd =
|
|
when defined(linux):
|
|
cint(signal[].efd)
|
|
else:
|
|
cint(signal[].rfd)
|
|
var
|
|
data = 0'u64
|
|
timer = timeout
|
|
while true:
|
|
let wres =
|
|
block:
|
|
let
|
|
start = Moment.now()
|
|
res = waitReady(eventFd, WaitKind.Read, timer)
|
|
timer = timer - (Moment.now() - start)
|
|
res
|
|
if wres.isErr():
|
|
return err(osErrorMsg(wres.error))
|
|
if not(wres.get()):
|
|
return ok(false)
|
|
let res =
|
|
when defined(linux):
|
|
handleEintr(read(eventFd, addr data, sizeof(uint64)))
|
|
else:
|
|
handleEintr(recv(SocketHandle(eventFd), addr data, sizeof(uint64),
|
|
cint(0)))
|
|
if res < 0:
|
|
let errorCode = osLastError()
|
|
# If errorCode == EAGAIN it means that reading operation is already
|
|
# pending and so some other consumer reading eventfd or pipe end, in
|
|
# this case we going to ignore error and wait for another event.
|
|
if errorCode != EAGAIN:
|
|
return err(osErrorMsg(errorCode))
|
|
elif res != sizeof(data):
|
|
return err(osErrorMsg(EINVAL))
|
|
else:
|
|
return ok(true)
|
|
|
|
proc fire*(signal: ThreadSignalPtr): Future[void] {.
|
|
async: (raises: [AsyncError, CancelledError], raw: true).} =
|
|
## Set state of ``signal`` to signaled in asynchronous way.
|
|
var retFuture = newFuture[void]("asyncthreadsignal.fire")
|
|
when defined(windows):
|
|
if setEvent(signal[].event) == 0'u32:
|
|
retFuture.fail(newException(AsyncError, osErrorMsg(osLastError())))
|
|
else:
|
|
retFuture.complete()
|
|
else:
|
|
var data = 1'u64
|
|
let
|
|
eventFd =
|
|
when defined(linux):
|
|
cint(signal[].efd)
|
|
else:
|
|
cint(signal[].wfd)
|
|
checkFd =
|
|
when defined(linux):
|
|
cint(signal[].efd)
|
|
else:
|
|
cint(signal[].rfd)
|
|
|
|
proc continuation(udata: pointer) {.gcsafe, raises: [].} =
|
|
if not(retFuture.finished()):
|
|
let res =
|
|
when defined(linux):
|
|
handleEintr(write(eventFd, addr data, sizeof(uint64)))
|
|
else:
|
|
handleEintr(send(SocketHandle(eventFd), addr data, sizeof(uint64),
|
|
MSG_NOSIGNAL))
|
|
if res < 0:
|
|
let errorCode = osLastError()
|
|
discard removeWriter2(AsyncFD(eventFd))
|
|
retFuture.fail(newException(AsyncError, osErrorMsg(errorCode)))
|
|
elif res != sizeof(data):
|
|
discard removeWriter2(AsyncFD(eventFd))
|
|
retFuture.fail(newException(AsyncError, osErrorMsg(EINVAL)))
|
|
else:
|
|
let eres = removeWriter2(AsyncFD(eventFd))
|
|
if eres.isErr():
|
|
retFuture.fail(newException(AsyncError, osErrorMsg(eres.error)))
|
|
else:
|
|
retFuture.complete()
|
|
|
|
proc cancellation(udata: pointer) {.gcsafe, raises: [].} =
|
|
if not(retFuture.finished()):
|
|
discard removeWriter2(AsyncFD(eventFd))
|
|
|
|
if checkBusy(checkFd):
|
|
# Signal is already in signalled state
|
|
retFuture.complete()
|
|
return retFuture
|
|
|
|
let res =
|
|
when defined(linux):
|
|
handleEintr(write(eventFd, addr data, sizeof(uint64)))
|
|
else:
|
|
handleEintr(send(SocketHandle(eventFd), addr data, sizeof(uint64),
|
|
MSG_NOSIGNAL))
|
|
if res < 0:
|
|
let errorCode = osLastError()
|
|
case errorCode
|
|
of EAGAIN:
|
|
let loop = getThreadDispatcher()
|
|
if not(loop.contains(AsyncFD(eventFd))):
|
|
let rres = register2(AsyncFD(eventFd))
|
|
if rres.isErr():
|
|
retFuture.fail(newException(AsyncError, osErrorMsg(rres.error)))
|
|
return retFuture
|
|
let wres = addWriter2(AsyncFD(eventFd), continuation)
|
|
if wres.isErr():
|
|
retFuture.fail(newException(AsyncError, osErrorMsg(wres.error)))
|
|
else:
|
|
retFuture.cancelCallback = cancellation
|
|
else:
|
|
retFuture.fail(newException(AsyncError, osErrorMsg(errorCode)))
|
|
elif res != sizeof(data):
|
|
retFuture.fail(newException(AsyncError, osErrorMsg(EINVAL)))
|
|
else:
|
|
retFuture.complete()
|
|
|
|
retFuture
|
|
|
|
when defined(windows):
|
|
proc wait*(signal: ThreadSignalPtr) {.
|
|
async: (raises: [AsyncError, CancelledError]).} =
|
|
let handle = signal[].event
|
|
let res = await waitForSingleObject(handle, InfiniteDuration)
|
|
# There should be no other response, because we use `InfiniteDuration`.
|
|
doAssert(res == WaitableResult.Ok)
|
|
else:
|
|
proc wait*(signal: ThreadSignalPtr): Future[void] {.
|
|
async: (raises: [AsyncError, CancelledError], raw: true).} =
|
|
let retFuture = Future[void].Raising([AsyncError, CancelledError]).init(
|
|
"asyncthreadsignal.wait")
|
|
var data = 1'u64
|
|
let eventFd =
|
|
when defined(linux):
|
|
cint(signal[].efd)
|
|
else:
|
|
cint(signal[].rfd)
|
|
|
|
proc continuation(udata: pointer) {.gcsafe, raises: [].} =
|
|
if not(retFuture.finished()):
|
|
let res =
|
|
when defined(linux):
|
|
handleEintr(read(eventFd, addr data, sizeof(uint64)))
|
|
else:
|
|
handleEintr(recv(SocketHandle(eventFd), addr data, sizeof(uint64),
|
|
cint(0)))
|
|
if res < 0:
|
|
let errorCode = osLastError()
|
|
# If errorCode == EAGAIN it means that reading operation is already
|
|
# pending and so some other consumer reading eventfd or pipe end, in
|
|
# this case we going to ignore error and wait for another event.
|
|
if errorCode != EAGAIN:
|
|
discard removeReader2(AsyncFD(eventFd))
|
|
retFuture.fail(newException(AsyncError, osErrorMsg(errorCode)))
|
|
elif res != sizeof(data):
|
|
discard removeReader2(AsyncFD(eventFd))
|
|
retFuture.fail(newException(AsyncError, osErrorMsg(EINVAL)))
|
|
else:
|
|
let eres = removeReader2(AsyncFD(eventFd))
|
|
if eres.isErr():
|
|
retFuture.fail(newException(AsyncError, osErrorMsg(eres.error)))
|
|
else:
|
|
retFuture.complete()
|
|
|
|
proc cancellation(udata: pointer) {.gcsafe, raises: [].} =
|
|
if not(retFuture.finished()):
|
|
# Future is already cancelled so we ignore errors.
|
|
discard removeReader2(AsyncFD(eventFd))
|
|
|
|
let loop = getThreadDispatcher()
|
|
if not(loop.contains(AsyncFD(eventFd))):
|
|
let res = register2(AsyncFD(eventFd))
|
|
if res.isErr():
|
|
retFuture.fail(newException(AsyncError, osErrorMsg(res.error)))
|
|
return retFuture
|
|
let res = addReader2(AsyncFD(eventFd), continuation)
|
|
if res.isErr():
|
|
retFuture.fail(newException(AsyncError, osErrorMsg(res.error)))
|
|
return retFuture
|
|
retFuture.cancelCallback = cancellation
|
|
retFuture
|