From 0277b65be2c7a365ac13df002fba6e172be55537 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Fri, 21 Jul 2023 15:51:36 +0300 Subject: [PATCH] Asynchronous thread notification mechanism. (#406) * Initial commit. * Some fixes. * More fixes. * Add first test. * Further fixes for MacOS/BSD. * Fixes for Linux. * Add proper tests. * Lower number of tests. * Add threadsync tests to test suite. * There is no need to run tests when threads are off. * Address review comments. Fix the issue with multiple signals. Add tests. * Switch to use socketpair() instead of pipes. Fix semaphoring issue on MacOS/BSD. Add tests. * Add single threaded fire/wait tests. --- chronos/osdefs.nim | 71 ++++--- chronos/threadsync.nim | 416 +++++++++++++++++++++++++++++++++++++++ tests/testall.nim | 2 +- tests/testthreadsync.nim | 369 ++++++++++++++++++++++++++++++++++ 4 files changed, 834 insertions(+), 24 deletions(-) create mode 100644 chronos/threadsync.nim create mode 100644 tests/testthreadsync.nim diff --git a/chronos/osdefs.nim b/chronos/osdefs.nim index a638056d..ecf770b8 100644 --- a/chronos/osdefs.nim +++ b/chronos/osdefs.nim @@ -879,13 +879,17 @@ elif defined(macos) or defined(macosx): setrlimit, getpid, pthread_sigmask, sigprocmask, sigemptyset, sigaddset, sigismember, fcntl, accept, pipe, write, signal, read, setsockopt, getsockopt, - getcwd, chdir, waitpid, kill, + getcwd, chdir, waitpid, kill, select, pselect, + socketpair, Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr, SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6, - Sockaddr_un, SocketHandle, AddrInfo, RLimit, + Sockaddr_un, SocketHandle, AddrInfo, RLimit, TFdSet, + Suseconds, + FD_CLR, FD_ISSET, FD_SET, FD_ZERO, F_GETFL, F_SETFL, F_GETFD, F_SETFD, FD_CLOEXEC, - O_NONBLOCK, SOL_SOCKET, SOCK_RAW, MSG_NOSIGNAL, - AF_INET, AF_INET6, SO_ERROR, SO_REUSEADDR, + O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM, + SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK, + AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR, SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, SIG_BLOCK, SIG_UNBLOCK, SHUT_RD, SHUT_WR, SHUT_RDWR, @@ -900,13 +904,17 @@ elif defined(macos) or defined(macosx): setrlimit, getpid, pthread_sigmask, sigprocmask, sigemptyset, sigaddset, sigismember, fcntl, accept, pipe, write, signal, read, setsockopt, getsockopt, - getcwd, chdir, waitpid, kill, + getcwd, chdir, waitpid, kill, select, pselect, + socketpair, Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr, SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6, - Sockaddr_un, SocketHandle, AddrInfo, RLimit, + Sockaddr_un, SocketHandle, AddrInfo, RLimit, TFdSet, + Suseconds, + FD_CLR, FD_ISSET, FD_SET, FD_ZERO, F_GETFL, F_SETFL, F_GETFD, F_SETFD, FD_CLOEXEC, - O_NONBLOCK, SOL_SOCKET, SOCK_RAW, MSG_NOSIGNAL, - AF_INET, AF_INET6, SO_ERROR, SO_REUSEADDR, + O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM, + SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK, + AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR, SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, SIG_BLOCK, SIG_UNBLOCK, SHUT_RD, SHUT_WR, SHUT_RDWR, @@ -938,17 +946,21 @@ elif defined(linux): recvfrom, sendto, send, bindSocket, recv, connect, unlink, listen, sendmsg, recvmsg, getpid, fcntl, pthread_sigmask, sigprocmask, clock_gettime, signal, - getcwd, chdir, waitpid, kill, + getcwd, chdir, waitpid, kill, select, pselect, + socketpair, ClockId, Itimerspec, Timespec, Sigset, Time, Pid, Mode, - SigInfo, Id, Tmsghdr, IOVec, RLimit, + SigInfo, Id, Tmsghdr, IOVec, RLimit, Timeval, TFdSet, SockAddr, SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6, Sockaddr_un, AddrInfo, SocketHandle, + Suseconds, + FD_CLR, FD_ISSET, FD_SET, FD_ZERO, CLOCK_MONOTONIC, F_GETFL, F_SETFL, F_GETFD, F_SETFD, FD_CLOEXEC, O_NONBLOCK, SIG_BLOCK, SIG_UNBLOCK, SOL_SOCKET, SO_ERROR, RLIMIT_NOFILE, MSG_NOSIGNAL, - AF_INET, AF_INET6, SO_REUSEADDR, SO_REUSEPORT, + MSG_PEEK, + AF_INET, AF_INET6, AF_UNIX, SO_REUSEADDR, SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, - SOCK_DGRAM, SHUT_RD, SHUT_WR, SHUT_RDWR, + SOCK_DGRAM, SOCK_STREAM, SHUT_RD, SHUT_WR, SHUT_RDWR, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, @@ -961,17 +973,21 @@ elif defined(linux): recvfrom, sendto, send, bindSocket, recv, connect, unlink, listen, sendmsg, recvmsg, getpid, fcntl, pthread_sigmask, sigprocmask, clock_gettime, signal, - getcwd, chdir, waitpid, kill, + getcwd, chdir, waitpid, kill, select, pselect, + socketpair, ClockId, Itimerspec, Timespec, Sigset, Time, Pid, Mode, - SigInfo, Id, Tmsghdr, IOVec, RLimit, + SigInfo, Id, Tmsghdr, IOVec, RLimit, TFdSet, Timeval, SockAddr, SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6, Sockaddr_un, AddrInfo, SocketHandle, + Suseconds, + FD_CLR, FD_ISSET, FD_SET, FD_ZERO, CLOCK_MONOTONIC, F_GETFL, F_SETFL, F_GETFD, F_SETFD, FD_CLOEXEC, O_NONBLOCK, SIG_BLOCK, SIG_UNBLOCK, SOL_SOCKET, SO_ERROR, RLIMIT_NOFILE, MSG_NOSIGNAL, - AF_INET, AF_INET6, SO_REUSEADDR, SO_REUSEPORT, + MSG_PEEK, + AF_INET, AF_INET6, AF_UNIX, SO_REUSEADDR, SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, - SOCK_DGRAM, SHUT_RD, SHUT_WR, SHUT_RDWR, + SOCK_DGRAM, SOCK_STREAM, SHUT_RD, SHUT_WR, SHUT_RDWR, SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, SIGPIPE, SIGALRM, SIGTERM, SIGPIPE, SIGCHLD, SIGSTOP, @@ -1080,13 +1096,17 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or setrlimit, getpid, pthread_sigmask, sigemptyset, sigaddset, sigismember, fcntl, accept, pipe, write, signal, read, setsockopt, getsockopt, clock_gettime, - getcwd, chdir, waitpid, kill, + getcwd, chdir, waitpid, kill, select, pselect, + socketpair, Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr, SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6, - Sockaddr_un, SocketHandle, AddrInfo, RLimit, + Sockaddr_un, SocketHandle, AddrInfo, RLimit, TFdSet, + Suseconds, + FD_CLR, FD_ISSET, FD_SET, FD_ZERO, F_GETFL, F_SETFL, F_GETFD, F_SETFD, FD_CLOEXEC, - O_NONBLOCK, SOL_SOCKET, SOCK_RAW, MSG_NOSIGNAL, - AF_INET, AF_INET6, SO_ERROR, SO_REUSEADDR, + O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM, + SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK, + AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR, SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC, @@ -1102,12 +1122,17 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or setrlimit, getpid, pthread_sigmask, sigemptyset, sigaddset, sigismember, fcntl, accept, pipe, write, signal, read, setsockopt, getsockopt, clock_gettime, + getcwd, chdir, waitpid, kill, select, pselect, + socketpair, Timeval, Timespec, Pid, Mode, Time, Sigset, SockAddr, SockLen, Sockaddr_storage, Sockaddr_in, Sockaddr_in6, - Sockaddr_un, SocketHandle, AddrInfo, RLimit, + Sockaddr_un, SocketHandle, AddrInfo, RLimit, TFdSet, + Suseconds, + FD_CLR, FD_ISSET, FD_SET, FD_ZERO, F_GETFL, F_SETFL, F_GETFD, F_SETFD, FD_CLOEXEC, - O_NONBLOCK, SOL_SOCKET, SOCK_RAW, MSG_NOSIGNAL, - AF_INET, AF_INET6, SO_ERROR, SO_REUSEADDR, + O_NONBLOCK, SOL_SOCKET, SOCK_RAW, SOCK_DGRAM, + SOCK_STREAM, MSG_NOSIGNAL, MSG_PEEK, + AF_INET, AF_INET6, AF_UNIX, SO_ERROR, SO_REUSEADDR, SO_REUSEPORT, SO_BROADCAST, IPPROTO_IP, IPV6_MULTICAST_HOPS, SOCK_DGRAM, RLIMIT_NOFILE, SIG_BLOCK, SIG_UNBLOCK, CLOCK_MONOTONIC, diff --git a/chronos/threadsync.nim b/chronos/threadsync.nim new file mode 100644 index 00000000..d4141812 --- /dev/null +++ b/chronos/threadsync.nim @@ -0,0 +1,416 @@ +# +# Chronos multithreaded synchronization primitives +# +# (c) Copyright 2023-Present Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) + +## This module implements some core async thread synchronization primitives. +import stew/results +import "."/[timer, asyncloop] + +export results + +{.push raises: [].} + +const hasThreadSupport* = compileOption("threads") +when not(hasThreadSupport): + {.fatal: "Compile this program with threads enabled!".} + +import "."/[osdefs, osutils, oserrno] + +type + ThreadSignal* = object + when defined(windows): + event: HANDLE + elif defined(linux): + efd: AsyncFD + else: + rfd, wfd: AsyncFD + + ThreadSignalPtr* = ptr ThreadSignal + +proc new*(t: typedesc[ThreadSignalPtr]): Result[ThreadSignalPtr, string] = + ## Create new ThreadSignal object. + let res = cast[ptr ThreadSignal](allocShared0(sizeof(ThreadSignal))) + when defined(windows): + var sa = getSecurityAttributes() + let event = osdefs.createEvent(addr sa, DWORD(0), DWORD(0), nil) + if event == HANDLE(0): + deallocShared(res) + return err(osErrorMsg(osLastError())) + res[] = ThreadSignal(event: event) + elif defined(linux): + let efd = eventfd(0, EFD_CLOEXEC or EFD_NONBLOCK) + if efd == -1: + deallocShared(res) + return err(osErrorMsg(osLastError())) + res[] = ThreadSignal(efd: AsyncFD(efd)) + else: + var sockets: array[2, cint] + block: + let sres = socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets) + if sres < 0: + deallocShared(res) + return err(osErrorMsg(osLastError())) + # MacOS do not have SOCK_NONBLOCK and SOCK_CLOEXEC, so we forced to use + # setDescriptorFlags() for every socket. + block: + let sres = setDescriptorFlags(sockets[0], true, true) + if sres.isErr(): + discard closeFd(sockets[0]) + discard closeFd(sockets[1]) + deallocShared(res) + return err(osErrorMsg(sres.error)) + block: + let sres = setDescriptorFlags(sockets[1], true, true) + if sres.isErr(): + discard closeFd(sockets[0]) + discard closeFd(sockets[1]) + deallocShared(res) + return err(osErrorMsg(sres.error)) + res[] = ThreadSignal(rfd: AsyncFD(sockets[0]), wfd: AsyncFD(sockets[1])) + ok(ThreadSignalPtr(res)) + +when not(defined(windows)): + type + WaitKind {.pure.} = enum + Read, Write + + when defined(linux): + proc checkBusy(fd: cint): bool = false + else: + proc checkBusy(fd: cint): bool = + var data = 0'u64 + let res = handleEintr(recv(SocketHandle(fd), + addr data, sizeof(uint64), MSG_PEEK)) + if res == sizeof(uint64): + true + else: + false + + func toTimeval(a: Duration): Timeval = + ## Convert Duration ``a`` to ``Timeval`` object. + let nanos = a.nanoseconds + let m = nanos mod Second.nanoseconds() + Timeval( + tv_sec: Time(nanos div Second.nanoseconds()), + tv_usec: Suseconds(m div Microsecond.nanoseconds()) + ) + + proc waitReady(fd: cint, kind: WaitKind, + timeout: Duration): Result[bool, OSErrorCode] = + var + tv: Timeval + fdset = + block: + var res: TFdSet + FD_ZERO(res) + FD_SET(SocketHandle(fd), res) + res + + let + ptv = + if not(timeout.isInfinite()): + tv = timeout.toTimeval() + addr tv + else: + nil + nfd = cint(fd) + 1 + res = + case kind + of WaitKind.Read: + handleEintr(select(nfd, addr fdset, nil, nil, ptv)) + of WaitKind.Write: + handleEintr(select(nfd, nil, addr fdset, nil, ptv)) + + if res > 0: + ok(true) + elif res == 0: + ok(false) + else: + err(osLastError()) + + proc safeUnregisterAndCloseFd(fd: AsyncFD): Result[void, OSErrorCode] = + let loop = getThreadDispatcher() + if loop.contains(fd): + ? unregister2(fd) + if closeFd(cint(fd)) != 0: + err(osLastError()) + else: + ok() + +proc close*(signal: ThreadSignalPtr): Result[void, string] = + ## Close ThreadSignal object and free all the resources. + defer: deallocShared(signal) + when defined(windows): + # We do not need to perform unregistering on Windows, we can only close it. + if closeHandle(signal[].event) == 0'u32: + return err(osErrorMsg(osLastError())) + elif defined(linux): + let res = safeUnregisterAndCloseFd(signal[].efd) + if res.isErr(): + return err(osErrorMsg(res.error)) + else: + let res1 = safeUnregisterAndCloseFd(signal[].rfd) + let res2 = safeUnregisterAndCloseFd(signal[].wfd) + if res1.isErr(): return err(osErrorMsg(res1.error)) + if res2.isErr(): return err(osErrorMsg(res2.error)) + ok() + +proc fireSync*(signal: ThreadSignalPtr, + timeout = InfiniteDuration): Result[bool, string] = + ## Set state of ``signal`` to signaled state in blocking way. + ## + ## Returns ``false`` if signal was not signalled in time, and ``true`` + ## if operation was successful. + when defined(windows): + if setEvent(signal[].event) == 0'u32: + return err(osErrorMsg(osLastError())) + ok(true) + else: + let + eventFd = + when defined(linux): + cint(signal[].efd) + else: + cint(signal[].wfd) + checkFd = + when defined(linux): + cint(signal[].efd) + else: + cint(signal[].rfd) + + if checkBusy(checkFd): + # Signal is already in signalled state + return ok(true) + + var data = 1'u64 + while true: + let res = + when defined(linux): + handleEintr(write(eventFd, addr data, sizeof(uint64))) + else: + handleEintr(send(SocketHandle(eventFd), addr data, sizeof(uint64), + MSG_NOSIGNAL)) + if res < 0: + let errorCode = osLastError() + case errorCode + of EAGAIN: + let wres = waitReady(eventFd, WaitKind.Write, timeout) + if wres.isErr(): + return err(osErrorMsg(wres.error)) + if not(wres.get()): + return ok(false) + else: + return err(osErrorMsg(errorCode)) + elif res != sizeof(data): + return err(osErrorMsg(EINVAL)) + else: + return ok(true) + +proc waitSync*(signal: ThreadSignalPtr, + timeout = InfiniteDuration): Result[bool, string] = + ## Wait until the signal become signaled. This procedure is ``NOT`` async, + ## so it blocks execution flow, but this procedure do not need asynchronous + ## event loop to be present. + when defined(windows): + let + timeoutWin = + if timeout.isInfinite(): + INFINITE + else: + DWORD(timeout.milliseconds()) + handle = signal[].event + res = waitForSingleObject(handle, timeoutWin) + if res == WAIT_OBJECT_0: + ok(true) + elif res == WAIT_TIMEOUT: + ok(false) + elif res == WAIT_ABANDONED: + err("The wait operation has been abandoned") + else: + err("The wait operation has been failed") + else: + let eventFd = + when defined(linux): + cint(signal[].efd) + else: + cint(signal[].rfd) + var + data = 0'u64 + timer = timeout + while true: + let wres = + block: + let + start = Moment.now() + res = waitReady(eventFd, WaitKind.Read, timer) + timer = timer - (Moment.now() - start) + res + if wres.isErr(): + return err(osErrorMsg(wres.error)) + if not(wres.get()): + return ok(false) + let res = + when defined(linux): + handleEintr(read(eventFd, addr data, sizeof(uint64))) + else: + handleEintr(recv(SocketHandle(eventFd), addr data, sizeof(uint64), + cint(0))) + if res < 0: + let errorCode = osLastError() + # If errorCode == EAGAIN it means that reading operation is already + # pending and so some other consumer reading eventfd or pipe end, in + # this case we going to ignore error and wait for another event. + if errorCode != EAGAIN: + return err(osErrorMsg(errorCode)) + elif res != sizeof(data): + return err(osErrorMsg(EINVAL)) + else: + return ok(true) + +proc fire*(signal: ThreadSignalPtr): Future[void] = + ## Set state of ``signal`` to signaled in asynchronous way. + var retFuture = newFuture[void]("asyncthreadsignal.fire") + when defined(windows): + if setEvent(signal[].event) == 0'u32: + retFuture.fail(newException(AsyncError, osErrorMsg(osLastError()))) + else: + retFuture.complete() + else: + var data = 1'u64 + let + eventFd = + when defined(linux): + cint(signal[].efd) + else: + cint(signal[].wfd) + checkFd = + when defined(linux): + cint(signal[].efd) + else: + cint(signal[].rfd) + + proc continuation(udata: pointer) {.gcsafe, raises: [].} = + if not(retFuture.finished()): + let res = + when defined(linux): + handleEintr(write(eventFd, addr data, sizeof(uint64))) + else: + handleEintr(send(SocketHandle(eventFd), addr data, sizeof(uint64), + MSG_NOSIGNAL)) + if res < 0: + let errorCode = osLastError() + discard removeWriter2(AsyncFD(eventFd)) + retFuture.fail(newException(AsyncError, osErrorMsg(errorCode))) + elif res != sizeof(data): + discard removeWriter2(AsyncFD(eventFd)) + retFuture.fail(newException(AsyncError, osErrorMsg(EINVAL))) + else: + let eres = removeWriter2(AsyncFD(eventFd)) + if eres.isErr(): + retFuture.fail(newException(AsyncError, osErrorMsg(eres.error))) + else: + retFuture.complete() + + proc cancellation(udata: pointer) {.gcsafe, raises: [].} = + if not(retFuture.finished()): + discard removeWriter2(AsyncFD(eventFd)) + + if checkBusy(checkFd): + # Signal is already in signalled state + retFuture.complete() + return retFuture + + let res = + when defined(linux): + handleEintr(write(eventFd, addr data, sizeof(uint64))) + else: + handleEintr(send(SocketHandle(eventFd), addr data, sizeof(uint64), + MSG_NOSIGNAL)) + if res < 0: + let errorCode = osLastError() + case errorCode + of EAGAIN: + let loop = getThreadDispatcher() + if not(loop.contains(AsyncFD(eventFd))): + let rres = register2(AsyncFD(eventFd)) + if rres.isErr(): + retFuture.fail(newException(AsyncError, osErrorMsg(rres.error))) + return retFuture + let wres = addWriter2(AsyncFD(eventFd), continuation) + if wres.isErr(): + retFuture.fail(newException(AsyncError, osErrorMsg(wres.error))) + else: + retFuture.cancelCallback = cancellation + else: + retFuture.fail(newException(AsyncError, osErrorMsg(errorCode))) + elif res != sizeof(data): + retFuture.fail(newException(AsyncError, osErrorMsg(EINVAL))) + else: + retFuture.complete() + + retFuture + +when defined(windows): + proc wait*(signal: ThreadSignalPtr) {.async.} = + let handle = signal[].event + let res = await waitForSingleObject(handle, InfiniteDuration) + # There should be no other response, because we use `InfiniteDuration`. + doAssert(res == WaitableResult.Ok) +else: + proc wait*(signal: ThreadSignalPtr): Future[void] = + var retFuture = newFuture[void]("asyncthreadsignal.wait") + var data = 1'u64 + let eventFd = + when defined(linux): + cint(signal[].efd) + else: + cint(signal[].rfd) + + proc continuation(udata: pointer) {.gcsafe, raises: [].} = + if not(retFuture.finished()): + let res = + when defined(linux): + handleEintr(read(eventFd, addr data, sizeof(uint64))) + else: + handleEintr(recv(SocketHandle(eventFd), addr data, sizeof(uint64), + cint(0))) + if res < 0: + let errorCode = osLastError() + # If errorCode == EAGAIN it means that reading operation is already + # pending and so some other consumer reading eventfd or pipe end, in + # this case we going to ignore error and wait for another event. + if errorCode != EAGAIN: + discard removeReader2(AsyncFD(eventFd)) + retFuture.fail(newException(AsyncError, osErrorMsg(errorCode))) + elif res != sizeof(data): + discard removeReader2(AsyncFD(eventFd)) + retFuture.fail(newException(AsyncError, osErrorMsg(EINVAL))) + else: + let eres = removeReader2(AsyncFD(eventFd)) + if eres.isErr(): + retFuture.fail(newException(AsyncError, osErrorMsg(eres.error))) + else: + retFuture.complete() + + proc cancellation(udata: pointer) {.gcsafe, raises: [].} = + if not(retFuture.finished()): + # Future is already cancelled so we ignore errors. + discard removeReader2(AsyncFD(eventFd)) + + let loop = getThreadDispatcher() + if not(loop.contains(AsyncFD(eventFd))): + let res = register2(AsyncFD(eventFd)) + if res.isErr(): + retFuture.fail(newException(AsyncError, osErrorMsg(res.error))) + return retFuture + let res = addReader2(AsyncFD(eventFd), continuation) + if res.isErr(): + retFuture.fail(newException(AsyncError, osErrorMsg(res.error))) + return retFuture + retFuture.cancelCallback = cancellation + retFuture diff --git a/tests/testall.nim b/tests/testall.nim index bf0e98a9..4861a85e 100644 --- a/tests/testall.nim +++ b/tests/testall.nim @@ -8,7 +8,7 @@ import testmacro, testsync, testsoon, testtime, testfut, testsignal, testaddress, testdatagram, teststream, testserver, testbugs, testnet, testasyncstream, testhttpserver, testshttpserver, testhttpclient, - testproc, testratelimit, testfutures + testproc, testratelimit, testfutures, testthreadsync # Must be imported last to check for Pending futures import testutils diff --git a/tests/testthreadsync.nim b/tests/testthreadsync.nim new file mode 100644 index 00000000..fc85dc8c --- /dev/null +++ b/tests/testthreadsync.nim @@ -0,0 +1,369 @@ +# Chronos Test Suite +# (c) Copyright 2023-Present +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +import std/[cpuinfo, locks, strutils] +import ../chronos/unittest2/asynctests +import ../chronos/threadsync + +{.used.} + +type + ThreadResult = object + value: int + + ThreadResultPtr = ptr ThreadResult + + LockPtr = ptr Lock + + ThreadArg = object + signal: ThreadSignalPtr + retval: ThreadResultPtr + index: int + + ThreadArg2 = object + signal1: ThreadSignalPtr + signal2: ThreadSignalPtr + retval: ThreadResultPtr + + ThreadArg3 = object + lock: LockPtr + signal: ThreadSignalPtr + retval: ThreadResultPtr + index: int + + WaitSendKind {.pure.} = enum + Sync, Async + +const + TestsCount = 1000 + +suite "Asynchronous multi-threading sync primitives test suite": + proc setResult(thr: ThreadResultPtr, value: int) = + thr[].value = value + + proc new(t: typedesc[ThreadResultPtr], value: int = 0): ThreadResultPtr = + var res = cast[ThreadResultPtr](allocShared0(sizeof(ThreadResult))) + res[].value = value + res + + proc free(thr: ThreadResultPtr) = + doAssert(not(isNil(thr))) + deallocShared(thr) + + let numProcs = countProcessors() * 2 + + template threadSignalTest(sendFlag, waitFlag: WaitSendKind) = + proc testSyncThread(arg: ThreadArg) {.thread.} = + let res = waitSync(arg.signal, 1500.milliseconds) + if res.isErr(): + arg.retval.setResult(1) + else: + if res.get(): + arg.retval.setResult(2) + else: + arg.retval.setResult(3) + + proc testAsyncThread(arg: ThreadArg) {.thread.} = + proc testAsyncCode(arg: ThreadArg) {.async.} = + try: + await wait(arg.signal).wait(1500.milliseconds) + arg.retval.setResult(2) + except AsyncTimeoutError: + arg.retval.setResult(3) + except CatchableError: + arg.retval.setResult(1) + + waitFor testAsyncCode(arg) + + let signal = ThreadSignalPtr.new().tryGet() + var args: seq[ThreadArg] + var threads = newSeq[Thread[ThreadArg]](numProcs) + for i in 0 ..< numProcs: + let + res = ThreadResultPtr.new() + arg = ThreadArg(signal: signal, retval: res, index: i) + args.add(arg) + case waitFlag + of WaitSendKind.Sync: + createThread(threads[i], testSyncThread, arg) + of WaitSendKind.Async: + createThread(threads[i], testAsyncThread, arg) + + await sleepAsync(500.milliseconds) + case sendFlag + of WaitSendKind.Sync: + check signal.fireSync().isOk() + of WaitSendKind.Async: + await signal.fire() + + joinThreads(threads) + + var ncheck: array[3, int] + for item in args: + if item.retval[].value == 1: + inc(ncheck[0]) + elif item.retval[].value == 2: + inc(ncheck[1]) + elif item.retval[].value == 3: + inc(ncheck[2]) + free(item.retval) + check: + signal.close().isOk() + ncheck[0] == 0 + ncheck[1] == 1 + ncheck[2] == numProcs - 1 + + template threadSignalTest2(testsCount: int, + sendFlag, waitFlag: WaitSendKind) = + proc testSyncThread(arg: ThreadArg2) {.thread.} = + for i in 0 ..< testsCount: + block: + let res = waitSync(arg.signal1, 1500.milliseconds) + if res.isErr(): + arg.retval.setResult(-1) + return + if not(res.get()): + arg.retval.setResult(-2) + return + + block: + let res = arg.signal2.fireSync() + if res.isErr(): + arg.retval.setResult(-3) + return + + arg.retval.setResult(i + 1) + + proc testAsyncThread(arg: ThreadArg2) {.thread.} = + proc testAsyncCode(arg: ThreadArg2) {.async.} = + for i in 0 ..< testsCount: + try: + await wait(arg.signal1).wait(1500.milliseconds) + except AsyncTimeoutError: + arg.retval.setResult(-2) + return + except AsyncError: + arg.retval.setResult(-1) + return + except CatchableError: + arg.retval.setResult(-3) + return + + try: + await arg.signal2.fire() + except AsyncError: + arg.retval.setResult(-4) + return + except CatchableError: + arg.retval.setResult(-5) + return + + arg.retval.setResult(i + 1) + + waitFor testAsyncCode(arg) + + let + signal1 = ThreadSignalPtr.new().tryGet() + signal2 = ThreadSignalPtr.new().tryGet() + retval = ThreadResultPtr.new() + arg = ThreadArg2(signal1: signal1, signal2: signal2, retval: retval) + var thread: Thread[ThreadArg2] + + case waitFlag + of WaitSendKind.Sync: + createThread(thread, testSyncThread, arg) + of WaitSendKind.Async: + createThread(thread, testAsyncThread, arg) + + let start = Moment.now() + for i in 0 ..< testsCount: + case sendFlag + of WaitSendKind.Sync: + block: + let res = signal1.fireSync() + check res.isOk() + block: + let res = waitSync(arg.signal2, 1500.milliseconds) + check: + res.isOk() + res.get() == true + of WaitSendKind.Async: + await arg.signal1.fire() + await wait(arg.signal2).wait(1500.milliseconds) + joinThreads(thread) + let finish = Moment.now() + let perf = (float64(nanoseconds(1.seconds)) / + float64(nanoseconds(finish - start))) * float64(testsCount) + echo "Switches tested: ", testsCount, ", elapsed time: ", (finish - start), + ", performance = ", formatFloat(perf, ffDecimal, 4), + " switches/second" + + check: + arg.retval[].value == testsCount + + template threadSignalTest3(testsCount: int, + sendFlag, waitFlag: WaitSendKind) = + proc testSyncThread(arg: ThreadArg3) {.thread.} = + withLock(arg.lock[]): + let res = waitSync(arg.signal, 10.milliseconds) + if res.isErr(): + arg.retval.setResult(1) + else: + if res.get(): + arg.retval.setResult(2) + else: + arg.retval.setResult(3) + + proc testAsyncThread(arg: ThreadArg3) {.thread.} = + proc testAsyncCode(arg: ThreadArg3) {.async.} = + withLock(arg.lock[]): + try: + await wait(arg.signal).wait(10.milliseconds) + arg.retval.setResult(2) + except AsyncTimeoutError: + arg.retval.setResult(3) + except CatchableError: + arg.retval.setResult(1) + + waitFor testAsyncCode(arg) + + let signal = ThreadSignalPtr.new().tryGet() + var args: seq[ThreadArg3] + var threads = newSeq[Thread[ThreadArg3]](numProcs) + var lockPtr = cast[LockPtr](allocShared0(sizeof(Lock))) + initLock(lockPtr[]) + acquire(lockPtr[]) + + for i in 0 ..< numProcs: + let + res = ThreadResultPtr.new() + arg = ThreadArg3(signal: signal, retval: res, index: i, lock: lockPtr) + args.add(arg) + case waitFlag + of WaitSendKind.Sync: + createThread(threads[i], testSyncThread, arg) + of WaitSendKind.Async: + createThread(threads[i], testAsyncThread, arg) + + await sleepAsync(500.milliseconds) + case sendFlag + of WaitSendKind.Sync: + for i in 0 ..< testsCount: + check signal.fireSync().isOk() + of WaitSendKind.Async: + for i in 0 ..< testsCount: + await signal.fire() + + release(lockPtr[]) + joinThreads(threads) + deinitLock(lockPtr[]) + deallocShared(lockPtr) + + var ncheck: array[3, int] + for item in args: + if item.retval[].value == 1: + inc(ncheck[0]) + elif item.retval[].value == 2: + inc(ncheck[1]) + elif item.retval[].value == 3: + inc(ncheck[2]) + free(item.retval) + check: + signal.close().isOk() + ncheck[0] == 0 + ncheck[1] == 1 + ncheck[2] == numProcs - 1 + + template threadSignalTest4(testsCount: int, + sendFlag, waitFlag: WaitSendKind) = + let signal = ThreadSignalPtr.new().tryGet() + let start = Moment.now() + for i in 0 ..< testsCount: + case sendFlag + of WaitSendKind.Sync: + check signal.fireSync().isOk() + of WaitSendKind.Async: + await signal.fire() + + case waitFlag + of WaitSendKind.Sync: + check waitSync(signal).isOk() + of WaitSendKind.Async: + await wait(signal) + let finish = Moment.now() + let perf = (float64(nanoseconds(1.seconds)) / + float64(nanoseconds(finish - start))) * float64(testsCount) + echo "Switches tested: ", testsCount, ", elapsed time: ", (finish - start), + ", performance = ", formatFloat(perf, ffDecimal, 4), + " switches/second" + + check: + signal.close.isOk() + + asyncTest "ThreadSignal: Multiple [" & $numProcs & + "] threads waiting test [sync -> sync]": + threadSignalTest(WaitSendKind.Sync, WaitSendKind.Sync) + + asyncTest "ThreadSignal: Multiple [" & $numProcs & + "] threads waiting test [async -> async]": + threadSignalTest(WaitSendKind.Async, WaitSendKind.Async) + + asyncTest "ThreadSignal: Multiple [" & $numProcs & + "] threads waiting test [async -> sync]": + threadSignalTest(WaitSendKind.Async, WaitSendKind.Sync) + + asyncTest "ThreadSignal: Multiple [" & $numProcs & + "] threads waiting test [sync -> async]": + threadSignalTest(WaitSendKind.Sync, WaitSendKind.Async) + + asyncTest "ThreadSignal: Multiple thread switches [" & $TestsCount & + "] test [sync -> sync]": + threadSignalTest2(TestsCount, WaitSendKind.Sync, WaitSendKind.Sync) + + asyncTest "ThreadSignal: Multiple thread switches [" & $TestsCount & + "] test [async -> async]": + threadSignalTest2(TestsCount, WaitSendKind.Async, WaitSendKind.Async) + + asyncTest "ThreadSignal: Multiple thread switches [" & $TestsCount & + "] test [sync -> async]": + threadSignalTest2(TestsCount, WaitSendKind.Sync, WaitSendKind.Async) + + asyncTest "ThreadSignal: Multiple thread switches [" & $TestsCount & + "] test [async -> sync]": + threadSignalTest2(TestsCount, WaitSendKind.Async, WaitSendKind.Sync) + + asyncTest "ThreadSignal: Multiple signals [" & $TestsCount & + "] to multiple threads [" & $numProcs & "] test [sync -> sync]": + threadSignalTest3(TestsCount, WaitSendKind.Sync, WaitSendKind.Sync) + + asyncTest "ThreadSignal: Multiple signals [" & $TestsCount & + "] to multiple threads [" & $numProcs & "] test [async -> async]": + threadSignalTest3(TestsCount, WaitSendKind.Async, WaitSendKind.Async) + + asyncTest "ThreadSignal: Multiple signals [" & $TestsCount & + "] to multiple threads [" & $numProcs & "] test [sync -> async]": + threadSignalTest3(TestsCount, WaitSendKind.Sync, WaitSendKind.Async) + + asyncTest "ThreadSignal: Multiple signals [" & $TestsCount & + "] to multiple threads [" & $numProcs & "] test [async -> sync]": + threadSignalTest3(TestsCount, WaitSendKind.Async, WaitSendKind.Sync) + + asyncTest "ThreadSignal: Single threaded switches [" & $TestsCount & + "] test [sync -> sync]": + threadSignalTest4(TestsCount, WaitSendKind.Sync, WaitSendKind.Sync) + + asyncTest "ThreadSignal: Single threaded switches [" & $TestsCount & + "] test [sync -> sync]": + threadSignalTest4(TestsCount, WaitSendKind.Async, WaitSendKind.Async) + + asyncTest "ThreadSignal: Single threaded switches [" & $TestsCount & + "] test [sync -> async]": + threadSignalTest4(TestsCount, WaitSendKind.Sync, WaitSendKind.Async) + + asyncTest "ThreadSignal: Single threaded switches [" & $TestsCount & + "] test [async -> sync]": + threadSignalTest4(TestsCount, WaitSendKind.Async, WaitSendKind.Sync)