try queue setup

This commit is contained in:
Jaremy Creechley 2024-05-20 18:13:34 +03:00
parent 9e2820944d
commit f90c4cdb66
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
2 changed files with 15 additions and 12 deletions

View File

@ -16,9 +16,7 @@ type AsyncCircomCompat* = object
tp*: Taskpool tp*: Taskpool
proc proveTask[H]( proc proveTask[H](
circom: CircomCompat, circom: CircomCompat, data: ProofInputs[H], results: SignalQueuePtr[?!CircomProof]
data: ProofInputs[H],
results: SignalQueuePtr[?!CircomProof],
) = ) =
let proof = circom.prove(data) let proof = circom.prove(data)
@ -35,6 +33,7 @@ proc prove*[H](
template spawnTask() = template spawnTask() =
self.tp.spawn proveTask(self.circom, input, queue) self.tp.spawn proveTask(self.circom, input, queue)
spawnTask() spawnTask()
let taskRes = await queue.recvAsync() let taskRes = await queue.recvAsync()

View File

@ -7,9 +7,12 @@ import pkg/questionable/results
const const
CompletionRetryDelay* = 10.millis CompletionRetryDelay* = 10.millis
CompletionTimeout* = 1.seconds # Maximum await time for completition after receiving a signal CompletionTimeout* = 1.seconds
# Maximum await time for completition after receiving a signal
proc awaitThreadResult*[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[?!T] {.async.} = proc awaitThreadResult*[T](
signal: ThreadSignalPtr, handle: Flowvar[T]
): Future[?!T] {.async.} =
await wait(signal) await wait(signal)
var var
@ -23,7 +26,9 @@ proc awaitThreadResult*[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[
awaitTotal += CompletionRetryDelay awaitTotal += CompletionRetryDelay
await sleepAsync(CompletionRetryDelay) await sleepAsync(CompletionRetryDelay)
return failure("Task signaled finish but didn't return any result within " & $CompletionRetryDelay) return failure(
"Task signaled finish but didn't return any result within " & $CompletionRetryDelay
)
type type
SignalQueue[T] = object SignalQueue[T] = object
@ -40,9 +45,7 @@ proc release*[T](queue: SignalQueuePtr[T]): ?!void =
result = failure(err.msg) result = failure(err.msg)
deallocShared(queue) deallocShared(queue)
proc newSignalQueue*[T]( proc newSignalQueue*[T](maxItems: int = 0): ?!SignalQueuePtr[T] =
maxItems: int = 0
): ?!SignalQueuePtr[T] =
## Create a signal queue compatible with Chronos async. ## Create a signal queue compatible with Chronos async.
result = success cast[ptr SignalQueue[T]](allocShared0(sizeof(SignalQueue[T]))) result = success cast[ptr SignalQueue[T]](allocShared0(sizeof(SignalQueue[T])))
without signal =? ThreadSignalPtr.new().mapFailure, err: without signal =? ThreadSignalPtr.new().mapFailure, err:
@ -59,9 +62,10 @@ proc send*[T](queue: SignalQueuePtr[T], msg: T): ?!void {.raises: [].} =
except Exception as exc: except Exception as exc:
return failure(exc.msg) return failure(exc.msg)
let res = queue[].signal.fireSync() without wasSent =? queue[].signal.fireSync(InfiniteDuration).mapFailure, err:
if res.isErr(): return failure(err)
return failure(res.error()) if not wasSent:
return failure("ThreadSignalPtr not signalled in time")
result = ok() result = ok()
proc recvAsync*[T](queue: SignalQueuePtr[T]): Future[?!T] {.async.} = proc recvAsync*[T](queue: SignalQueuePtr[T]): Future[?!T] {.async.} =