try queue setup

This commit is contained in:
Jaremy Creechley 2024-05-20 17:15:51 +03:00
parent 30e087efb2
commit 98abf47a23
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
2 changed files with 87 additions and 37 deletions

View File

@ -11,38 +11,36 @@ import ../../../utils/asyncthreads
import ./circomcompat import ./circomcompat
type type AsyncCircomCompat* = object
AsyncCircomCompat* = object
params*: CircomCompatParams params*: CircomCompatParams
tp*: Taskpool tp*: Taskpool
# Args objects are missing seq[seq[byte]] field, to avoid unnecessary data copy
ProveTaskArgs* = object
signal: ThreadSignalPtr
params: CircomCompatParams
var circomBackend {.threadvar.}: Option[CircomCompat] var circomBackend {.threadvar.}: Option[CircomCompat]
proc proveTask[H]( proc proveTask[H](
args: ProveTaskArgs, data: ProofInputs[H] params: CircomCompatParams,
): Result[CircomProof, string] = data: ProofInputs[H],
results: SignalQueuePtr[Result[CircomProof, string]],
) =
try: try:
if circomBackend.isNone: if circomBackend.isNone:
circomBackend = some CircomCompat.init(args.params) circomBackend = some CircomCompat.init(params)
else: else:
assert circomBackend.get().params == args.params assert circomBackend.get().params == params
let res = circomBackend.get().prove(data) let proof = circomBackend.get().prove(data)
if res.isOk: var val: Result[CircomProof, string]
return ok(res.get()) if proof.isOk():
val.ok(proof.get())
else: else:
return err(res.error().msg) val.err(proof.error().msg)
if (let sent = results.send(val); sent.isErr()):
error "Error sending proof results", msg = sent.error().msg
except CatchableError as exception: except CatchableError as exception:
return err(exception.msg) var err = Result[CircomProof, string].err(exception.msg)
finally: if (let res = results.send(err); res.isErr()):
if err =? args.signal.fireSync().mapFailure.errorOption(): error "Error sending proof results", msg = res.error().msg
error "Error firing signal in proveTask ", msg = err.msg
proc prove*[H]( proc prove*[H](
self: AsyncCircomCompat, input: ProofInputs[H] self: AsyncCircomCompat, input: ProofInputs[H]
@ -50,27 +48,24 @@ proc prove*[H](
## Generates proof using circom-compat asynchronously ## Generates proof using circom-compat asynchronously
## ##
without signal =? ThreadSignalPtr.new().mapFailure, err: without queue =? newSignalQueue[Result[CircomProof, string]](), err:
return failure(err) return failure(err)
defer:
if err =? signal.close().mapFailure.errorOption():
error "Error closing signal", msg = $err.msg
let args = ProveTaskArgs(signal: signal, params: self.params) proc spawnTask() =
proc spawnTask(): Flowvar[Result[CircomProof, string]] = self.tp.spawn proveTask(self.params, input, queue)
self.tp.spawn proveTask(args, input)
let flowvar = spawnTask()
without taskRes =? await awaitThreadResult(signal, flowvar), err: spawnTask()
without taskRes =? await queue.recvAsync(), err:
return failure(err) return failure(err)
if (let res = queue.release(); res.isErr):
return failure "Error releasing proof queue " & res.error().msg
without proof =? taskRes.mapFailure, err: without proof =? taskRes.mapFailure, err:
let res: ?!CircomProof = failure(err) return failure(err)
return res
let pf: CircomProof = proof
success(pf)
success(proof)
proc verify*[H]( proc verify*[H](
self: AsyncCircomCompat, proof: CircomProof, inputs: ProofInputs[H] self: AsyncCircomCompat, proof: CircomProof, inputs: ProofInputs[H]

View File

@ -1,4 +1,4 @@
import std/options
import pkg/taskpools import pkg/taskpools
import pkg/taskpools/flowvars import pkg/taskpools/flowvars
import pkg/chronos import pkg/chronos
@ -24,3 +24,58 @@ proc awaitThreadResult*[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[
await sleepAsync(CompletionRetryDelay) await sleepAsync(CompletionRetryDelay)
return failure("Task signaled finish but didn't return any result within " & $CompletionRetryDelay) return failure("Task signaled finish but didn't return any result within " & $CompletionRetryDelay)
type
SignalQueue[T] = object
signal: ThreadSignalPtr
chan*: Channel[T]
SignalQueuePtr*[T] = ptr SignalQueue[T]
proc release*[T](queue: SignalQueuePtr[T]): ?!void =
## Call to properly dispose of a SignalQueue.
queue[].chan.close()
if err =? queue[].signal.close().mapFailure.errorOption():
return failure(err.msg)
deallocShared(queue)
proc newSignalQueue*[T](
maxItems: int = 0
): ?!SignalQueuePtr[T] =
## Create a signal queue compatible with Chronos async.
result = success cast[ptr SignalQueue[T]](allocShared0(sizeof(SignalQueue[T])))
without signal =? ThreadSignalPtr.new().mapFailure, err:
return failure(err)
result[].signal = signal
result[].chan.open(maxItems)
proc send*[T](queue: SignalQueuePtr[T], msg: T): ?!void {.raises: [].} =
## Sends a message to a thread. `msg` is copied.
## Note: may be blocking.
##
try:
queue[].chan.send(msg)
except Exception as exc:
return failure(exc.msg)
let res = queue[].signal.fireSync()
if res.isErr():
return failure(res.error())
result = ok()
proc recv*[T](queue: SignalQueue[T]): ?!T =
## Receive item from queue, blocking.
try:
ok(queue.chan[].recv())
except Exception as exc:
failure(exc.msg)
proc recvAsync*[T](queue: SignalQueuePtr[T]): Future[?!T] {.async.} =
## Async compatible receive from queue. Pauses async execution until
## an item is received from the queue
await wait(queue.signal)
let res = queue.chan.tryRecv()
if not res.dataAvailable:
return failure("unable to retrieve expected queue value")
else:
return success(res.msg)