fix: optimise erasure encode/decode (#1123)

* avoid copying block,parity data to shared memory

* use alloc instead of allocShared

* code cleanup
This commit is contained in:
munna0908 2025-03-14 18:39:18 +05:30 committed by GitHub
parent f1b84dc6d1
commit 75db491d84
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 49 additions and 62 deletions

View File

@ -310,10 +310,10 @@ proc leopardEncodeTask(tp: Taskpool, task: ptr EncodeTask) {.gcsafe.} =
else:
task[].success.store(true)
proc encodeAsync*(
proc asyncEncode*(
self: Erasure,
blockSize, blocksLen, parityLen: int,
data: ref seq[seq[byte]],
blocks: ref seq[seq[byte]],
parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
): Future[?!void] {.async: (raises: [CancelledError]).} =
without threadPtr =? ThreadSignalPtr.new():
@ -322,13 +322,10 @@ proc encodeAsync*(
defer:
threadPtr.close().expect("closing once works")
var blockData = createDoubleArray(blocksLen, blockSize)
for i in 0 ..< data[].len:
copyMem(blockData[i], addr data[i][0], blockSize)
var data = makeUncheckedArray(blocks)
defer:
freeDoubleArray(blockData, blocksLen)
dealloc(data)
## Create an ecode task with block data
var task = EncodeTask(
@ -336,7 +333,7 @@ proc encodeAsync*(
blockSize: blockSize,
blocksLen: blocksLen,
parityLen: parityLen,
blocks: blockData,
blocks: data,
parity: parity,
signal: threadPtr,
)
@ -348,18 +345,13 @@ proc encodeAsync*(
self.taskPool.spawn leopardEncodeTask(self.taskPool, t)
let threadFut = threadPtr.wait()
try:
await threadFut.join()
except CatchableError as exc:
try:
await threadFut
except AsyncError as asyncExc:
return failure(asyncExc.msg)
finally:
if exc of CancelledError:
raise (ref CancelledError) exc
else:
return failure(exc.msg)
if joinErr =? catch(await threadFut.join()).errorOption:
if err =? catch(await noCancel threadFut).errorOption:
return failure(err)
if joinErr of CancelledError:
raise (ref CancelledError) joinErr
else:
return failure(joinErr)
if not t.success.load():
return failure("Leopard encoding failed")
@ -409,7 +401,7 @@ proc encodeData(
try:
if err =? (
await self.encodeAsync(
await self.asyncEncode(
manifest.blockSize.int, params.ecK, params.ecM, data, parity
)
).errorOption:
@ -489,6 +481,7 @@ proc leopardDecodeTask(tp: Taskpool, task: ptr DecodeTask) {.gcsafe.} =
task[].erasure.decoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen)
defer:
decoder.release()
discard task[].signal.fireSync()
if (
let res = decoder.decode(
@ -506,9 +499,7 @@ proc leopardDecodeTask(tp: Taskpool, task: ptr DecodeTask) {.gcsafe.} =
else:
task[].success.store(true)
discard task[].signal.fireSync()
proc decodeAsync*(
proc asyncDecode*(
self: Erasure,
blockSize, blocksLen, parityLen: int,
blocks, parity: ref seq[seq[byte]],
@ -521,24 +512,12 @@ proc decodeAsync*(
threadPtr.close().expect("closing once works")
var
blocksData = createDoubleArray(blocksLen, blockSize)
parityData = createDoubleArray(parityLen, blockSize)
for i in 0 ..< blocks[].len:
if blocks[i].len > 0:
copyMem(blocksData[i], addr blocks[i][0], blockSize)
else:
blocksData[i] = nil
for i in 0 ..< parity[].len:
if parity[i].len > 0:
copyMem(parityData[i], addr parity[i][0], blockSize)
else:
parityData[i] = nil
blockData = makeUncheckedArray(blocks)
parityData = makeUncheckedArray(parity)
defer:
freeDoubleArray(blocksData, blocksLen)
freeDoubleArray(parityData, parityLen)
dealloc(blockData)
dealloc(parityData)
## Create an decode task with block data
var task = DecodeTask(
@ -547,7 +526,7 @@ proc decodeAsync*(
blocksLen: blocksLen,
parityLen: parityLen,
recoveredLen: blocksLen,
blocks: blocksData,
blocks: blockData,
parity: parityData,
recovered: recovered,
signal: threadPtr,
@ -560,18 +539,13 @@ proc decodeAsync*(
self.taskPool.spawn leopardDecodeTask(self.taskPool, t)
let threadFut = threadPtr.wait()
try:
await threadFut.join()
except CatchableError as exc:
try:
await threadFut
except AsyncError as asyncExc:
return failure(asyncExc.msg)
finally:
if exc of CancelledError:
raise (ref CancelledError) exc
else:
return failure(exc.msg)
if joinErr =? catch(await threadFut.join()).errorOption:
if err =? catch(await noCancel threadFut).errorOption:
return failure(err)
if joinErr of CancelledError:
raise (ref CancelledError) joinErr
else:
return failure(joinErr)
if not t.success.load():
return failure("Leopard encoding failed")
@ -627,7 +601,7 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
trace "Erasure decoding data"
try:
if err =? (
await self.decodeAsync(
await self.asyncDecode(
encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered
)
).errorOption:

View File

@ -23,3 +23,16 @@ proc freeDoubleArray*(
# Free outer array
if not arr.isNil:
deallocShared(arr)
proc makeUncheckedArray*(
data: ref seq[seq[byte]]
): ptr UncheckedArray[ptr UncheckedArray[byte]] =
result = cast[ptr UncheckedArray[ptr UncheckedArray[byte]]](alloc0(
sizeof(ptr UncheckedArray[byte]) * data[].len
))
for i, blk in data[]:
if blk.len > 0:
result[i] = cast[ptr UncheckedArray[byte]](addr blk[0])
else:
result[i] = nil

View File

@ -228,7 +228,7 @@ suite "Erasure encode/decode":
discard (await erasure.decode(encoded)).tryGet()
test "Should concurrently encode/decode multiple datasets":
const iterations = 2
const iterations = 5
let
datasetSize = 1.MiBs
@ -335,18 +335,18 @@ suite "Erasure encode/decode":
for i in 0 ..< parityLen:
paritySeq[i] = cast[seq[byte]](parity[i])
# call encodeAsync to get the parity
# call asyncEncode to get the parity
let encFut =
await erasure.encodeAsync(BlockSize.int, blocksLen, parityLen, data, parity)
await erasure.asyncEncode(BlockSize.int, blocksLen, parityLen, data, parity)
check encFut.isOk
let decFut = await erasure.decodeAsync(
let decFut = await erasure.asyncDecode(
BlockSize.int, blocksLen, parityLen, data, paritySeq, recovered
)
check decFut.isOk
# call encodeAsync and cancel the task
let encodeFut = erasure.encodeAsync(
# call asyncEncode and cancel the task
let encodeFut = erasure.asyncEncode(
BlockSize.int, blocksLen, parityLen, data, cancelledTaskParity
)
encodeFut.cancel()
@ -359,8 +359,8 @@ suite "Erasure encode/decode":
for i in 0 ..< parityLen:
check equalMem(parity[i], cancelledTaskParity[i], BlockSize.int)
# call decodeAsync and cancel the task
let decodeFut = erasure.decodeAsync(
# call asyncDecode and cancel the task
let decodeFut = erasure.asyncDecode(
BlockSize.int, blocksLen, parityLen, data, paritySeq, cancelledTaskRecovered
)
decodeFut.cancel()