From 75db491d84116f4cf9e38550d0bc52763d42a81f Mon Sep 17 00:00:00 2001 From: munna0908 <88337208+munna0908@users.noreply.github.com> Date: Fri, 14 Mar 2025 18:39:18 +0530 Subject: [PATCH] fix: optimise erasure encode/decode (#1123) * avoid copying block,parity data to shared memory * use alloc instead of allocShared * code cleanup --- codex/erasure/erasure.nim | 82 +++++++++++++------------------------ codex/utils/arrayutils.nim | 13 ++++++ tests/codex/testerasure.nim | 16 ++++---- 3 files changed, 49 insertions(+), 62 deletions(-) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 78ce3971..884969d0 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -310,10 +310,10 @@ proc leopardEncodeTask(tp: Taskpool, task: ptr EncodeTask) {.gcsafe.} = else: task[].success.store(true) -proc encodeAsync*( +proc asyncEncode*( self: Erasure, blockSize, blocksLen, parityLen: int, - data: ref seq[seq[byte]], + blocks: ref seq[seq[byte]], parity: ptr UncheckedArray[ptr UncheckedArray[byte]], ): Future[?!void] {.async: (raises: [CancelledError]).} = without threadPtr =? ThreadSignalPtr.new(): @@ -322,13 +322,10 @@ proc encodeAsync*( defer: threadPtr.close().expect("closing once works") - var blockData = createDoubleArray(blocksLen, blockSize) - - for i in 0 ..< data[].len: - copyMem(blockData[i], addr data[i][0], blockSize) + var data = makeUncheckedArray(blocks) defer: - freeDoubleArray(blockData, blocksLen) + dealloc(data) ## Create an ecode task with block data var task = EncodeTask( @@ -336,7 +333,7 @@ proc encodeAsync*( blockSize: blockSize, blocksLen: blocksLen, parityLen: parityLen, - blocks: blockData, + blocks: data, parity: parity, signal: threadPtr, ) @@ -348,18 +345,13 @@ proc encodeAsync*( self.taskPool.spawn leopardEncodeTask(self.taskPool, t) let threadFut = threadPtr.wait() - try: - await threadFut.join() - except CatchableError as exc: - try: - await threadFut - except AsyncError as asyncExc: - return failure(asyncExc.msg) - finally: - if exc of CancelledError: - raise (ref CancelledError) exc - else: - return failure(exc.msg) + if joinErr =? catch(await threadFut.join()).errorOption: + if err =? catch(await noCancel threadFut).errorOption: + return failure(err) + if joinErr of CancelledError: + raise (ref CancelledError) joinErr + else: + return failure(joinErr) if not t.success.load(): return failure("Leopard encoding failed") @@ -409,7 +401,7 @@ proc encodeData( try: if err =? ( - await self.encodeAsync( + await self.asyncEncode( manifest.blockSize.int, params.ecK, params.ecM, data, parity ) ).errorOption: @@ -489,6 +481,7 @@ proc leopardDecodeTask(tp: Taskpool, task: ptr DecodeTask) {.gcsafe.} = task[].erasure.decoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen) defer: decoder.release() + discard task[].signal.fireSync() if ( let res = decoder.decode( @@ -506,9 +499,7 @@ proc leopardDecodeTask(tp: Taskpool, task: ptr DecodeTask) {.gcsafe.} = else: task[].success.store(true) - discard task[].signal.fireSync() - -proc decodeAsync*( +proc asyncDecode*( self: Erasure, blockSize, blocksLen, parityLen: int, blocks, parity: ref seq[seq[byte]], @@ -521,24 +512,12 @@ proc decodeAsync*( threadPtr.close().expect("closing once works") var - blocksData = createDoubleArray(blocksLen, blockSize) - parityData = createDoubleArray(parityLen, blockSize) - - for i in 0 ..< blocks[].len: - if blocks[i].len > 0: - copyMem(blocksData[i], addr blocks[i][0], blockSize) - else: - blocksData[i] = nil - - for i in 0 ..< parity[].len: - if parity[i].len > 0: - copyMem(parityData[i], addr parity[i][0], blockSize) - else: - parityData[i] = nil + blockData = makeUncheckedArray(blocks) + parityData = makeUncheckedArray(parity) defer: - freeDoubleArray(blocksData, blocksLen) - freeDoubleArray(parityData, parityLen) + dealloc(blockData) + dealloc(parityData) ## Create an decode task with block data var task = DecodeTask( @@ -547,7 +526,7 @@ proc decodeAsync*( blocksLen: blocksLen, parityLen: parityLen, recoveredLen: blocksLen, - blocks: blocksData, + blocks: blockData, parity: parityData, recovered: recovered, signal: threadPtr, @@ -560,18 +539,13 @@ proc decodeAsync*( self.taskPool.spawn leopardDecodeTask(self.taskPool, t) let threadFut = threadPtr.wait() - try: - await threadFut.join() - except CatchableError as exc: - try: - await threadFut - except AsyncError as asyncExc: - return failure(asyncExc.msg) - finally: - if exc of CancelledError: - raise (ref CancelledError) exc - else: - return failure(exc.msg) + if joinErr =? catch(await threadFut.join()).errorOption: + if err =? catch(await noCancel threadFut).errorOption: + return failure(err) + if joinErr of CancelledError: + raise (ref CancelledError) joinErr + else: + return failure(joinErr) if not t.success.load(): return failure("Leopard encoding failed") @@ -627,7 +601,7 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = trace "Erasure decoding data" try: if err =? ( - await self.decodeAsync( + await self.asyncDecode( encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered ) ).errorOption: diff --git a/codex/utils/arrayutils.nim b/codex/utils/arrayutils.nim index c398921f..e36a0cb3 100644 --- a/codex/utils/arrayutils.nim +++ b/codex/utils/arrayutils.nim @@ -23,3 +23,16 @@ proc freeDoubleArray*( # Free outer array if not arr.isNil: deallocShared(arr) + +proc makeUncheckedArray*( + data: ref seq[seq[byte]] +): ptr UncheckedArray[ptr UncheckedArray[byte]] = + result = cast[ptr UncheckedArray[ptr UncheckedArray[byte]]](alloc0( + sizeof(ptr UncheckedArray[byte]) * data[].len + )) + + for i, blk in data[]: + if blk.len > 0: + result[i] = cast[ptr UncheckedArray[byte]](addr blk[0]) + else: + result[i] = nil diff --git a/tests/codex/testerasure.nim b/tests/codex/testerasure.nim index d469b379..5046bac2 100644 --- a/tests/codex/testerasure.nim +++ b/tests/codex/testerasure.nim @@ -228,7 +228,7 @@ suite "Erasure encode/decode": discard (await erasure.decode(encoded)).tryGet() test "Should concurrently encode/decode multiple datasets": - const iterations = 2 + const iterations = 5 let datasetSize = 1.MiBs @@ -335,18 +335,18 @@ suite "Erasure encode/decode": for i in 0 ..< parityLen: paritySeq[i] = cast[seq[byte]](parity[i]) - # call encodeAsync to get the parity + # call asyncEncode to get the parity let encFut = - await erasure.encodeAsync(BlockSize.int, blocksLen, parityLen, data, parity) + await erasure.asyncEncode(BlockSize.int, blocksLen, parityLen, data, parity) check encFut.isOk - let decFut = await erasure.decodeAsync( + let decFut = await erasure.asyncDecode( BlockSize.int, blocksLen, parityLen, data, paritySeq, recovered ) check decFut.isOk - # call encodeAsync and cancel the task - let encodeFut = erasure.encodeAsync( + # call asyncEncode and cancel the task + let encodeFut = erasure.asyncEncode( BlockSize.int, blocksLen, parityLen, data, cancelledTaskParity ) encodeFut.cancel() @@ -359,8 +359,8 @@ suite "Erasure encode/decode": for i in 0 ..< parityLen: check equalMem(parity[i], cancelledTaskParity[i], BlockSize.int) - # call decodeAsync and cancel the task - let decodeFut = erasure.decodeAsync( + # call asyncDecode and cancel the task + let decodeFut = erasure.asyncDecode( BlockSize.int, blocksLen, parityLen, data, paritySeq, cancelledTaskRecovered ) decodeFut.cancel()