diff --git a/codex/erasure/asyncbackend.nim b/codex/erasure/asyncbackend.nim index 4f757a89..f3bca851 100644 --- a/codex/erasure/asyncbackend.nim +++ b/codex/erasure/asyncbackend.nim @@ -18,11 +18,14 @@ import pkg/questionable/results import ./backend import ../errors import ../logutils -import ../utils/asyncthreads logScope: topics = "codex asyncerasure" +const + CompletitionTimeout = 1.seconds # Maximum await time for completition after receiving a signal + CompletitionRetryDelay = 10.millis + type EncoderBackendPtr = ptr EncoderBackend DecoderBackendPtr = ptr DecoderBackend @@ -121,6 +124,20 @@ proc proxySpawnDecodeTask( ): Flowvar[DecodeTaskResult] = tp.spawn decodeTask(args, data[], parity[]) +proc awaitResult[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[?!T] {.async.} = + await wait(signal) + + var + res: T + awaitTotal: Duration + while awaitTotal < CompletitionTimeout: + if handle.tryComplete(res): + return success(res) + else: + awaitTotal += CompletitionRetryDelay + await sleepAsync(CompletitionRetryDelay) + + return failure("Task signaled finish but didn't return any result within " & $CompletitionRetryDelay) proc asyncEncode*( tp: Taskpool, @@ -138,7 +155,7 @@ proc asyncEncode*( args = EncodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecM: ecM) handle = proxySpawnEncodeTask(tp, args, data) - without res =? await awaitThreadResult(signal, handle), err: + without res =? await awaitResult(signal, handle), err: return failure(err) if res.isOk: @@ -173,7 +190,7 @@ proc asyncDecode*( args = DecodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecK: ecK) handle = proxySpawnDecodeTask(tp, args, data, parity) - without res =? await awaitThreadResult(signal, handle), err: + without res =? await awaitResult(signal, handle), err: return failure(err) if res.isOk: diff --git a/codex/utils/asyncthreads.nim b/codex/utils/asyncthreads.nim index e9cd08b7..c878c47d 100644 --- a/codex/utils/asyncthreads.nim +++ b/codex/utils/asyncthreads.nim @@ -10,25 +10,6 @@ const CompletionTimeout* = 1.seconds # Maximum await time for completition after receiving a signal -proc awaitThreadResult*[T]( - signal: ThreadSignalPtr, handle: Flowvar[T] -): Future[?!T] {.async.} = - await wait(signal) - - var - res: T - awaitTotal: Duration - - while awaitTotal < CompletionTimeout: - if handle.tryComplete(res): ## TODO: pretty sure this leaks currently - return success(res) - else: - awaitTotal += CompletionRetryDelay - await sleepAsync(CompletionRetryDelay) - - return failure( - "Task signaled finish but didn't return any result within " & $CompletionRetryDelay - ) type SignalQueue[T] = object