Mirror of https://github.com/logos-storage/logos-storage-nim.git, synced 2026-01-02 13:33:10 +00:00.
Merge e1ab1127eb3f5b40af089f01370b71a996189121 into 60861d6af841d96085db366d44c1d543b7659fa5
This commit is contained in commit c5f69e72b1.
@@ -26,18 +26,14 @@ method release*(self: ErasureBackend) {.base, gcsafe.} =
  raiseAssert("not implemented!")

method encode*(
-    self: EncoderBackend,
-    buffers, parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
-    dataLen, parityLen: int,
+    self: EncoderBackend, data, parity: var openArray[seq[byte]]
): Result[void, cstring] {.base, gcsafe.} =
  ## encode buffers using a backend
  ##
  raiseAssert("not implemented!")

method decode*(
-    self: DecoderBackend,
-    buffers, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
-    dataLen, parityLen, recoveredLen: int,
+    self: DecoderBackend, data, parity, recovered: var openArray[seq[byte]]
): Result[void, cstring] {.base, gcsafe.} =
  ## decode buffers using a backend
  ##
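For illustration only, a toy backend conforming to the new openArray-based signatures might look roughly like the sketch below. The XorEncoderBackend name, the single-parity XOR scheme, and the pkg/results import are assumptions made for the example, not part of this change.

import pkg/results # assumed to provide Result, ok(), isOk

type
  EncoderBackend = ref object of RootObj
  # Hypothetical toy backend: one parity shard holding the XOR of all data shards.
  XorEncoderBackend = ref object of EncoderBackend

method encode(
    self: EncoderBackend, data, parity: var openArray[seq[byte]]
): Result[void, cstring] {.base, gcsafe.} =
  raiseAssert("not implemented!")

method encode(
    self: XorEncoderBackend, data, parity: var openArray[seq[byte]]
): Result[void, cstring] {.gcsafe.} =
  # Fill parity[0] with the XOR of every data shard.
  if parity.len == 0 or data.len == 0:
    return ok()
  parity[0] = newSeq[byte](data[0].len)
  for shard in data:
    for i, b in shard:
      parity[0][i] = parity[0][i] xor b
  ok()

when isMainModule:
  var
    blocks = @[@[1'u8, 2, 3], @[4'u8, 5, 6]]
    parity = @[newSeq[byte](3)]
  doAssert XorEncoderBackend().encode(blocks, parity).isOk
  doAssert parity[0] == @[5'u8, 7, 5] # 1 xor 4, 2 xor 5, 3 xor 6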
@@ -22,13 +22,11 @@ type
    decoder*: Option[LeoDecoder]

method encode*(
-    self: LeoEncoderBackend,
-    data, parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
-    dataLen, parityLen: int,
+    self: LeoEncoderBackend, data, parity: var openArray[seq[byte]]
): Result[void, cstring] =
  ## Encode data using Leopard backend

-  if parityLen == 0:
+  if parity.len == 0:
    return ok()

  var encoder =
@@ -38,12 +36,10 @@ method encode*(
    else:
      self.encoder.get()

-  encoder.encode(data, parity, dataLen, parityLen)
+  encoder.encode(data, parity)

method decode*(
-    self: LeoDecoderBackend,
-    data, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
-    dataLen, parityLen, recoveredLen: int,
+    self: LeoDecoderBackend, data, parity, recovered: var openArray[seq[byte]]
): Result[void, cstring] =
  ## Decode data using given Leopard backend

@@ -54,7 +50,7 @@ method decode*(
    else:
      self.decoder.get()

-  decoder.decode(data, parity, recovered, dataLen, parityLen, recoveredLen)
+  decoder.decode(data, parity, recovered)

method release*(self: LeoEncoderBackend) =
  if self.encoder.isSome:
@@ -29,6 +29,7 @@ import ../utils/asynciter
import ../indexingstrategy
import ../errors
import ../utils/arrayutils
+import ../utils/uniqueptr

import pkg/stew/byteutils

@@ -68,12 +69,14 @@ type
  DecoderProvider* =
    proc(size, blocks, parity: int): DecoderBackend {.raises: [Defect], noSideEffect.}

-  Erasure* = ref object
+  Erasure* = object
    taskPool: Taskpool
    encoderProvider*: EncoderProvider
    decoderProvider*: DecoderProvider
    store*: BlockStore

+  ErasureRef* = ref Erasure
+
  EncodingParams = object
    ecK: Natural
    ecM: Natural
@@ -92,19 +95,18 @@ type
  EncodeTask = object
    success: Atomic[bool]
    erasure: ptr Erasure
-    blocks: ptr UncheckedArray[ptr UncheckedArray[byte]]
-    parity: ptr UncheckedArray[ptr UncheckedArray[byte]]
-    blockSize, blocksLen, parityLen: int
+    blocks: seq[seq[byte]]
+    parity: UniquePtr[seq[seq[byte]]]
+    blockSize, parityLen: int
    signal: ThreadSignalPtr

  DecodeTask = object
    success: Atomic[bool]
    erasure: ptr Erasure
-    blocks: ptr UncheckedArray[ptr UncheckedArray[byte]]
-    parity: ptr UncheckedArray[ptr UncheckedArray[byte]]
-    recovered: ptr UncheckedArray[ptr UncheckedArray[byte]]
-    blockSize, blocksLen: int
-    parityLen, recoveredLen: int
+    blocks: seq[seq[byte]]
+    parity: seq[seq[byte]]
+    recovered: UniquePtr[seq[seq[byte]]]
+    blockSize, recoveredLen: int
    signal: ThreadSignalPtr

func indexToPos(steps, idx, step: int): int {.inline.} =
@@ -118,7 +120,7 @@ func indexToPos(steps, idx, step: int): int {.inline.} =
  (idx - step) div steps

proc getPendingBlocks(
-    self: Erasure, manifest: Manifest, indices: seq[int]
+    self: ErasureRef, manifest: Manifest, indices: seq[int]
): AsyncIter[(?!bt.Block, int)] =
  ## Get pending blocks iterator
  ##
@@ -154,7 +156,7 @@ proc getPendingBlocks(
  AsyncIter[(?!bt.Block, int)].new(genNext, isFinished)

proc prepareEncodingData(
-    self: Erasure,
+    self: ErasureRef,
    manifest: Manifest,
    params: EncodingParams,
    step: Natural,
@@ -198,7 +200,7 @@ proc prepareEncodingData(
  success(resolved.Natural)

proc prepareDecodingData(
-    self: Erasure,
+    self: ErasureRef,
    encoded: Manifest,
    step: Natural,
    data: ref seq[seq[byte]],
@@ -294,48 +296,43 @@ proc init*(

proc leopardEncodeTask(tp: Taskpool, task: ptr EncodeTask) {.gcsafe.} =
  # Task suitable for running in taskpools - look, no GC!
-  let encoder =
-    task[].erasure.encoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen)
+  let encoder = task[].erasure.encoderProvider(
+    task[].blockSize, task[].blocks.len, task[].parityLen
+  )
  defer:
    encoder.release()
    discard task[].signal.fireSync()

-  if (
-    let res =
-      encoder.encode(task[].blocks, task[].parity, task[].blocksLen, task[].parityLen)
-    res.isErr
-  ):
+  var parity = newSeqWith(task[].parityLen, newSeq[byte](task[].blockSize))
+  if (let res = encoder.encode(task[].blocks, parity); res.isErr):
    warn "Error from leopard encoder backend!", error = $res.error

    task[].success.store(false)
  else:
+    var paritySeq = newSeq[seq[byte]](task[].parityLen)
+    for i in 0 ..< task[].parityLen:
+      var innerSeq = isolate(parity[i])
+      paritySeq[i] = extract(innerSeq)
+
+    task[].parity = newUniquePtr(paritySeq)
    task[].success.store(true)

proc asyncEncode*(
-    self: Erasure,
-    blockSize, blocksLen, parityLen: int,
-    blocks: ref seq[seq[byte]],
-    parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
-): Future[?!void] {.async: (raises: [CancelledError]).} =
-  without threadPtr =? ThreadSignalPtr.new():
-    return failure("Unable to create thread signal")
+    self: ErasureRef, blockSize, parityLen: int, blocks: seq[seq[byte]]
+): Future[?!seq[seq[byte]]] {.async: (raises: [CancelledError]).} =
+  var threadPtr = ?ThreadSignalPtr.new().mapFailure()

  defer:
-    threadPtr.close().expect("closing once works")
-
-  var data = makeUncheckedArray(blocks)
-
-  defer:
-    dealloc(data)
+    if threadPtr != nil:
+      ?threadPtr.close().mapFailure()
+      threadPtr = nil

  ## Create an ecode task with block data
  var task = EncodeTask(
-    erasure: addr self,
+    erasure: cast[ptr Erasure](self),
    blockSize: blockSize,
-    blocksLen: blocksLen,
    parityLen: parityLen,
-    blocks: data,
-    parity: parity,
+    blocks: blocks,
    signal: threadPtr,
  )
@@ -344,21 +341,20 @@ proc asyncEncode*(
  self.taskPool.spawn leopardEncodeTask(self.taskPool, addr task)
  let threadFut = threadPtr.wait()

-  if joinErr =? catch(await threadFut.join()).errorOption:
-    if err =? catch(await noCancel threadFut).errorOption:
-      return failure(err)
-    if joinErr of CancelledError:
-      raise (ref CancelledError) joinErr
-    else:
-      return failure(joinErr)
+  if err =? catch(await threadFut.join()).errorOption:
+    ?catch(await noCancel threadFut)
+    if err of CancelledError:
+      raise (ref CancelledError) err
+
+    return failure(err)

  if not task.success.load():
    return failure("Leopard encoding task failed")

-  success()
+  success extractValue(task.parity)

proc encodeData(
-    self: Erasure, manifest: Manifest, params: EncodingParams
+    self: ErasureRef, manifest: Manifest, params: EncodingParams
): Future[?!Manifest] {.async.} =
  ## Encode blocks pointed to by the protected manifest
  ##
@@ -380,17 +376,9 @@ proc encodeData(
  try:
    for step in 0 ..< params.steps:
      # TODO: Don't allocate a new seq every time, allocate once and zero out
-      var
-        data = seq[seq[byte]].new() # number of blocks to encode
-        parity = createDoubleArray(params.ecM, manifest.blockSize.int)
-      defer:
-        freeDoubleArray(parity, params.ecM)
+      var data = seq[seq[byte]].new() # number of blocks to encode

      data[].setLen(params.ecK)
-      # TODO: this is a tight blocking loop so we sleep here to allow
-      # other events to be processed, this should be addressed
-      # by threading
-      await sleepAsync(10.millis)

      without resolved =?
        (await self.prepareEncodingData(manifest, params, step, data, cids, emptyBlock)),
@@ -400,21 +388,15 @@ proc encodeData(

      trace "Erasure coding data", data = data[].len

+      var parity: seq[seq[byte]]
      try:
-        if err =? (
-          await self.asyncEncode(
-            manifest.blockSize.int, params.ecK, params.ecM, data, parity
-          )
-        ).errorOption:
-          return failure(err)
+        parity = ?(await self.asyncEncode(manifest.blockSize.int, params.ecM, data[]))
      except CancelledError as exc:
        raise exc

      var idx = params.rounded + step
      for j in 0 ..< params.ecM:
-        var innerPtr: ptr UncheckedArray[byte] = parity[][j]
-        without blk =? bt.Block.new(innerPtr.toOpenArray(0, manifest.blockSize.int - 1)),
-          error:
+        without blk =? bt.Block.new(parity[j]), error:
          trace "Unable to create parity block", err = error.msg
          return failure(error)

@@ -453,7 +435,7 @@ proc encodeData(
    return failure(exc)

proc encode*(
-    self: Erasure,
+    self: ErasureRef,
    manifest: Manifest,
    blocks: Natural,
    parity: Natural,
@@ -476,58 +458,43 @@ proc encode*(

proc leopardDecodeTask(tp: Taskpool, task: ptr DecodeTask) {.gcsafe.} =
  # Task suitable for running in taskpools - look, no GC!
-  let decoder =
-    task[].erasure.decoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen)
+  let decoder = task[].erasure.decoderProvider(
+    task[].blockSize, task[].blocks.len, task[].parity.len
+  )
  defer:
    decoder.release()
    discard task[].signal.fireSync()

-  if (
-    let res = decoder.decode(
-      task[].blocks,
-      task[].parity,
-      task[].recovered,
-      task[].blocksLen,
-      task[].parityLen,
-      task[].recoveredLen,
-    )
-    res.isErr
-  ):
+  var recovered = newSeqWith(task[].blocks.len, newSeq[byte](task[].blockSize))
+
+  if (let res = decoder.decode(task[].blocks, task[].parity, recovered); res.isErr):
    warn "Error from leopard decoder backend!", error = $res.error
    task[].success.store(false)
  else:
+    var recoveredSeq = newSeq[seq[byte]](task[].blocks.len)
+    for i in 0 ..< task[].blocks.len:
+      var innerSeq = isolate(recovered[i])
+      recoveredSeq[i] = extract(innerSeq)
+
+    task[].recovered = newUniquePtr(recoveredSeq)
    task[].success.store(true)

proc asyncDecode*(
-    self: Erasure,
-    blockSize, blocksLen, parityLen: int,
-    blocks, parity: ref seq[seq[byte]],
-    recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
-): Future[?!void] {.async: (raises: [CancelledError]).} =
-  without threadPtr =? ThreadSignalPtr.new():
-    return failure("Unable to create thread signal")
+    self: ErasureRef, blockSize: int, blocks, parity: seq[seq[byte]]
+): Future[?!seq[seq[byte]]] {.async: (raises: [CancelledError]).} =
+  var threadPtr = ?ThreadSignalPtr.new().mapFailure()

  defer:
-    threadPtr.close().expect("closing once works")
-
-  var
-    blockData = makeUncheckedArray(blocks)
-    parityData = makeUncheckedArray(parity)
-
-  defer:
-    dealloc(blockData)
-    dealloc(parityData)
+    if threadPtr != nil:
+      ?threadPtr.close().mapFailure()
+      threadPtr = nil

  ## Create an decode task with block data
  var task = DecodeTask(
-    erasure: addr self,
+    erasure: cast[ptr Erasure](self),
    blockSize: blockSize,
-    blocksLen: blocksLen,
-    parityLen: parityLen,
-    recoveredLen: blocksLen,
-    blocks: blockData,
-    parity: parityData,
-    recovered: recovered,
+    blocks: blocks,
+    parity: parity,
    signal: threadPtr,
  )
@@ -536,21 +503,20 @@ proc asyncDecode*(
  self.taskPool.spawn leopardDecodeTask(self.taskPool, addr task)
  let threadFut = threadPtr.wait()

-  if joinErr =? catch(await threadFut.join()).errorOption:
-    if err =? catch(await noCancel threadFut).errorOption:
-      return failure(err)
-    if joinErr of CancelledError:
-      raise (ref CancelledError) joinErr
-    else:
-      return failure(joinErr)
+  if err =? catch(await threadFut.join()).errorOption:
+    ?catch(await noCancel threadFut)
+    if err of CancelledError:
+      raise (ref CancelledError) err
+
+    return failure(err)

  if not task.success.load():
    return failure("Leopard decoding task failed")

-  success()
+  success extractValue(task.recovered)

proc decodeInternal(
-    self: Erasure, encoded: Manifest
+    self: ErasureRef, encoded: Manifest
): Future[?!(ref seq[Cid], seq[Natural])] {.async.} =
  logScope:
    steps = encoded.steps
@@ -566,17 +532,9 @@ proc decodeInternal(
  cids[].setLen(encoded.blocksCount)
  try:
    for step in 0 ..< encoded.steps:
-      # TODO: this is a tight blocking loop so we sleep here to allow
-      # other events to be processed, this should be addressed
-      # by threading
-      await sleepAsync(10.millis)
-
      var
        data = seq[seq[byte]].new()
        parityData = seq[seq[byte]].new()
-        recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int)
-      defer:
-        freeDoubleArray(recovered, encoded.ecK)

      data[].setLen(encoded.ecK) # set len to K
      parityData[].setLen(encoded.ecM) # set len to M
@@ -592,26 +550,18 @@ proc decodeInternal(
      if dataPieces >= encoded.ecK:
        trace "Retrieved all the required data blocks"
        continue

+      var recovered: seq[seq[byte]]
      trace "Erasure decoding data"
      try:
-        if err =? (
-          await self.asyncDecode(
-            encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered
-          )
-        ).errorOption:
-          return failure(err)
+        recovered =
+          ?(await self.asyncDecode(encoded.blockSize.int, data[], parityData[]))
      except CancelledError as exc:
        raise exc

      for i in 0 ..< encoded.ecK:
        let idx = i * encoded.steps + step
        if data[i].len <= 0 and not cids[idx].isEmpty:
-          var innerPtr: ptr UncheckedArray[byte] = recovered[][i]
-
-          without blk =? bt.Block.new(
-            innerPtr.toOpenArray(0, encoded.blockSize.int - 1)
-          ), error:
+          without blk =? bt.Block.new(recovered[i]), error:
            trace "Unable to create block!", exc = error.msg
            return failure(error)

@@ -635,7 +585,7 @@ proc decodeInternal(

  return (cids, recoveredIndices).success

-proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
+proc decode*(self: ErasureRef, encoded: Manifest): Future[?!Manifest] {.async.} =
  ## Decode a protected manifest into it's original
  ## manifest
  ##
@@ -667,7 +617,7 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =

  return decoded.success

-proc repair*(self: Erasure, encoded: Manifest): Future[?!void] {.async.} =
+proc repair*(self: ErasureRef, encoded: Manifest): Future[?!void] {.async.} =
  ## Repair a protected manifest by reconstructing the full dataset
  ##
  ## `encoded` - the encoded (protected) manifest to
@@ -712,15 +662,15 @@ proc stop*(self: Erasure) {.async.} =
  return

proc new*(
-    T: type Erasure,
+    _: type ErasureRef,
    store: BlockStore,
    encoderProvider: EncoderProvider,
    decoderProvider: DecoderProvider,
    taskPool: Taskpool,
-): Erasure =
-  ## Create a new Erasure instance for encoding and decoding manifests
+): ErasureRef =
+  ## Create a new ErasureRef instance for encoding and decoding manifests
  ##
-  Erasure(
+  ErasureRef(
    store: store,
    encoderProvider: encoderProvider,
    decoderProvider: decoderProvider,
@@ -324,7 +324,7 @@ proc streamEntireDataset(
  proc erasureJob(): Future[void] {.async: (raises: []).} =
    try:
      # Spawn an erasure decoding job
-      let erasure = Erasure.new(
+      let erasure = ErasureRef.new(
        self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
      )
      without _ =? (await erasure.decode(manifest)), error:
@@ -567,7 +567,7 @@ proc setupRequest(
    return failure error

  # Erasure code the dataset according to provided parameters
-  let erasure = Erasure.new(
+  let erasure = ErasureRef.new(
    self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
  )

@@ -713,7 +713,7 @@ proc onStore(
  if isRepairing:
    trace "start repairing slot", slotIdx
    try:
-      let erasure = Erasure.new(
+      let erasure = ErasureRef.new(
        self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
      )
      if err =? (await erasure.repair(manifest)).errorOption:
codex/utils/uniqueptr.nim (new file, 61 lines)
@@ -0,0 +1,61 @@
import std/isolation

type UniquePtr*[T] = object
  ## A unique pointer to a seq[seq[T]] in shared memory
  ## Can only be moved, not copied
  data: ptr T

proc newUniquePtr*[T](data: sink Isolated[T]): UniquePtr[T] =
  ## Creates a new unique sequence in shared memory
  ## The memory is automatically freed when the object is destroyed
  result.data = cast[ptr T](allocShared0(sizeof(T)))

  result.data[] = extract(data)

template newUniquePtr*[T](data: T): UniquePtr[T] =
  newUniquePtr(isolate(data))

proc `=destroy`*[T](p: var UniquePtr[T]) =
  ## Destructor for UniquePtr
  if p.data != nil:
    deallocShared(p.data)
    p.data = nil

proc `=copy`*[T](
  dest: var UniquePtr[T], src: UniquePtr[T]
) {.error: "UniquePtr cannot be copied, only moved".}

proc `=sink`*[T](dest: var UniquePtr[T], src: UniquePtr[T]) =
  if dest.data != nil:
    `=destroy`(dest)
  dest.data = src.data
  # We need to nil out the source data to prevent double-free
  # This is handled by Nim's destructive move semantics

proc `[]`*[T](p: UniquePtr[T]): lent T =
  ## Access the data (read-only)
  if p.data == nil:
    raise newException(NilAccessDefect, "accessing nil UniquePtr")
  p.data[]

# proc `[]`*[T](p: var UniquePtr[T]): var T =
#   ## Access the data (mutable)
#   if p.data == nil:
#     raise newException(NilAccessDefect, "accessing nil UniquePtr")
#   p.data[]

proc isNil*[T](p: UniquePtr[T]): bool =
  ## Check if the UniquePtr is nil
  p.data == nil

proc extractValue*[T](p: var UniquePtr[T]): T =
  ## Extract the value from the UniquePtr and release the memory
  if p.data == nil:
    raise newException(NilAccessDefect, "extracting from nil UniquePtr")

  # Move the value out
  var isolated = isolate(p.data[])
  result = extract(isolated)

  # Free the shared memory
  deallocShared(p.data)
  p.data = nil
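As a usage sketch (not part of the diff; the values and the import path are illustrative), the new UniquePtr helper behaves roughly like this:

import codex/utils/uniqueptr # illustrative path; adjust to where the module lives

# Wrap an isolated value in shared memory; the wrapper can be moved, not copied.
var p = newUniquePtr(@[@[1'u8, 2, 3], @[4'u8, 5]])

doAssert not p.isNil
doAssert p[].len == 2 # read-only access through `[]`
doAssert p[][0] == @[1'u8, 2, 3]

# extractValue moves the value out and releases the shared allocation.
let restored = extractValue(p)
doAssert restored[1] == @[4'u8, 5]
doAssert p.isNil

# Copying is rejected at compile time:
# var q = p # error: "UniquePtr cannot be copied, only moved"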
@@ -75,7 +75,8 @@ asyncchecksuite "Test Node - Host contracts":
  let
    manifestBlock =
      bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
-    erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new)
+    erasure =
+      ErasureRef.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new)

    manifestCid = manifestBlock.cid

@@ -175,7 +175,7 @@ asyncchecksuite "Test Node - Basic":
  test "Setup purchase request":
    let
      erasure =
-        Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new())
+        ErasureRef.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new())
      manifest = await storeDataGetManifest(localStore, chunker)
      manifestBlock =
        bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
@@ -96,7 +96,7 @@ asyncchecksuite "Test Node - Slot Repair":
    manifestBlock =
      bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
    erasure =
-      Erasure.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool)
+      ErasureRef.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool)

  (await localStore.putBlock(manifestBlock)).tryGet()

@@ -175,7 +175,7 @@ asyncchecksuite "Test Node - Slot Repair":
    manifestBlock =
      bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
    erasure =
-      Erasure.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool)
+      ErasureRef.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool)

  (await localStore.putBlock(manifestBlock)).tryGet()

@@ -27,7 +27,7 @@ suite "Erasure encode/decode":
  var chunker: Chunker
  var manifest: Manifest
  var store: BlockStore
-  var erasure: Erasure
+  var erasure: ErasureRef
  let repoTmp = TempLevelDb.new()
  let metaTmp = TempLevelDb.new()
  var taskpool: Taskpool
@@ -40,7 +40,7 @@ suite "Erasure encode/decode":
    chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize)
    store = RepoStore.new(repoDs, metaDs)
    taskpool = Taskpool.new()
-    erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
+    erasure = ErasureRef.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
    manifest = await storeDataGetManifest(store, chunker)

  teardown:
@@ -253,7 +253,7 @@ suite "Erasure encode/decode":
    for i in 0 ..< encodeResults.len:
      decodeTasks.add(erasure.decode(encodeResults[i].read().tryGet()))
    # wait for all decoding tasks to finish
-    let decodeResults = await allFinished(decodeTasks) # TODO: use allFutures
+    let decodeResults = await allFinished(decodeTasks) # TODO: use allFutures

    for j in 0 ..< decodeTasks.len:
      let
@@ -302,73 +302,3 @@ suite "Erasure encode/decode":
      decoded.treeCid == manifest.treeCid
      decoded.treeCid == encoded.originalTreeCid
      decoded.blocksCount == encoded.originalBlocksCount
-
-  test "Should complete encode/decode task when cancelled":
-    let
-      blocksLen = 10000
-      parityLen = 10
-      data = seq[seq[byte]].new()
-      chunker = RandomChunker.new(
-        rng, size = (blocksLen * BlockSize.int), chunkSize = BlockSize
-      )
-
-    data[].setLen(blocksLen)
-
-    for i in 0 ..< blocksLen:
-      let chunk = await chunker.getBytes()
-      shallowCopy(data[i], @(chunk))
-
-    let
-      parity = createDoubleArray(parityLen, BlockSize.int)
-      paritySeq = seq[seq[byte]].new()
-      recovered = createDoubleArray(blocksLen, BlockSize.int)
-      cancelledTaskParity = createDoubleArray(parityLen, BlockSize.int)
-      cancelledTaskRecovered = createDoubleArray(blocksLen, BlockSize.int)
-
-    paritySeq[].setLen(parityLen)
-    defer:
-      freeDoubleArray(parity, parityLen)
-      freeDoubleArray(cancelledTaskParity, parityLen)
-      freeDoubleArray(recovered, blocksLen)
-      freeDoubleArray(cancelledTaskRecovered, blocksLen)
-
-    for i in 0 ..< parityLen:
-      paritySeq[i] = cast[seq[byte]](parity[i])
-
-    # call asyncEncode to get the parity
-    let encFut =
-      await erasure.asyncEncode(BlockSize.int, blocksLen, parityLen, data, parity)
-    check encFut.isOk
-
-    let decFut = await erasure.asyncDecode(
-      BlockSize.int, blocksLen, parityLen, data, paritySeq, recovered
-    )
-    check decFut.isOk
-
-    # call asyncEncode and cancel the task
-    let encodeFut = erasure.asyncEncode(
-      BlockSize.int, blocksLen, parityLen, data, cancelledTaskParity
-    )
-    encodeFut.cancel()
-
-    try:
-      discard await encodeFut
-    except CatchableError as exc:
-      check exc of CancelledError
-    finally:
-      for i in 0 ..< parityLen:
-        check equalMem(parity[i], cancelledTaskParity[i], BlockSize.int)
-
-    # call asyncDecode and cancel the task
-    let decodeFut = erasure.asyncDecode(
-      BlockSize.int, blocksLen, parityLen, data, paritySeq, cancelledTaskRecovered
-    )
-    decodeFut.cancel()
-
-    try:
-      discard await decodeFut
-    except CatchableError as exc:
-      check exc of CancelledError
-    finally:
-      for i in 0 ..< blocksLen:
-        check equalMem(recovered[i], cancelledTaskRecovered[i], BlockSize.int)