avoid manual memory managment for parity and recover data

This commit is contained in:
munna0908 2025-06-26 23:18:43 +05:30
parent 7eb2fb12cc
commit 75c4486c60
No known key found for this signature in database
GPG Key ID: 2FFCD637E937D3E6
8 changed files with 158 additions and 191 deletions

View File

@ -29,18 +29,14 @@ method release*(self: ErasureBackend) {.base, gcsafe.} =
raiseAssert("not implemented!")
method encode*(
self: EncoderBackend,
buffers, parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
dataLen, parityLen: int,
self: EncoderBackend, data, parity: var openArray[seq[byte]]
): Result[void, cstring] {.base, gcsafe.} =
## encode buffers using a backend
##
raiseAssert("not implemented!")
method decode*(
self: DecoderBackend,
buffers, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
dataLen, parityLen, recoveredLen: int,
self: DecoderBackend, data, parity, recovered: var openArray[seq[byte]]
): Result[void, cstring] {.base, gcsafe.} =
## decode buffers using a backend
##

View File

@ -22,13 +22,11 @@ type
decoder*: Option[LeoDecoder]
method encode*(
self: LeoEncoderBackend,
data, parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
dataLen, parityLen: int,
self: LeoEncoderBackend, data, parity: var openArray[seq[byte]]
): Result[void, cstring] =
## Encode data using Leopard backend
if parityLen == 0:
if parity.len == 0:
return ok()
var encoder =
@ -38,12 +36,10 @@ method encode*(
else:
self.encoder.get()
encoder.encode(data, parity, dataLen, parityLen)
encoder.encode(data, parity)
method decode*(
self: LeoDecoderBackend,
data, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
dataLen, parityLen, recoveredLen: int,
self: LeoDecoderBackend, data, parity, recovered: var openArray[seq[byte]]
): Result[void, cstring] =
## Decode data using given Leopard backend
@ -54,7 +50,7 @@ method decode*(
else:
self.decoder.get()
decoder.decode(data, parity, recovered, dataLen, parityLen, recoveredLen)
decoder.decode(data, parity, recovered)
method release*(self: LeoEncoderBackend) =
if self.encoder.isSome:

View File

@ -71,12 +71,14 @@ type
DecoderProvider* =
proc(size, blocks, parity: int): DecoderBackend {.raises: [Defect], noSideEffect.}
Erasure* = ref object
Erasure* = object
taskPool: Taskpool
encoderProvider*: EncoderProvider
decoderProvider*: DecoderProvider
store*: BlockStore
ErasureRef* = ref Erasure
EncodingParams = object
ecK: Natural
ecM: Natural
@ -95,19 +97,18 @@ type
EncodeTask = object
success: Atomic[bool]
erasure: ptr Erasure
blocks: ptr UncheckedArray[ptr UncheckedArray[byte]]
parity: ptr UncheckedArray[ptr UncheckedArray[byte]]
blockSize, blocksLen, parityLen: int
blocks: seq[seq[byte]]
parity: Isolated[seq[seq[byte]]]
blockSize, parityLen: int
signal: ThreadSignalPtr
DecodeTask = object
success: Atomic[bool]
erasure: ptr Erasure
blocks: ptr UncheckedArray[ptr UncheckedArray[byte]]
parity: ptr UncheckedArray[ptr UncheckedArray[byte]]
recovered: ptr UncheckedArray[ptr UncheckedArray[byte]]
blockSize, blocksLen: int
parityLen, recoveredLen: int
blocks: seq[seq[byte]]
parity: seq[seq[byte]]
recovered: Isolated[seq[seq[byte]]]
blockSize, recoveredLen: int
signal: ThreadSignalPtr
func indexToPos(steps, idx, step: int): int {.inline.} =
@ -121,7 +122,7 @@ func indexToPos(steps, idx, step: int): int {.inline.} =
(idx - step) div steps
proc getPendingBlocks(
self: Erasure, manifest: Manifest, indices: seq[int]
self: ErasureRef, manifest: Manifest, indices: seq[int]
): AsyncIter[(?!bt.Block, int)] =
## Get pending blocks iterator
##
@ -157,7 +158,7 @@ proc getPendingBlocks(
AsyncIter[(?!bt.Block, int)].new(genNext, isFinished)
proc prepareEncodingData(
self: Erasure,
self: ErasureRef,
manifest: Manifest,
params: EncodingParams,
step: Natural,
@ -201,7 +202,7 @@ proc prepareEncodingData(
success(resolved.Natural)
proc prepareDecodingData(
self: Erasure,
self: ErasureRef,
encoded: Manifest,
step: Natural,
data: ref seq[seq[byte]],
@ -297,48 +298,43 @@ proc init*(
proc leopardEncodeTask(tp: Taskpool, task: ptr EncodeTask) {.gcsafe.} =
# Task suitable for running in taskpools - look, no GC!
let encoder =
task[].erasure.encoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen)
let encoder = task[].erasure.encoderProvider(
task[].blockSize, task[].blocks.len, task[].parityLen
)
defer:
encoder.release()
discard task[].signal.fireSync()
if (
let res =
encoder.encode(task[].blocks, task[].parity, task[].blocksLen, task[].parityLen)
res.isErr
):
var parity = newSeqWith(task[].parityLen, newSeq[byte](task[].blockSize))
if (let res = encoder.encode(task[].blocks, parity); res.isErr):
warn "Error from leopard encoder backend!", error = $res.error
task[].success.store(false)
else:
var isolatedParity = isolate(parity)
task[].parity = move isolatedParity
task[].success.store(true)
proc asyncEncode*(
self: Erasure,
blockSize, blocksLen, parityLen: int,
blocks: ref seq[seq[byte]],
parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
): Future[?!void] {.async: (raises: [CancelledError]).} =
self: ErasureRef, blockSize, parityLen: int, blocks: seq[seq[byte]]
): Future[?!seq[seq[byte]]] {.async: (raises: [CancelledError]).} =
without threadPtr =? ThreadSignalPtr.new():
return failure("Unable to create thread signal")
defer:
threadPtr.close().expect("closing once works")
var data = makeUncheckedArray(blocks)
# var data = makeUncheckedArray(blocks)
defer:
dealloc(data)
# defer:
# dealloc(data)
## Create an ecode task with block data
var task = EncodeTask(
erasure: addr self,
erasure: cast[ptr Erasure](self),
blockSize: blockSize,
blocksLen: blocksLen,
parityLen: parityLen,
blocks: data,
parity: parity,
blocks: blocks,
signal: threadPtr,
)
@ -347,21 +343,25 @@ proc asyncEncode*(
self.taskPool.spawn leopardEncodeTask(self.taskPool, addr task)
let threadFut = threadPtr.wait()
if joinErr =? catch(await threadFut.join()).errorOption:
if err =? catch(await noCancel threadFut).errorOption:
return failure(err)
if joinErr of CancelledError:
raise (ref CancelledError) joinErr
else:
return failure(joinErr)
if err =? catch(await threadFut.join()).errorOption:
?catch(await noCancel threadFut)
if err of CancelledError:
raise (ref CancelledError) err
return failure(err)
if not task.success.load():
return failure("Leopard encoding task failed")
success()
defer:
task.parity = default(Isolated[seq[seq[byte]]])
var parity = task.parity.extract
success parity
proc encodeData(
self: Erasure, manifest: Manifest, params: EncodingParams
self: ErasureRef, manifest: Manifest, params: EncodingParams
): Future[?!Manifest] {.async.} =
## Encode blocks pointed to by the protected manifest
##
@ -383,11 +383,7 @@ proc encodeData(
try:
for step in 0 ..< params.steps:
# TODO: Don't allocate a new seq every time, allocate once and zero out
var
data = seq[seq[byte]].new() # number of blocks to encode
parity = createDoubleArray(params.ecM, manifest.blockSize.int)
defer:
freeDoubleArray(parity, params.ecM)
var data = seq[seq[byte]].new() # number of blocks to encode
data[].setLen(params.ecK)
# TODO: this is a tight blocking loop so we sleep here to allow
@ -403,21 +399,15 @@ proc encodeData(
trace "Erasure coding data", data = data[].len
var parity: seq[seq[byte]]
try:
if err =? (
await self.asyncEncode(
manifest.blockSize.int, params.ecK, params.ecM, data, parity
)
).errorOption:
return failure(err)
parity = ?(await self.asyncEncode(manifest.blockSize.int, params.ecM, data[]))
except CancelledError as exc:
raise exc
var idx = params.rounded + step
for j in 0 ..< params.ecM:
var innerPtr: ptr UncheckedArray[byte] = parity[][j]
without blk =? bt.Block.new(innerPtr.toOpenArray(0, manifest.blockSize.int - 1)),
error:
without blk =? bt.Block.new(parity[j]), error:
trace "Unable to create parity block", err = error.msg
return failure(error)
@ -456,7 +446,7 @@ proc encodeData(
return failure(exc)
proc encode*(
self: Erasure,
self: ErasureRef,
manifest: Manifest,
blocks: Natural,
parity: Natural,
@ -479,58 +469,46 @@ proc encode*(
proc leopardDecodeTask(tp: Taskpool, task: ptr DecodeTask) {.gcsafe.} =
# Task suitable for running in taskpools - look, no GC!
let decoder =
task[].erasure.decoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen)
let decoder = task[].erasure.decoderProvider(
task[].blockSize, task[].blocks.len, task[].parity.len
)
defer:
decoder.release()
discard task[].signal.fireSync()
if (
let res = decoder.decode(
task[].blocks,
task[].parity,
task[].recovered,
task[].blocksLen,
task[].parityLen,
task[].recoveredLen,
)
res.isErr
):
var recovered = newSeqWith(task[].blocks.len, newSeq[byte](task[].blockSize))
if (let res = decoder.decode(task[].blocks, task[].parity, recovered); res.isErr):
warn "Error from leopard decoder backend!", error = $res.error
task[].success.store(false)
else:
var isolatedRecovered = isolate(recovered)
task[].recovered = move isolatedRecovered
task[].success.store(true)
proc asyncDecode*(
self: Erasure,
blockSize, blocksLen, parityLen: int,
blocks, parity: ref seq[seq[byte]],
recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
): Future[?!void] {.async: (raises: [CancelledError]).} =
self: ErasureRef, blockSize: int, blocks, parity: seq[seq[byte]]
): Future[?!seq[seq[byte]]] {.async: (raises: [CancelledError]).} =
without threadPtr =? ThreadSignalPtr.new():
return failure("Unable to create thread signal")
defer:
threadPtr.close().expect("closing once works")
var
blockData = makeUncheckedArray(blocks)
parityData = makeUncheckedArray(parity)
# var
# blockData = makeUncheckedArray(blocks)
# parityData = makeUncheckedArray(parity)
defer:
dealloc(blockData)
dealloc(parityData)
# defer:
# dealloc(blockData)
# dealloc(parityData)
## Create an decode task with block data
var task = DecodeTask(
erasure: addr self,
erasure: cast[ptr Erasure](self),
blockSize: blockSize,
blocksLen: blocksLen,
parityLen: parityLen,
recoveredLen: blocksLen,
blocks: blockData,
parity: parityData,
recovered: recovered,
blocks: blocks,
parity: parity,
signal: threadPtr,
)
@ -550,10 +528,12 @@ proc asyncDecode*(
if not task.success.load():
return failure("Leopard decoding task failed")
success()
var recovered = task.recovered.extract
success(recovered)
proc decodeInternal(
self: Erasure, encoded: Manifest
self: ErasureRef, encoded: Manifest
): Future[?!(ref seq[Cid], seq[Natural])] {.async.} =
logScope:
steps = encoded.steps
@ -577,9 +557,6 @@ proc decodeInternal(
var
data = seq[seq[byte]].new()
parityData = seq[seq[byte]].new()
recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int)
defer:
freeDoubleArray(recovered, encoded.ecK)
data[].setLen(encoded.ecK) # set len to K
parityData[].setLen(encoded.ecM) # set len to M
@ -595,26 +572,18 @@ proc decodeInternal(
if dataPieces >= encoded.ecK:
trace "Retrieved all the required data blocks"
continue
var recovered: seq[seq[byte]]
trace "Erasure decoding data"
try:
if err =? (
await self.asyncDecode(
encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered
)
).errorOption:
return failure(err)
recovered =
?(await self.asyncDecode(encoded.blockSize.int, data[], parityData[]))
except CancelledError as exc:
raise exc
for i in 0 ..< encoded.ecK:
let idx = i * encoded.steps + step
if data[i].len <= 0 and not cids[idx].isEmpty:
var innerPtr: ptr UncheckedArray[byte] = recovered[][i]
without blk =? bt.Block.new(
innerPtr.toOpenArray(0, encoded.blockSize.int - 1)
), error:
without blk =? bt.Block.new(recovered[i]), error:
trace "Unable to create block!", exc = error.msg
return failure(error)
@ -638,7 +607,7 @@ proc decodeInternal(
return (cids, recoveredIndices).success
proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
proc decode*(self: ErasureRef, encoded: Manifest): Future[?!Manifest] {.async.} =
## Decode a protected manifest into it's original
## manifest
##
@ -670,7 +639,7 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
return decoded.success
proc repair*(self: Erasure, encoded: Manifest): Future[?!void] {.async.} =
proc repair*(self: ErasureRef, encoded: Manifest): Future[?!void] {.async.} =
## Repair a protected manifest by reconstructing the full dataset
##
## `encoded` - the encoded (protected) manifest to
@ -715,15 +684,15 @@ proc stop*(self: Erasure) {.async.} =
return
proc new*(
T: type Erasure,
_: type ErasureRef,
store: BlockStore,
encoderProvider: EncoderProvider,
decoderProvider: DecoderProvider,
taskPool: Taskpool,
): Erasure =
## Create a new Erasure instance for encoding and decoding manifests
): ErasureRef =
## Create a new ErasureRef instance for encoding and decoding manifests
##
Erasure(
ErasureRef(
store: store,
encoderProvider: encoderProvider,
decoderProvider: decoderProvider,

View File

@ -293,7 +293,7 @@ proc streamEntireDataset(
proc erasureJob(): Future[void] {.async: (raises: []).} =
try:
# Spawn an erasure decoding job
let erasure = Erasure.new(
let erasure = ErasureRef.new(
self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
)
without _ =? (await erasure.decode(manifest)), error:
@ -532,7 +532,7 @@ proc setupRequest(
return failure error
# Erasure code the dataset according to provided parameters
let erasure = Erasure.new(
let erasure = ErasureRef.new(
self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
)
@ -678,7 +678,7 @@ proc onStore(
if isRepairing:
trace "start repairing slot", slotIdx
try:
let erasure = Erasure.new(
let erasure = ErasureRef.new(
self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
)
if err =? (await erasure.repair(manifest)).errorOption:

View File

@ -75,7 +75,8 @@ asyncchecksuite "Test Node - Host contracts":
let
manifestBlock =
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new)
erasure =
ErasureRef.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new)
manifestCid = manifestBlock.cid

View File

@ -175,7 +175,7 @@ asyncchecksuite "Test Node - Basic":
test "Setup purchase request":
let
erasure =
Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new())
ErasureRef.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new())
manifest = await storeDataGetManifest(localStore, chunker)
manifestBlock =
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()

View File

@ -95,7 +95,7 @@ asyncchecksuite "Test Node - Slot Repair":
manifestBlock =
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
erasure =
Erasure.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool)
ErasureRef.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool)
(await localStore.putBlock(manifestBlock)).tryGet()
@ -174,7 +174,7 @@ asyncchecksuite "Test Node - Slot Repair":
manifestBlock =
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
erasure =
Erasure.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool)
ErasureRef.new(store, leoEncoderProvider, leoDecoderProvider, cluster.taskpool)
(await localStore.putBlock(manifestBlock)).tryGet()

View File

@ -27,7 +27,7 @@ suite "Erasure encode/decode":
var chunker: Chunker
var manifest: Manifest
var store: BlockStore
var erasure: Erasure
var erasure: ErasureRef
let repoTmp = TempLevelDb.new()
let metaTmp = TempLevelDb.new()
var taskpool: Taskpool
@ -40,7 +40,7 @@ suite "Erasure encode/decode":
chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize)
store = RepoStore.new(repoDs, metaDs)
taskpool = Taskpool.new()
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
erasure = ErasureRef.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
manifest = await storeDataGetManifest(store, chunker)
teardown:
@ -303,72 +303,77 @@ suite "Erasure encode/decode":
decoded.treeCid == encoded.originalTreeCid
decoded.blocksCount == encoded.originalBlocksCount
test "Should complete encode/decode task when cancelled":
let
blocksLen = 10000
parityLen = 10
data = seq[seq[byte]].new()
chunker = RandomChunker.new(
rng, size = (blocksLen * BlockSize.int), chunkSize = BlockSize
)
# test "Should complete encode/decode task when cancelled":
# let
# blocksLen = 10000
# parityLen = 10
# data = seq[seq[byte]].new()
# chunker = RandomChunker.new(
# rng, size = (blocksLen * BlockSize.int), chunkSize = BlockSize
# )
data[].setLen(blocksLen)
# data[].setLen(blocksLen)
for i in 0 ..< blocksLen:
let chunk = await chunker.getBytes()
shallowCopy(data[i], @(chunk))
# for i in 0 ..< blocksLen:
# let chunk = await chunker.getBytes()
# shallowCopy(data[i], @(chunk))
let
parity = createDoubleArray(parityLen, BlockSize.int)
paritySeq = seq[seq[byte]].new()
recovered = createDoubleArray(blocksLen, BlockSize.int)
cancelledTaskParity = createDoubleArray(parityLen, BlockSize.int)
cancelledTaskRecovered = createDoubleArray(blocksLen, BlockSize.int)
# var
# parity: seq[seq[byte]]
# recovered: seq[seq[byte]]
# cancelledTaskParity: seq[seq[byte]]
# cancelledTaskRecovered: seq[seq[byte]]
paritySeq[].setLen(parityLen)
defer:
freeDoubleArray(parity, parityLen)
freeDoubleArray(cancelledTaskParity, parityLen)
freeDoubleArray(recovered, blocksLen)
freeDoubleArray(cancelledTaskRecovered, blocksLen)
# # parity = newSeqWith(parityLen, newSeq[byte](BlockSize.int))
# # paritySeq = seq[seq[byte]].new()
# # recovered = createDoubleArray(blocksLen, BlockSize.int)
# # cancelledTaskParity = createDoubleArray(parityLen, BlockSize.int)
# # cancelledTaskRecovered = createDoubleArray(blocksLen, BlockSize.int)
for i in 0 ..< parityLen:
paritySeq[i] = cast[seq[byte]](parity[i])
# # paritySeq[].setLen(parityLen)
# # freeDoubleArray(parity, parityLen)
# # freeDoubleArray(cancelledTaskParity, parityLen)
# # freeDoubleArray(recovered, blocksLen)
# # freeDoubleArray(cancelledTaskRecovered, blocksLen)
# call asyncEncode to get the parity
let encFut =
await erasure.asyncEncode(BlockSize.int, blocksLen, parityLen, data, parity)
check encFut.isOk
# # for i in 0 ..< parityLen:
# # paritySeq[i] = cast[seq[byte]](parity[i])
let decFut = await erasure.asyncDecode(
BlockSize.int, blocksLen, parityLen, data, paritySeq, recovered
)
check decFut.isOk
# # call asyncEncode to get the parity
# parity = (await erasure.asyncEncode(BlockSize.int, parityLen, data)).tryGet()
# call asyncEncode and cancel the task
let encodeFut = erasure.asyncEncode(
BlockSize.int, blocksLen, parityLen, data, cancelledTaskParity
)
encodeFut.cancel()
# let decFut = await erasure.asyncDecode(
# BlockSize.int, data, parity
# )
try:
discard await encodeFut
except CatchableError as exc:
check exc of CancelledError
finally:
for i in 0 ..< parityLen:
check equalMem(parity[i], cancelledTaskParity[i], BlockSize.int)
# check decFut.isOk
# call asyncDecode and cancel the task
let decodeFut = erasure.asyncDecode(
BlockSize.int, blocksLen, parityLen, data, paritySeq, cancelledTaskRecovered
)
decodeFut.cancel()
# # call asyncEncode and cancel the task
# let encodeFut = erasure.asyncEncode(
# BlockSize.int, parityLen, data,
# )
# encodeFut.cancel()
try:
discard await decodeFut
except CatchableError as exc:
check exc of CancelledError
finally:
for i in 0 ..< blocksLen:
check equalMem(recovered[i], cancelledTaskRecovered[i], BlockSize.int)
# try:
# cancelledTaskParity = (await encodeFut).tryGet()
# except CatchableError as exc:
# check exc of CancelledError
# finally:
# check parity == cancelledTaskParity
# # for i in 0 ..< parityLen:
# # check equalMem(parity[i], cancelledTaskParity[i], BlockSize.int)
# # call asyncDecode and cancel the task
# let decodeFut = erasure.asyncDecode(
# BlockSize.int, data, parity,
# )
# decodeFut.cancel()
# try:
# cancelledTaskRecovered = (await decodeFut).tryGet()
# except CatchableError as exc:
# check exc of CancelledError
# finally:
# check recovered == cancelledTaskRecovered
# for i in 0 ..< blocksLen:
# check equalMem(recovered[i], cancelledTaskRecovered[i], BlockSize.int)