From f90c4cdb66aa730132ab0b70225cd91b02e9e49c Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Mon, 20 May 2024 18:13:34 +0300 Subject: [PATCH] try queue setup --- codex/slots/proofs/backends/asynccircoms.nim | 5 ++--- codex/utils/asyncthreads.nim | 22 ++++++++++++-------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/codex/slots/proofs/backends/asynccircoms.nim b/codex/slots/proofs/backends/asynccircoms.nim index 431f7b16..2a632873 100644 --- a/codex/slots/proofs/backends/asynccircoms.nim +++ b/codex/slots/proofs/backends/asynccircoms.nim @@ -16,9 +16,7 @@ type AsyncCircomCompat* = object tp*: Taskpool proc proveTask[H]( - circom: CircomCompat, - data: ProofInputs[H], - results: SignalQueuePtr[?!CircomProof], + circom: CircomCompat, data: ProofInputs[H], results: SignalQueuePtr[?!CircomProof] ) = let proof = circom.prove(data) @@ -35,6 +33,7 @@ proc prove*[H]( template spawnTask() = self.tp.spawn proveTask(self.circom, input, queue) + spawnTask() let taskRes = await queue.recvAsync() diff --git a/codex/utils/asyncthreads.nim b/codex/utils/asyncthreads.nim index ba9b76b4..b4bf9cde 100644 --- a/codex/utils/asyncthreads.nim +++ b/codex/utils/asyncthreads.nim @@ -7,9 +7,12 @@ import pkg/questionable/results const CompletionRetryDelay* = 10.millis - CompletionTimeout* = 1.seconds # Maximum await time for completition after receiving a signal + CompletionTimeout* = 1.seconds + # Maximum await time for completition after receiving a signal -proc awaitThreadResult*[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[?!T] {.async.} = +proc awaitThreadResult*[T]( + signal: ThreadSignalPtr, handle: Flowvar[T] +): Future[?!T] {.async.} = await wait(signal) var @@ -23,7 +26,9 @@ proc awaitThreadResult*[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[ awaitTotal += CompletionRetryDelay await sleepAsync(CompletionRetryDelay) - return failure("Task signaled finish but didn't return any result within " & $CompletionRetryDelay) + return failure( + "Task signaled finish but didn't return any result within " & $CompletionRetryDelay + ) type SignalQueue[T] = object @@ -40,9 +45,7 @@ proc release*[T](queue: SignalQueuePtr[T]): ?!void = result = failure(err.msg) deallocShared(queue) -proc newSignalQueue*[T]( - maxItems: int = 0 -): ?!SignalQueuePtr[T] = +proc newSignalQueue*[T](maxItems: int = 0): ?!SignalQueuePtr[T] = ## Create a signal queue compatible with Chronos async. result = success cast[ptr SignalQueue[T]](allocShared0(sizeof(SignalQueue[T]))) without signal =? ThreadSignalPtr.new().mapFailure, err: @@ -59,9 +62,10 @@ proc send*[T](queue: SignalQueuePtr[T], msg: T): ?!void {.raises: [].} = except Exception as exc: return failure(exc.msg) - let res = queue[].signal.fireSync() - if res.isErr(): - return failure(res.error()) + without wasSent =? queue[].signal.fireSync(InfiniteDuration).mapFailure, err: + return failure(err) + if not wasSent: + return failure("ThreadSignalPtr not signalled in time") result = ok() proc recvAsync*[T](queue: SignalQueuePtr[T]): Future[?!T] {.async.} =