nim-codex/codex/utils/asyncthreads.nim

62 lines
1.9 KiB
Nim
Raw Normal View History

2024-05-20 17:15:51 +03:00
import std/options
2024-05-17 00:39:07 +03:00
import pkg/taskpools
import pkg/taskpools/flowvars
import pkg/chronos
import pkg/chronos/threadsync
import pkg/questionable/results
const
  CompletionRetryDelay* = 10.millis
    ## NOTE(review): not referenced in the visible part of this module;
    ## presumably the pause between completion retries -- confirm at the
    ## call sites.

  CompletionTimeout* = 1.seconds
    ## Maximum await time for completion after receiving a signal.
2024-05-17 00:39:07 +03:00
2024-05-20 17:15:51 +03:00
type
  SignalQueue[T] = object
    ## A typed channel paired with a thread signal, so that a message
    ## placed on `chan` can be announced to a waiting receiver.
    signal: ThreadSignalPtr
      ## Fired by `send` after enqueueing; awaited by `recvAsync`.
    chan*: Channel[T]
      ## Carries the queued messages.

  SignalQueuePtr*[T] = ptr SignalQueue[T]
    ## Raw pointer handle to a `SignalQueue`. Obtained from
    ## `newSignalQueue` and disposed of with `release`.
proc release*[T](queue: SignalQueuePtr[T]): ?!void =
  ## Call to properly dispose of a SignalQueue.
  ##
  ## Closes the channel, closes the thread signal and frees the shared
  ## memory block behind `queue`. The memory is freed even when closing the
  ## signal fails; in that case the close error's message is returned as a
  ## failure. `queue` must not be used after this call.
  ##
  ## Releasing a nil `queue` is a no-op that returns success.
  if queue.isNil:
    return success()

  queue[].chan.close()

  # Capture the close outcome before freeing: `queue` is invalid afterwards.
  let closeError = queue[].signal.close().mapFailure.errorOption()
  deallocShared(queue)

  if err =? closeError:
    return failure(err.msg)
  return success()
2024-05-20 17:15:51 +03:00
2024-05-20 22:39:26 +03:00
proc newSignalQueue*[T](maxItems: int = 0): ?!SignalQueuePtr[T] =
  ## Create a signal queue compatible with Chronos async.
  ##
  ## `maxItems` is forwarded to `Channel.open` (per the Nim channel docs,
  ## 0 means an unbounded channel).
  ##
  ## Returns a failure if the `ThreadSignalPtr` cannot be created. On
  ## success the queue lives in shared memory obtained with `allocShared0`
  ## and must be disposed of with `release`.

  # Create the signal first: if this fails we return before any shared
  # memory has been allocated, so there is nothing to leak.
  without signal =? ThreadSignalPtr.new().mapFailure, sigErr:
    return failure(sigErr)

  let queue = cast[ptr SignalQueue[T]](allocShared0(sizeof(SignalQueue[T])))
  queue[].signal = signal
  queue[].chan.open(maxItems)
  return success(queue)
2024-05-20 17:15:51 +03:00
proc send*[T](queue: SignalQueuePtr[T], msg: T): ?!void {.raises: [].} =
  ## Sends a message from a regular thread. `msg` is deep copied. May block.
  ##
  ## The message is first placed on the queue's channel, then the queue's
  ## signal is fired with `fireSync`. Returns a failure when the channel
  ## send raises, when firing the signal fails, or when `fireSync` reports
  ## that the signal was not fired.
  try:
    queue[].chan.send(msg)
  except Exception as sendExc:
    # Any exception is converted to a failure so that `raises: []` holds.
    return failure(sendExc.msg)

  without fired =? queue[].signal.fireSync(InfiniteDuration).mapFailure, fireErr:
    return failure(fireErr)

  if not fired:
    return failure("ThreadSignalPtr not signalled in time")
  return ok()
2024-05-20 17:15:51 +03:00
proc recvAsync*[T](queue: SignalQueuePtr[T]): Future[?!T] {.async.} =
  ## Async compatible receive from queue. Suspends the calling async
  ## procedure until the queue's signal fires, then takes one item from
  ## the channel without blocking.
  ##
  ## Returns a failure if the signal fired but the channel had no item
  ## available.
  await queue.signal.wait()

  let received = queue.chan.tryRecv()
  if received.dataAvailable:
    return success(received.msg)
  return failure("unable to retrieve expected queue value")