try queue setup

This commit is contained in:
Jaremy Creechley 2024-05-20 17:54:06 +03:00
parent b6180881ea
commit 899a0f6f95
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
2 changed files with 5 additions and 17 deletions

View File

@ -18,16 +18,11 @@ type AsyncCircomCompat* = object
proc proveTask[H]( proc proveTask[H](
circom: CircomCompat, circom: CircomCompat,
data: ProofInputs[H], data: ProofInputs[H],
results: SignalQueuePtr[Result[CircomProof, string]], results: SignalQueuePtr[?!CircomProof],
) = ) =
var val: Result[CircomProof, string]
let proof = circom.prove(data) let proof = circom.prove(data)
if proof.isOk():
val.ok(proof.get())
else:
val.err(proof.error().msg)
if (let sent = results.send(val); sent.isErr()): if (let sent = results.send(proof); sent.isErr()):
error "Error sending proof results", msg = sent.error().msg error "Error sending proof results", msg = sent.error().msg
proc prove*[H]( proc prove*[H](
@ -36,13 +31,13 @@ proc prove*[H](
## Generates proof using circom-compat asynchronously ## Generates proof using circom-compat asynchronously
## ##
without queue =? newSignalQueue[Result[CircomProof, string]](), err: without queue =? newSignalQueue[?!CircomProof](), err:
return failure(err) return failure(err)
defer: defer:
if (let res = queue.release(); res.isErr): if (let res = queue.release(); res.isErr):
error "Error releasing proof queue ", msg = res.error().msg error "Error releasing proof queue ", msg = res.error().msg
proc spawnTask() = template spawnTask() =
self.tp.spawn proveTask(self.circom, input, queue) self.tp.spawn proveTask(self.circom, input, queue)
spawnTask() spawnTask()
@ -50,7 +45,7 @@ proc prove*[H](
without taskRes =? await queue.recvAsync(), err: without taskRes =? await queue.recvAsync(), err:
return failure(err) return failure(err)
without proof =? taskRes.mapFailure, err: without proof =? taskRes, err:
return failure(err) return failure(err)
success(proof) success(proof)

View File

@ -63,13 +63,6 @@ proc send*[T](queue: SignalQueuePtr[T], msg: T): ?!void {.raises: [].} =
return failure(res.error()) return failure(res.error())
result = ok() result = ok()
proc recv*[T](queue: SignalQueue[T]): ?!T =
## Receive item from queue, blocking.
try:
ok(queue.chan[].recv())
except Exception as exc:
failure(exc.msg)
proc recvAsync*[T](queue: SignalQueuePtr[T]): Future[?!T] {.async.} = proc recvAsync*[T](queue: SignalQueuePtr[T]): Future[?!T] {.async.} =
## Async compatible receive from queue. Pauses async execution until ## Async compatible receive from queue. Pauses async execution until
## an item is received from the queue ## an item is received from the queue