revert back erasure code
This commit is contained in:
parent
eb22397ca7
commit
1e2c40d86d
|
@ -18,11 +18,14 @@ import pkg/questionable/results
|
||||||
import ./backend
|
import ./backend
|
||||||
import ../errors
|
import ../errors
|
||||||
import ../logutils
|
import ../logutils
|
||||||
import ../utils/asyncthreads
|
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "codex asyncerasure"
|
topics = "codex asyncerasure"
|
||||||
|
|
||||||
|
const
|
||||||
|
CompletitionTimeout = 1.seconds # Maximum await time for completition after receiving a signal
|
||||||
|
CompletitionRetryDelay = 10.millis
|
||||||
|
|
||||||
type
|
type
|
||||||
EncoderBackendPtr = ptr EncoderBackend
|
EncoderBackendPtr = ptr EncoderBackend
|
||||||
DecoderBackendPtr = ptr DecoderBackend
|
DecoderBackendPtr = ptr DecoderBackend
|
||||||
|
@ -121,6 +124,20 @@ proc proxySpawnDecodeTask(
|
||||||
): Flowvar[DecodeTaskResult] =
|
): Flowvar[DecodeTaskResult] =
|
||||||
tp.spawn decodeTask(args, data[], parity[])
|
tp.spawn decodeTask(args, data[], parity[])
|
||||||
|
|
||||||
|
proc awaitResult[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[?!T] {.async.} =
|
||||||
|
await wait(signal)
|
||||||
|
|
||||||
|
var
|
||||||
|
res: T
|
||||||
|
awaitTotal: Duration
|
||||||
|
while awaitTotal < CompletitionTimeout:
|
||||||
|
if handle.tryComplete(res):
|
||||||
|
return success(res)
|
||||||
|
else:
|
||||||
|
awaitTotal += CompletitionRetryDelay
|
||||||
|
await sleepAsync(CompletitionRetryDelay)
|
||||||
|
|
||||||
|
return failure("Task signaled finish but didn't return any result within " & $CompletitionRetryDelay)
|
||||||
|
|
||||||
proc asyncEncode*(
|
proc asyncEncode*(
|
||||||
tp: Taskpool,
|
tp: Taskpool,
|
||||||
|
@ -138,7 +155,7 @@ proc asyncEncode*(
|
||||||
args = EncodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecM: ecM)
|
args = EncodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecM: ecM)
|
||||||
handle = proxySpawnEncodeTask(tp, args, data)
|
handle = proxySpawnEncodeTask(tp, args, data)
|
||||||
|
|
||||||
without res =? await awaitThreadResult(signal, handle), err:
|
without res =? await awaitResult(signal, handle), err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
if res.isOk:
|
if res.isOk:
|
||||||
|
@ -173,7 +190,7 @@ proc asyncDecode*(
|
||||||
args = DecodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecK: ecK)
|
args = DecodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecK: ecK)
|
||||||
handle = proxySpawnDecodeTask(tp, args, data, parity)
|
handle = proxySpawnDecodeTask(tp, args, data, parity)
|
||||||
|
|
||||||
without res =? await awaitThreadResult(signal, handle), err:
|
without res =? await awaitResult(signal, handle), err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
if res.isOk:
|
if res.isOk:
|
||||||
|
|
|
@ -10,25 +10,6 @@ const
|
||||||
CompletionTimeout* = 1.seconds
|
CompletionTimeout* = 1.seconds
|
||||||
# Maximum await time for completition after receiving a signal
|
# Maximum await time for completition after receiving a signal
|
||||||
|
|
||||||
proc awaitThreadResult*[T](
|
|
||||||
signal: ThreadSignalPtr, handle: Flowvar[T]
|
|
||||||
): Future[?!T] {.async.} =
|
|
||||||
await wait(signal)
|
|
||||||
|
|
||||||
var
|
|
||||||
res: T
|
|
||||||
awaitTotal: Duration
|
|
||||||
|
|
||||||
while awaitTotal < CompletionTimeout:
|
|
||||||
if handle.tryComplete(res): ## TODO: pretty sure this leaks currently
|
|
||||||
return success(res)
|
|
||||||
else:
|
|
||||||
awaitTotal += CompletionRetryDelay
|
|
||||||
await sleepAsync(CompletionRetryDelay)
|
|
||||||
|
|
||||||
return failure(
|
|
||||||
"Task signaled finish but didn't return any result within " & $CompletionRetryDelay
|
|
||||||
)
|
|
||||||
|
|
||||||
type
|
type
|
||||||
SignalQueue[T] = object
|
SignalQueue[T] = object
|
||||||
|
|
Loading…
Reference in New Issue