Asynchronous thread notification mechanism. (#406)

* Initial commit.

* Some fixes.

* More fixes.

* Add first test.

* Further fixes for MacOS/BSD.

* Fixes for Linux.

* Add proper tests.

* Lower number of tests.

* Add threadsync tests to test suite.

* There is no need to run tests when threads are off.

* Address review comments.
Fix the issue with multiple signals.
Add tests.

* Switch to use socketpair() instead of pipes.
Fix semaphoring issue on MacOS/BSD.
Add tests.

* Add single threaded fire/wait tests.
This commit is contained in:
Eugene Kabanov 2023-07-21 15:51:36 +03:00 committed by GitHub
parent e04c042e8a
commit 0277b65be2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 834 additions and 24 deletions

View File

@ -879,13 +879,17 @@ elif defined(macos) or defined(macosx):
setrlimit, getpid, pthread_sigmask, sigprocmask, setrlimit, getpid, pthread_sigmask, sigprocmask,
sigemptyset, sigaddset, sigismember, fcntl, accept, sigemptyset, sigaddset, sigismember, fcntl, accept,
pipe, write, signal, read, setsockopt, getsockopt, pipe, write, signal, read, setsockopt, getsockopt,
getcwd, chdir, waitpid, kill, getcwd, chdir, waitpid, kill, select, pselect,
socketpair,
Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr, Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr,
SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6, SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6,
Sockaddr_un, SocketHandle, AddrInfo, RLimit, Sockaddr_un, SocketHandle, AddrInfo, RLimit, TFdSet,
Suseconds,
FD_CLR, FD_ISSET, FD_SET, FD_ZERO,
F_GETFL, F_SETFL, F_GETFD, F_SETFD, FD_CLOEXEC, F_GETFL, F_SETFL, F_GETFD, F_SETFD, FD_CLOEXEC,
O_NONBLOCK, SOL_SOCKET, SOCK_RAW, MSG_NOSIGNAL, O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM,
AF_INET, AF_INET6, SO_ERROR, SO_REUSEADDR, SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK,
AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR,
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP,
IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE,
SIG_BLOCK, SIG_UNBLOCK, SHUT_RD, SHUT_WR, SHUT_RDWR, SIG_BLOCK, SIG_UNBLOCK, SHUT_RD, SHUT_WR, SHUT_RDWR,
@ -900,13 +904,17 @@ elif defined(macos) or defined(macosx):
setrlimit, getpid, pthread_sigmask, sigprocmask, setrlimit, getpid, pthread_sigmask, sigprocmask,
sigemptyset, sigaddset, sigismember, fcntl, accept, sigemptyset, sigaddset, sigismember, fcntl, accept,
pipe, write, signal, read, setsockopt, getsockopt, pipe, write, signal, read, setsockopt, getsockopt,
getcwd, chdir, waitpid, kill, getcwd, chdir, waitpid, kill, select, pselect,
socketpair,
Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr, Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr,
SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6, SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6,
Sockaddr_un, SocketHandle, AddrInfo, RLimit, Sockaddr_un, SocketHandle, AddrInfo, RLimit, TFdSet,
Suseconds,
FD_CLR, FD_ISSET, FD_SET, FD_ZERO,
F_GETFL, F_SETFL, F_GETFD, F_SETFD, FD_CLOEXEC, F_GETFL, F_SETFL, F_GETFD, F_SETFD, FD_CLOEXEC,
O_NONBLOCK, SOL_SOCKET, SOCK_RAW, MSG_NOSIGNAL, O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM,
AF_INET, AF_INET6, SO_ERROR, SO_REUSEADDR, SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK,
AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR,
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP,
IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE,
SIG_BLOCK, SIG_UNBLOCK, SHUT_RD, SHUT_WR, SHUT_RDWR, SIG_BLOCK, SIG_UNBLOCK, SHUT_RD, SHUT_WR, SHUT_RDWR,
@ -938,17 +946,21 @@ elif defined(linux):
recvfrom, sendto, send, bindSocket, recv, connect, recvfrom, sendto, send, bindSocket, recv, connect,
unlink, listen, sendmsg, recvmsg, getpid, fcntl, unlink, listen, sendmsg, recvmsg, getpid, fcntl,
pthread_sigmask, sigprocmask, clock_gettime, signal, pthread_sigmask, sigprocmask, clock_gettime, signal,
getcwd, chdir, waitpid, kill, getcwd, chdir, waitpid, kill, select, pselect,
socketpair,
ClockId, Itimerspec, Timespec, Sigset, Time, Pid, Mode, ClockId, Itimerspec, Timespec, Sigset, Time, Pid, Mode,
SigInfo, Id, Tmsghdr, IOVec, RLimit, SigInfo, Id, Tmsghdr, IOVec, RLimit, Timeval, TFdSet,
SockAddr, SockLen, Sockaddr_storage, Sockaddr_in, SockAddr, SockLen, Sockaddr_storage, Sockaddr_in,
Sockaddr_in6, Sockaddr_un, AddrInfo, SocketHandle, Sockaddr_in6, Sockaddr_un, AddrInfo, SocketHandle,
Suseconds,
FD_CLR, FD_ISSET, FD_SET, FD_ZERO,
CLOCK_MONOTONIC, F_GETFL, F_SETFL, F_GETFD, F_SETFD, CLOCK_MONOTONIC, F_GETFL, F_SETFL, F_GETFD, F_SETFD,
FD_CLOEXEC, O_NONBLOCK, SIG_BLOCK, SIG_UNBLOCK, FD_CLOEXEC, O_NONBLOCK, SIG_BLOCK, SIG_UNBLOCK,
SOL_SOCKET, SO_ERROR, RLIMIT_NOFILE, MSG_NOSIGNAL, SOL_SOCKET, SO_ERROR, RLIMIT_NOFILE, MSG_NOSIGNAL,
AF_INET, AF_INET6, SO_REUSEADDR, SO_REUSEPORT, MSG_PEEK,
AF_INET, AF_INET6, AF_UNIX, SO_REUSEADDR, SO_REUSEPORT,
SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS,
SOCK_DGRAM, SHUT_RD, SHUT_WR, SHUT_RDWR, SOCK_DGRAM, SOCK_STREAM, SHUT_RD, SHUT_WR, SHUT_RDWR,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP,
@ -961,17 +973,21 @@ elif defined(linux):
recvfrom, sendto, send, bindSocket, recv, connect, recvfrom, sendto, send, bindSocket, recv, connect,
unlink, listen, sendmsg, recvmsg, getpid, fcntl, unlink, listen, sendmsg, recvmsg, getpid, fcntl,
pthread_sigmask, sigprocmask, clock_gettime, signal, pthread_sigmask, sigprocmask, clock_gettime, signal,
getcwd, chdir, waitpid, kill, getcwd, chdir, waitpid, kill, select, pselect,
socketpair,
ClockId, Itimerspec, Timespec, Sigset, Time, Pid, Mode, ClockId, Itimerspec, Timespec, Sigset, Time, Pid, Mode,
SigInfo, Id, Tmsghdr, IOVec, RLimit, SigInfo, Id, Tmsghdr, IOVec, RLimit, TFdSet, Timeval,
SockAddr, SockLen, Sockaddr_storage, Sockaddr_in, SockAddr, SockLen, Sockaddr_storage, Sockaddr_in,
Sockaddr_in6, Sockaddr_un, AddrInfo, SocketHandle, Sockaddr_in6, Sockaddr_un, AddrInfo, SocketHandle,
Suseconds,
FD_CLR, FD_ISSET, FD_SET, FD_ZERO,
CLOCK_MONOTONIC, F_GETFL, F_SETFL, F_GETFD, F_SETFD, CLOCK_MONOTONIC, F_GETFL, F_SETFL, F_GETFD, F_SETFD,
FD_CLOEXEC, O_NONBLOCK, SIG_BLOCK, SIG_UNBLOCK, FD_CLOEXEC, O_NONBLOCK, SIG_BLOCK, SIG_UNBLOCK,
SOL_SOCKET, SO_ERROR, RLIMIT_NOFILE, MSG_NOSIGNAL, SOL_SOCKET, SO_ERROR, RLIMIT_NOFILE, MSG_NOSIGNAL,
AF_INET, AF_INET6, SO_REUSEADDR, SO_REUSEPORT, MSG_PEEK,
AF_INET, AF_INET6, AF_UNIX, SO_REUSEADDR, SO_REUSEPORT,
SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS,
SOCK_DGRAM, SHUT_RD, SHUT_WR, SHUT_RDWR, SOCK_DGRAM, SOCK_STREAM, SHUT_RD, SHUT_WR, SHUT_RDWR,
SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT,
SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2,
SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP,
@ -1080,13 +1096,17 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or
setrlimit, getpid, pthread_sigmask, sigemptyset, setrlimit, getpid, pthread_sigmask, sigemptyset,
sigaddset, sigismember, fcntl, accept, pipe, write, sigaddset, sigismember, fcntl, accept, pipe, write,
signal, read, setsockopt, getsockopt, clock_gettime, signal, read, setsockopt, getsockopt, clock_gettime,
getcwd, chdir, waitpid, kill, getcwd, chdir, waitpid, kill, select, pselect,
socketpair,
Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr, Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr,
SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6, SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6,
Sockaddr_un, SocketHandle, AddrInfo, RLimit, Sockaddr_un, SocketHandle, AddrInfo, RLimit, TFdSet,
Suseconds,
FD_CLR, FD_ISSET, FD_SET, FD_ZERO,
F_GETFL, F_SETFL, F_GETFD, F_SETFD, FD_CLOEXEC, F_GETFL, F_SETFL, F_GETFD, F_SETFD, FD_CLOEXEC,
O_NONBLOCK, SOL_SOCKET, SOCK_RAW, MSG_NOSIGNAL, O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM,
AF_INET, AF_INET6, SO_ERROR, SO_REUSEADDR, SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK,
AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR,
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP,
IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE,
SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC, SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC,
@ -1102,12 +1122,17 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or
setrlimit, getpid, pthread_sigmask, sigemptyset, setrlimit, getpid, pthread_sigmask, sigemptyset,
sigaddset, sigismember, fcntl, accept, pipe, write, sigaddset, sigismember, fcntl, accept, pipe, write,
signal, read, setsockopt, getsockopt, clock_gettime, signal, read, setsockopt, getsockopt, clock_gettime,
getcwd, chdir, waitpid, kill, select, pselect,
socketpair,
Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr, Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr,
SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6, SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6,
Sockaddr_un, SocketHandle, AddrInfo, RLimit, Sockaddr_un, SocketHandle, AddrInfo, RLimit, TFdSet,
Suseconds,
FD_CLR, FD_ISSET, FD_SET, FD_ZERO,
F_GETFL, F_SETFL, F_GETFD, F_SETFD, FD_CLOEXEC, F_GETFL, F_SETFL, F_GETFD, F_SETFD, FD_CLOEXEC,
O_NONBLOCK, SOL_SOCKET, SOCK_RAW, MSG_NOSIGNAL, O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM,
AF_INET, AF_INET6, SO_ERROR, SO_REUSEADDR, SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK,
AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR,
SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP,
IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE,
SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC, SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC,

416
chronos/threadsync.nim Normal file
View File

@ -0,0 +1,416 @@
#
# Chronos multithreaded synchronization primitives
#
# (c) Copyright 2023-Present Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
## This module implements some core async thread synchronization primitives.
import stew/results
import "."/[timer, asyncloop]
export results
{.push raises: [].}
const hasThreadSupport* = compileOption("threads")
when not(hasThreadSupport):
{.fatal: "Compile this program with threads enabled!".}
import "."/[osdefs, osutils, oserrno]
type
ThreadSignal* = object
when defined(windows):
event: HANDLE
elif defined(linux):
efd: AsyncFD
else:
rfd, wfd: AsyncFD
ThreadSignalPtr* = ptr ThreadSignal
proc new*(t: typedesc[ThreadSignalPtr]): Result[ThreadSignalPtr, string] =
## Create new ThreadSignal object.
let res = cast[ptr ThreadSignal](allocShared0(sizeof(ThreadSignal)))
when defined(windows):
var sa = getSecurityAttributes()
let event = osdefs.createEvent(addr sa, DWORD(0), DWORD(0), nil)
if event == HANDLE(0):
deallocShared(res)
return err(osErrorMsg(osLastError()))
res[] = ThreadSignal(event: event)
elif defined(linux):
let efd = eventfd(0, EFD_CLOEXEC or EFD_NONBLOCK)
if efd == -1:
deallocShared(res)
return err(osErrorMsg(osLastError()))
res[] = ThreadSignal(efd: AsyncFD(efd))
else:
var sockets: array[2, cint]
block:
let sres = socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets)
if sres < 0:
deallocShared(res)
return err(osErrorMsg(osLastError()))
# MacOS do not have SOCK_NONBLOCK and SOCK_CLOEXEC, so we forced to use
# setDescriptorFlags() for every socket.
block:
let sres = setDescriptorFlags(sockets[0], true, true)
if sres.isErr():
discard closeFd(sockets[0])
discard closeFd(sockets[1])
deallocShared(res)
return err(osErrorMsg(sres.error))
block:
let sres = setDescriptorFlags(sockets[1], true, true)
if sres.isErr():
discard closeFd(sockets[0])
discard closeFd(sockets[1])
deallocShared(res)
return err(osErrorMsg(sres.error))
res[] = ThreadSignal(rfd: AsyncFD(sockets[0]), wfd: AsyncFD(sockets[1]))
ok(ThreadSignalPtr(res))
when not(defined(windows)):
type
WaitKind {.pure.} = enum
Read, Write
when defined(linux):
proc checkBusy(fd: cint): bool = false
else:
proc checkBusy(fd: cint): bool =
var data = 0'u64
let res = handleEintr(recv(SocketHandle(fd),
addr data, sizeof(uint64), MSG_PEEK))
if res == sizeof(uint64):
true
else:
false
func toTimeval(a: Duration): Timeval =
## Convert Duration ``a`` to ``Timeval`` object.
let nanos = a.nanoseconds
let m = nanos mod Second.nanoseconds()
Timeval(
tv_sec: Time(nanos div Second.nanoseconds()),
tv_usec: Suseconds(m div Microsecond.nanoseconds())
)
proc waitReady(fd: cint, kind: WaitKind,
timeout: Duration): Result[bool, OSErrorCode] =
var
tv: Timeval
fdset =
block:
var res: TFdSet
FD_ZERO(res)
FD_SET(SocketHandle(fd), res)
res
let
ptv =
if not(timeout.isInfinite()):
tv = timeout.toTimeval()
addr tv
else:
nil
nfd = cint(fd) + 1
res =
case kind
of WaitKind.Read:
handleEintr(select(nfd, addr fdset, nil, nil, ptv))
of WaitKind.Write:
handleEintr(select(nfd, nil, addr fdset, nil, ptv))
if res > 0:
ok(true)
elif res == 0:
ok(false)
else:
err(osLastError())
proc safeUnregisterAndCloseFd(fd: AsyncFD): Result[void, OSErrorCode] =
let loop = getThreadDispatcher()
if loop.contains(fd):
? unregister2(fd)
if closeFd(cint(fd)) != 0:
err(osLastError())
else:
ok()
proc close*(signal: ThreadSignalPtr): Result[void, string] =
## Close ThreadSignal object and free all the resources.
defer: deallocShared(signal)
when defined(windows):
# We do not need to perform unregistering on Windows, we can only close it.
if closeHandle(signal[].event) == 0'u32:
return err(osErrorMsg(osLastError()))
elif defined(linux):
let res = safeUnregisterAndCloseFd(signal[].efd)
if res.isErr():
return err(osErrorMsg(res.error))
else:
let res1 = safeUnregisterAndCloseFd(signal[].rfd)
let res2 = safeUnregisterAndCloseFd(signal[].wfd)
if res1.isErr(): return err(osErrorMsg(res1.error))
if res2.isErr(): return err(osErrorMsg(res2.error))
ok()
proc fireSync*(signal: ThreadSignalPtr,
timeout = InfiniteDuration): Result[bool, string] =
## Set state of ``signal`` to signaled state in blocking way.
##
## Returns ``false`` if signal was not signalled in time, and ``true``
## if operation was successful.
when defined(windows):
if setEvent(signal[].event) == 0'u32:
return err(osErrorMsg(osLastError()))
ok(true)
else:
let
eventFd =
when defined(linux):
cint(signal[].efd)
else:
cint(signal[].wfd)
checkFd =
when defined(linux):
cint(signal[].efd)
else:
cint(signal[].rfd)
if checkBusy(checkFd):
# Signal is already in signalled state
return ok(true)
var data = 1'u64
while true:
let res =
when defined(linux):
handleEintr(write(eventFd, addr data, sizeof(uint64)))
else:
handleEintr(send(SocketHandle(eventFd), addr data, sizeof(uint64),
MSG_NOSIGNAL))
if res < 0:
let errorCode = osLastError()
case errorCode
of EAGAIN:
let wres = waitReady(eventFd, WaitKind.Write, timeout)
if wres.isErr():
return err(osErrorMsg(wres.error))
if not(wres.get()):
return ok(false)
else:
return err(osErrorMsg(errorCode))
elif res != sizeof(data):
return err(osErrorMsg(EINVAL))
else:
return ok(true)
proc waitSync*(signal: ThreadSignalPtr,
timeout = InfiniteDuration): Result[bool, string] =
## Wait until the signal become signaled. This procedure is ``NOT`` async,
## so it blocks execution flow, but this procedure do not need asynchronous
## event loop to be present.
when defined(windows):
let
timeoutWin =
if timeout.isInfinite():
INFINITE
else:
DWORD(timeout.milliseconds())
handle = signal[].event
res = waitForSingleObject(handle, timeoutWin)
if res == WAIT_OBJECT_0:
ok(true)
elif res == WAIT_TIMEOUT:
ok(false)
elif res == WAIT_ABANDONED:
err("The wait operation has been abandoned")
else:
err("The wait operation has been failed")
else:
let eventFd =
when defined(linux):
cint(signal[].efd)
else:
cint(signal[].rfd)
var
data = 0'u64
timer = timeout
while true:
let wres =
block:
let
start = Moment.now()
res = waitReady(eventFd, WaitKind.Read, timer)
timer = timer - (Moment.now() - start)
res
if wres.isErr():
return err(osErrorMsg(wres.error))
if not(wres.get()):
return ok(false)
let res =
when defined(linux):
handleEintr(read(eventFd, addr data, sizeof(uint64)))
else:
handleEintr(recv(SocketHandle(eventFd), addr data, sizeof(uint64),
cint(0)))
if res < 0:
let errorCode = osLastError()
# If errorCode == EAGAIN it means that reading operation is already
# pending and so some other consumer reading eventfd or pipe end, in
# this case we going to ignore error and wait for another event.
if errorCode != EAGAIN:
return err(osErrorMsg(errorCode))
elif res != sizeof(data):
return err(osErrorMsg(EINVAL))
else:
return ok(true)
proc fire*(signal: ThreadSignalPtr): Future[void] =
## Set state of ``signal`` to signaled in asynchronous way.
var retFuture = newFuture[void]("asyncthreadsignal.fire")
when defined(windows):
if setEvent(signal[].event) == 0'u32:
retFuture.fail(newException(AsyncError, osErrorMsg(osLastError())))
else:
retFuture.complete()
else:
var data = 1'u64
let
eventFd =
when defined(linux):
cint(signal[].efd)
else:
cint(signal[].wfd)
checkFd =
when defined(linux):
cint(signal[].efd)
else:
cint(signal[].rfd)
proc continuation(udata: pointer) {.gcsafe, raises: [].} =
if not(retFuture.finished()):
let res =
when defined(linux):
handleEintr(write(eventFd, addr data, sizeof(uint64)))
else:
handleEintr(send(SocketHandle(eventFd), addr data, sizeof(uint64),
MSG_NOSIGNAL))
if res < 0:
let errorCode = osLastError()
discard removeWriter2(AsyncFD(eventFd))
retFuture.fail(newException(AsyncError, osErrorMsg(errorCode)))
elif res != sizeof(data):
discard removeWriter2(AsyncFD(eventFd))
retFuture.fail(newException(AsyncError, osErrorMsg(EINVAL)))
else:
let eres = removeWriter2(AsyncFD(eventFd))
if eres.isErr():
retFuture.fail(newException(AsyncError, osErrorMsg(eres.error)))
else:
retFuture.complete()
proc cancellation(udata: pointer) {.gcsafe, raises: [].} =
if not(retFuture.finished()):
discard removeWriter2(AsyncFD(eventFd))
if checkBusy(checkFd):
# Signal is already in signalled state
retFuture.complete()
return retFuture
let res =
when defined(linux):
handleEintr(write(eventFd, addr data, sizeof(uint64)))
else:
handleEintr(send(SocketHandle(eventFd), addr data, sizeof(uint64),
MSG_NOSIGNAL))
if res < 0:
let errorCode = osLastError()
case errorCode
of EAGAIN:
let loop = getThreadDispatcher()
if not(loop.contains(AsyncFD(eventFd))):
let rres = register2(AsyncFD(eventFd))
if rres.isErr():
retFuture.fail(newException(AsyncError, osErrorMsg(rres.error)))
return retFuture
let wres = addWriter2(AsyncFD(eventFd), continuation)
if wres.isErr():
retFuture.fail(newException(AsyncError, osErrorMsg(wres.error)))
else:
retFuture.cancelCallback = cancellation
else:
retFuture.fail(newException(AsyncError, osErrorMsg(errorCode)))
elif res != sizeof(data):
retFuture.fail(newException(AsyncError, osErrorMsg(EINVAL)))
else:
retFuture.complete()
retFuture
when defined(windows):
proc wait*(signal: ThreadSignalPtr) {.async.} =
let handle = signal[].event
let res = await waitForSingleObject(handle, InfiniteDuration)
# There should be no other response, because we use `InfiniteDuration`.
doAssert(res == WaitableResult.Ok)
else:
proc wait*(signal: ThreadSignalPtr): Future[void] =
var retFuture = newFuture[void]("asyncthreadsignal.wait")
var data = 1'u64
let eventFd =
when defined(linux):
cint(signal[].efd)
else:
cint(signal[].rfd)
proc continuation(udata: pointer) {.gcsafe, raises: [].} =
if not(retFuture.finished()):
let res =
when defined(linux):
handleEintr(read(eventFd, addr data, sizeof(uint64)))
else:
handleEintr(recv(SocketHandle(eventFd), addr data, sizeof(uint64),
cint(0)))
if res < 0:
let errorCode = osLastError()
# If errorCode == EAGAIN it means that reading operation is already
# pending and so some other consumer reading eventfd or pipe end, in
# this case we going to ignore error and wait for another event.
if errorCode != EAGAIN:
discard removeReader2(AsyncFD(eventFd))
retFuture.fail(newException(AsyncError, osErrorMsg(errorCode)))
elif res != sizeof(data):
discard removeReader2(AsyncFD(eventFd))
retFuture.fail(newException(AsyncError, osErrorMsg(EINVAL)))
else:
let eres = removeReader2(AsyncFD(eventFd))
if eres.isErr():
retFuture.fail(newException(AsyncError, osErrorMsg(eres.error)))
else:
retFuture.complete()
proc cancellation(udata: pointer) {.gcsafe, raises: [].} =
if not(retFuture.finished()):
# Future is already cancelled so we ignore errors.
discard removeReader2(AsyncFD(eventFd))
let loop = getThreadDispatcher()
if not(loop.contains(AsyncFD(eventFd))):
let res = register2(AsyncFD(eventFd))
if res.isErr():
retFuture.fail(newException(AsyncError, osErrorMsg(res.error)))
return retFuture
let res = addReader2(AsyncFD(eventFd), continuation)
if res.isErr():
retFuture.fail(newException(AsyncError, osErrorMsg(res.error)))
return retFuture
retFuture.cancelCallback = cancellation
retFuture

View File

@ -8,7 +8,7 @@
import testmacro, testsync, testsoon, testtime, testfut, testsignal, import testmacro, testsync, testsoon, testtime, testfut, testsignal,
testaddress, testdatagram, teststream, testserver, testbugs, testnet, testaddress, testdatagram, teststream, testserver, testbugs, testnet,
testasyncstream, testhttpserver, testshttpserver, testhttpclient, testasyncstream, testhttpserver, testshttpserver, testhttpclient,
testproc, testratelimit, testfutures testproc, testratelimit, testfutures, testthreadsync
# Must be imported last to check for Pending futures # Must be imported last to check for Pending futures
import testutils import testutils

369
tests/testthreadsync.nim Normal file
View File

@ -0,0 +1,369 @@
# Chronos Test Suite
# (c) Copyright 2023-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import std/[cpuinfo, locks, strutils]
import ../chronos/unittest2/asynctests
import ../chronos/threadsync
{.used.}
type
ThreadResult = object
value: int
ThreadResultPtr = ptr ThreadResult
LockPtr = ptr Lock
ThreadArg = object
signal: ThreadSignalPtr
retval: ThreadResultPtr
index: int
ThreadArg2 = object
signal1: ThreadSignalPtr
signal2: ThreadSignalPtr
retval: ThreadResultPtr
ThreadArg3 = object
lock: LockPtr
signal: ThreadSignalPtr
retval: ThreadResultPtr
index: int
WaitSendKind {.pure.} = enum
Sync, Async
const
TestsCount = 1000
suite "Asynchronous multi-threading sync primitives test suite":
proc setResult(thr: ThreadResultPtr, value: int) =
thr[].value = value
proc new(t: typedesc[ThreadResultPtr], value: int = 0): ThreadResultPtr =
var res = cast[ThreadResultPtr](allocShared0(sizeof(ThreadResult)))
res[].value = value
res
proc free(thr: ThreadResultPtr) =
doAssert(not(isNil(thr)))
deallocShared(thr)
let numProcs = countProcessors() * 2
template threadSignalTest(sendFlag, waitFlag: WaitSendKind) =
proc testSyncThread(arg: ThreadArg) {.thread.} =
let res = waitSync(arg.signal, 1500.milliseconds)
if res.isErr():
arg.retval.setResult(1)
else:
if res.get():
arg.retval.setResult(2)
else:
arg.retval.setResult(3)
proc testAsyncThread(arg: ThreadArg) {.thread.} =
proc testAsyncCode(arg: ThreadArg) {.async.} =
try:
await wait(arg.signal).wait(1500.milliseconds)
arg.retval.setResult(2)
except AsyncTimeoutError:
arg.retval.setResult(3)
except CatchableError:
arg.retval.setResult(1)
waitFor testAsyncCode(arg)
let signal = ThreadSignalPtr.new().tryGet()
var args: seq[ThreadArg]
var threads = newSeq[Thread[ThreadArg]](numProcs)
for i in 0 ..< numProcs:
let
res = ThreadResultPtr.new()
arg = ThreadArg(signal: signal, retval: res, index: i)
args.add(arg)
case waitFlag
of WaitSendKind.Sync:
createThread(threads[i], testSyncThread, arg)
of WaitSendKind.Async:
createThread(threads[i], testAsyncThread, arg)
await sleepAsync(500.milliseconds)
case sendFlag
of WaitSendKind.Sync:
check signal.fireSync().isOk()
of WaitSendKind.Async:
await signal.fire()
joinThreads(threads)
var ncheck: array[3, int]
for item in args:
if item.retval[].value == 1:
inc(ncheck[0])
elif item.retval[].value == 2:
inc(ncheck[1])
elif item.retval[].value == 3:
inc(ncheck[2])
free(item.retval)
check:
signal.close().isOk()
ncheck[0] == 0
ncheck[1] == 1
ncheck[2] == numProcs - 1
template threadSignalTest2(testsCount: int,
sendFlag, waitFlag: WaitSendKind) =
proc testSyncThread(arg: ThreadArg2) {.thread.} =
for i in 0 ..< testsCount:
block:
let res = waitSync(arg.signal1, 1500.milliseconds)
if res.isErr():
arg.retval.setResult(-1)
return
if not(res.get()):
arg.retval.setResult(-2)
return
block:
let res = arg.signal2.fireSync()
if res.isErr():
arg.retval.setResult(-3)
return
arg.retval.setResult(i + 1)
proc testAsyncThread(arg: ThreadArg2) {.thread.} =
proc testAsyncCode(arg: ThreadArg2) {.async.} =
for i in 0 ..< testsCount:
try:
await wait(arg.signal1).wait(1500.milliseconds)
except AsyncTimeoutError:
arg.retval.setResult(-2)
return
except AsyncError:
arg.retval.setResult(-1)
return
except CatchableError:
arg.retval.setResult(-3)
return
try:
await arg.signal2.fire()
except AsyncError:
arg.retval.setResult(-4)
return
except CatchableError:
arg.retval.setResult(-5)
return
arg.retval.setResult(i + 1)
waitFor testAsyncCode(arg)
let
signal1 = ThreadSignalPtr.new().tryGet()
signal2 = ThreadSignalPtr.new().tryGet()
retval = ThreadResultPtr.new()
arg = ThreadArg2(signal1: signal1, signal2: signal2, retval: retval)
var thread: Thread[ThreadArg2]
case waitFlag
of WaitSendKind.Sync:
createThread(thread, testSyncThread, arg)
of WaitSendKind.Async:
createThread(thread, testAsyncThread, arg)
let start = Moment.now()
for i in 0 ..< testsCount:
case sendFlag
of WaitSendKind.Sync:
block:
let res = signal1.fireSync()
check res.isOk()
block:
let res = waitSync(arg.signal2, 1500.milliseconds)
check:
res.isOk()
res.get() == true
of WaitSendKind.Async:
await arg.signal1.fire()
await wait(arg.signal2).wait(1500.milliseconds)
joinThreads(thread)
let finish = Moment.now()
let perf = (float64(nanoseconds(1.seconds)) /
float64(nanoseconds(finish - start))) * float64(testsCount)
echo "Switches tested: ", testsCount, ", elapsed time: ", (finish - start),
", performance = ", formatFloat(perf, ffDecimal, 4),
" switches/second"
check:
arg.retval[].value == testsCount
template threadSignalTest3(testsCount: int,
sendFlag, waitFlag: WaitSendKind) =
proc testSyncThread(arg: ThreadArg3) {.thread.} =
withLock(arg.lock[]):
let res = waitSync(arg.signal, 10.milliseconds)
if res.isErr():
arg.retval.setResult(1)
else:
if res.get():
arg.retval.setResult(2)
else:
arg.retval.setResult(3)
proc testAsyncThread(arg: ThreadArg3) {.thread.} =
proc testAsyncCode(arg: ThreadArg3) {.async.} =
withLock(arg.lock[]):
try:
await wait(arg.signal).wait(10.milliseconds)
arg.retval.setResult(2)
except AsyncTimeoutError:
arg.retval.setResult(3)
except CatchableError:
arg.retval.setResult(1)
waitFor testAsyncCode(arg)
let signal = ThreadSignalPtr.new().tryGet()
var args: seq[ThreadArg3]
var threads = newSeq[Thread[ThreadArg3]](numProcs)
var lockPtr = cast[LockPtr](allocShared0(sizeof(Lock)))
initLock(lockPtr[])
acquire(lockPtr[])
for i in 0 ..< numProcs:
let
res = ThreadResultPtr.new()
arg = ThreadArg3(signal: signal, retval: res, index: i, lock: lockPtr)
args.add(arg)
case waitFlag
of WaitSendKind.Sync:
createThread(threads[i], testSyncThread, arg)
of WaitSendKind.Async:
createThread(threads[i], testAsyncThread, arg)
await sleepAsync(500.milliseconds)
case sendFlag
of WaitSendKind.Sync:
for i in 0 ..< testsCount:
check signal.fireSync().isOk()
of WaitSendKind.Async:
for i in 0 ..< testsCount:
await signal.fire()
release(lockPtr[])
joinThreads(threads)
deinitLock(lockPtr[])
deallocShared(lockPtr)
var ncheck: array[3, int]
for item in args:
if item.retval[].value == 1:
inc(ncheck[0])
elif item.retval[].value == 2:
inc(ncheck[1])
elif item.retval[].value == 3:
inc(ncheck[2])
free(item.retval)
check:
signal.close().isOk()
ncheck[0] == 0
ncheck[1] == 1
ncheck[2] == numProcs - 1
template threadSignalTest4(testsCount: int,
sendFlag, waitFlag: WaitSendKind) =
let signal = ThreadSignalPtr.new().tryGet()
let start = Moment.now()
for i in 0 ..< testsCount:
case sendFlag
of WaitSendKind.Sync:
check signal.fireSync().isOk()
of WaitSendKind.Async:
await signal.fire()
case waitFlag
of WaitSendKind.Sync:
check waitSync(signal).isOk()
of WaitSendKind.Async:
await wait(signal)
let finish = Moment.now()
let perf = (float64(nanoseconds(1.seconds)) /
float64(nanoseconds(finish - start))) * float64(testsCount)
echo "Switches tested: ", testsCount, ", elapsed time: ", (finish - start),
", performance = ", formatFloat(perf, ffDecimal, 4),
" switches/second"
check:
signal.close.isOk()
asyncTest "ThreadSignal: Multiple [" & $numProcs &
"] threads waiting test [sync -> sync]":
threadSignalTest(WaitSendKind.Sync, WaitSendKind.Sync)
asyncTest "ThreadSignal: Multiple [" & $numProcs &
"] threads waiting test [async -> async]":
threadSignalTest(WaitSendKind.Async, WaitSendKind.Async)
asyncTest "ThreadSignal: Multiple [" & $numProcs &
"] threads waiting test [async -> sync]":
threadSignalTest(WaitSendKind.Async, WaitSendKind.Sync)
asyncTest "ThreadSignal: Multiple [" & $numProcs &
"] threads waiting test [sync -> async]":
threadSignalTest(WaitSendKind.Sync, WaitSendKind.Async)
asyncTest "ThreadSignal: Multiple thread switches [" & $TestsCount &
"] test [sync -> sync]":
threadSignalTest2(TestsCount, WaitSendKind.Sync, WaitSendKind.Sync)
asyncTest "ThreadSignal: Multiple thread switches [" & $TestsCount &
"] test [async -> async]":
threadSignalTest2(TestsCount, WaitSendKind.Async, WaitSendKind.Async)
asyncTest "ThreadSignal: Multiple thread switches [" & $TestsCount &
"] test [sync -> async]":
threadSignalTest2(TestsCount, WaitSendKind.Sync, WaitSendKind.Async)
asyncTest "ThreadSignal: Multiple thread switches [" & $TestsCount &
"] test [async -> sync]":
threadSignalTest2(TestsCount, WaitSendKind.Async, WaitSendKind.Sync)
asyncTest "ThreadSignal: Multiple signals [" & $TestsCount &
"] to multiple threads [" & $numProcs & "] test [sync -> sync]":
threadSignalTest3(TestsCount, WaitSendKind.Sync, WaitSendKind.Sync)
asyncTest "ThreadSignal: Multiple signals [" & $TestsCount &
"] to multiple threads [" & $numProcs & "] test [async -> async]":
threadSignalTest3(TestsCount, WaitSendKind.Async, WaitSendKind.Async)
asyncTest "ThreadSignal: Multiple signals [" & $TestsCount &
"] to multiple threads [" & $numProcs & "] test [sync -> async]":
threadSignalTest3(TestsCount, WaitSendKind.Sync, WaitSendKind.Async)
asyncTest "ThreadSignal: Multiple signals [" & $TestsCount &
"] to multiple threads [" & $numProcs & "] test [async -> sync]":
threadSignalTest3(TestsCount, WaitSendKind.Async, WaitSendKind.Sync)
asyncTest "ThreadSignal: Single threaded switches [" & $TestsCount &
"] test [sync -> sync]":
threadSignalTest4(TestsCount, WaitSendKind.Sync, WaitSendKind.Sync)
asyncTest "ThreadSignal: Single threaded switches [" & $TestsCount &
"] test [sync -> sync]":
threadSignalTest4(TestsCount, WaitSendKind.Async, WaitSendKind.Async)
asyncTest "ThreadSignal: Single threaded switches [" & $TestsCount &
"] test [sync -> async]":
threadSignalTest4(TestsCount, WaitSendKind.Sync, WaitSendKind.Async)
asyncTest "ThreadSignal: Single threaded switches [" & $TestsCount &
"] test [async -> sync]":
threadSignalTest4(TestsCount, WaitSendKind.Async, WaitSendKind.Sync)