From 38ac70809ec9bcd61712ac816d5ce584575ccc8d Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Fri, 17 May 2024 00:39:07 +0300 Subject: [PATCH] add asynthreads utils --- codex/erasure/asyncbackend.nim | 23 +++--------------- codex/slots/proofs/backends/asynccircoms.nim | 4 +++- codex/utils/asyncthreads.nim | 25 ++++++++++++++++++++ 3 files changed, 31 insertions(+), 21 deletions(-) create mode 100644 codex/utils/asyncthreads.nim diff --git a/codex/erasure/asyncbackend.nim b/codex/erasure/asyncbackend.nim index f3bca851..4f757a89 100644 --- a/codex/erasure/asyncbackend.nim +++ b/codex/erasure/asyncbackend.nim @@ -18,14 +18,11 @@ import pkg/questionable/results import ./backend import ../errors import ../logutils +import ../utils/asyncthreads logScope: topics = "codex asyncerasure" -const - CompletitionTimeout = 1.seconds # Maximum await time for completition after receiving a signal - CompletitionRetryDelay = 10.millis - type EncoderBackendPtr = ptr EncoderBackend DecoderBackendPtr = ptr DecoderBackend @@ -124,20 +121,6 @@ proc proxySpawnDecodeTask( ): Flowvar[DecodeTaskResult] = tp.spawn decodeTask(args, data[], parity[]) -proc awaitResult[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[?!T] {.async.} = - await wait(signal) - - var - res: T - awaitTotal: Duration - while awaitTotal < CompletitionTimeout: - if handle.tryComplete(res): - return success(res) - else: - awaitTotal += CompletitionRetryDelay - await sleepAsync(CompletitionRetryDelay) - - return failure("Task signaled finish but didn't return any result within " & $CompletitionRetryDelay) proc asyncEncode*( tp: Taskpool, @@ -155,7 +138,7 @@ proc asyncEncode*( args = EncodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecM: ecM) handle = proxySpawnEncodeTask(tp, args, data) - without res =? await awaitResult(signal, handle), err: + without res =? await awaitThreadResult(signal, handle), err: return failure(err) if res.isOk: @@ -190,7 +173,7 @@ proc asyncDecode*( args = DecodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecK: ecK) handle = proxySpawnDecodeTask(tp, args, data, parity) - without res =? await awaitResult(signal, handle), err: + without res =? await awaitThreadResult(signal, handle), err: return failure(err) if res.isOk: diff --git a/codex/slots/proofs/backends/asynccircoms.nim b/codex/slots/proofs/backends/asynccircoms.nim index 2801ea18..e9589274 100644 --- a/codex/slots/proofs/backends/asynccircoms.nim +++ b/codex/slots/proofs/backends/asynccircoms.nim @@ -52,9 +52,11 @@ proc prove*[H]( without signal =? ThreadSignalPtr.new().mapFailure, err: return failure(err) - let args = ProveTaskArgs(signal, self.params) + let args = ProveTaskArgs(signal: signal, params: self.params) self.tp.spawn proveTask(args, input) + await wait(signal) + proc verify*[H]( self: AsyncCircomCompat, proof: CircomProof, inputs: ProofInputs[H] ): Future[?!bool] {.async.} = diff --git a/codex/utils/asyncthreads.nim b/codex/utils/asyncthreads.nim new file mode 100644 index 00000000..7dc04882 --- /dev/null +++ b/codex/utils/asyncthreads.nim @@ -0,0 +1,25 @@ + +import pkg/taskpools +import pkg/taskpools/flowvars +import pkg/chronos +import pkg/chronos/threadsync +import pkg/questionable/results + +const + CompletionRetryDelay* = 10.millis + CompletionTimeout* = 1.seconds # Maximum await time for completition after receiving a signal + +proc awaitThreadResult*[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[?!T] {.async.} = + await wait(signal) + + var + res: T + awaitTotal: Duration + while awaitTotal < CompletionTimeout: + if handle.tryComplete(res): + return success(res) + else: + awaitTotal += CompletionRetryDelay + await sleepAsync(CompletionRetryDelay) + + return failure("Task signaled finish but didn't return any result within " & $CompletionRetryDelay) \ No newline at end of file