add asynthreads utils

This commit is contained in:
Jaremy Creechley 2024-05-17 00:39:07 +03:00
parent 542ba13222
commit 38ac70809e
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
3 changed files with 31 additions and 21 deletions

View File

@ -18,14 +18,11 @@ import pkg/questionable/results
import ./backend import ./backend
import ../errors import ../errors
import ../logutils import ../logutils
import ../utils/asyncthreads
logScope: logScope:
topics = "codex asyncerasure" topics = "codex asyncerasure"
const
CompletitionTimeout = 1.seconds # Maximum await time for completition after receiving a signal
CompletitionRetryDelay = 10.millis
type type
EncoderBackendPtr = ptr EncoderBackend EncoderBackendPtr = ptr EncoderBackend
DecoderBackendPtr = ptr DecoderBackend DecoderBackendPtr = ptr DecoderBackend
@ -124,20 +121,6 @@ proc proxySpawnDecodeTask(
): Flowvar[DecodeTaskResult] = ): Flowvar[DecodeTaskResult] =
tp.spawn decodeTask(args, data[], parity[]) tp.spawn decodeTask(args, data[], parity[])
proc awaitResult[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[?!T] {.async.} =
await wait(signal)
var
res: T
awaitTotal: Duration
while awaitTotal < CompletitionTimeout:
if handle.tryComplete(res):
return success(res)
else:
awaitTotal += CompletitionRetryDelay
await sleepAsync(CompletitionRetryDelay)
return failure("Task signaled finish but didn't return any result within " & $CompletitionRetryDelay)
proc asyncEncode*( proc asyncEncode*(
tp: Taskpool, tp: Taskpool,
@ -155,7 +138,7 @@ proc asyncEncode*(
args = EncodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecM: ecM) args = EncodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecM: ecM)
handle = proxySpawnEncodeTask(tp, args, data) handle = proxySpawnEncodeTask(tp, args, data)
without res =? await awaitResult(signal, handle), err: without res =? await awaitThreadResult(signal, handle), err:
return failure(err) return failure(err)
if res.isOk: if res.isOk:
@ -190,7 +173,7 @@ proc asyncDecode*(
args = DecodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecK: ecK) args = DecodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecK: ecK)
handle = proxySpawnDecodeTask(tp, args, data, parity) handle = proxySpawnDecodeTask(tp, args, data, parity)
without res =? await awaitResult(signal, handle), err: without res =? await awaitThreadResult(signal, handle), err:
return failure(err) return failure(err)
if res.isOk: if res.isOk:

View File

@ -52,9 +52,11 @@ proc prove*[H](
without signal =? ThreadSignalPtr.new().mapFailure, err: without signal =? ThreadSignalPtr.new().mapFailure, err:
return failure(err) return failure(err)
let args = ProveTaskArgs(signal, self.params) let args = ProveTaskArgs(signal: signal, params: self.params)
self.tp.spawn proveTask(args, input) self.tp.spawn proveTask(args, input)
await wait(signal)
proc verify*[H]( proc verify*[H](
self: AsyncCircomCompat, proof: CircomProof, inputs: ProofInputs[H] self: AsyncCircomCompat, proof: CircomProof, inputs: ProofInputs[H]
): Future[?!bool] {.async.} = ): Future[?!bool] {.async.} =

View File

@ -0,0 +1,25 @@
import pkg/taskpools
import pkg/taskpools/flowvars
import pkg/chronos
import pkg/chronos/threadsync
import pkg/questionable/results
const
CompletionRetryDelay* = 10.millis
CompletionTimeout* = 1.seconds # Maximum await time for completition after receiving a signal
proc awaitThreadResult*[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[?!T] {.async.} =
await wait(signal)
var
res: T
awaitTotal: Duration
while awaitTotal < CompletionTimeout:
if handle.tryComplete(res):
return success(res)
else:
awaitTotal += CompletionRetryDelay
await sleepAsync(CompletionRetryDelay)
return failure("Task signaled finish but didn't return any result within " & $CompletionRetryDelay)