From b14307e34113d3a7cb73cd09e130d124ac5e7b36 Mon Sep 17 00:00:00 2001 From: munna0908 Date: Wed, 2 Jul 2025 18:26:59 +0530 Subject: [PATCH] add mutlithreading support for codex tree --- codex/erasure/erasure.nim | 8 ++- codex/merkletree/codex/codex.nim | 75 ++++++++++++++++++++++-- codex/merkletree/merkletree.nim | 25 +++++++- codex/node.nim | 2 +- tests/codex/merkletree/testasynctree.nim | 51 ++++++++++++++++ 5 files changed, 152 insertions(+), 9 deletions(-) create mode 100644 tests/codex/merkletree/testasynctree.nim diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 95516500..d378b3c3 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -428,7 +428,7 @@ proc encodeData( return failure("Unable to store block!") idx.inc(params.steps) - without tree =? CodexTree.init(cids[]), err: + without tree =? (await CodexTree.init(self.taskPool, cids[])), err: return failure(err) without treeCid =? tree.rootCid, err: @@ -649,7 +649,8 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = without (cids, recoveredIndices) =? (await self.decodeInternal(encoded)), err: return failure(err) - without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err: + without tree =? + (await CodexTree.init(self.taskPool, cids[0 ..< encoded.originalBlocksCount])), err: return failure(err) without treeCid =? tree.rootCid, err: @@ -680,7 +681,8 @@ proc repair*(self: Erasure, encoded: Manifest): Future[?!void] {.async.} = without (cids, _) =? (await self.decodeInternal(encoded)), err: return failure(err) - without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err: + without tree =? + (await CodexTree.init(self.taskPool, cids[0 ..< encoded.originalBlocksCount])), err: return failure(err) without treeCid =? tree.rootCid, err: diff --git a/codex/merkletree/codex/codex.nim b/codex/merkletree/codex/codex.nim index 0eec92e4..b35de0b2 100644 --- a/codex/merkletree/codex/codex.nim +++ b/codex/merkletree/codex/codex.nim @@ -10,12 +10,14 @@ {.push raises: [].} import std/bitops -import std/sequtils +import std/[atomics, sequtils] import pkg/questionable import pkg/questionable/results import pkg/libp2p/[cid, multicodec, multihash] import pkg/constantine/hashes +import pkg/taskpools +import pkg/chronos/threadsync import ../../utils import ../../rng import ../../errors @@ -48,7 +50,7 @@ type mcodec*: MultiCodec # CodeHashes is not exported from libp2p -# So we need to recreate it instead of +# So we need to recreate it instead of proc initMultiHashCodeTable(): Table[MultiCodec, MHash] {.compileTime.} = for item in HashesList: result[item.mcodec] = item @@ -160,7 +162,60 @@ func init*( self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true) success self -func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree = +proc initCodexTreeAsync*( + tp: Taskpool, mcodec: MultiCodec = Sha256HashCodec, leaves: seq[ByteHash] +): Future[?!CodexTree] {.async: (raises: [CancelledError]).} = + if leaves.len == 0: + return failure "Empty leaves" + + let + mhash = ?mcodec.mhash() + compressor = proc(x, y: seq[byte], key: ByteTreeKey): ?!ByteHash {.noSideEffect.} = + compress(x, y, key, mhash) + Zero: ByteHash = newSeq[byte](mhash.size) + + if mhash.size != leaves[0].len: + return failure "Invalid hash length" + + without threadPtr =? ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + + defer: + threadPtr.close().expect("closing once works") + + var tree = CodexTree(compress: compressor, zero: Zero, mcodec: mcodec) + + var task = MerkleTask[ByteHash, ByteTreeKey]( + tree: cast[ptr MerkleTree[ByteHash, ByteTreeKey]](addr tree), + leaves: @leaves, + signal: threadPtr, + ) + + doAssert tp.numThreads > 1, + "Must have at least one separate thread or signal will never be fired" + + tp.spawn asyncMerkleTreeWorker(addr task) + + let threadFut = threadPtr.wait() + + if err =? catch(await threadFut.join()).errorOption: + ?catch(await noCancel threadFut) + if err of CancelledError: + raise (ref CancelledError) err + + if not task.success.load(): + return failure("merkle tree task failed") + + defer: + task.layers = default(Isolated[seq[seq[ByteHash]]]) + + tree.layers = task.layers.extract + + success tree + +proc init*( + _: type CodexTree, tp: Taskpool, leaves: openArray[MultiHash] +): Future[?!CodexTree] = if leaves.len == 0: return failure "Empty leaves" @@ -168,7 +223,19 @@ func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree = mcodec = leaves[0].mcodec leaves = leaves.mapIt(it.digestBytes) - CodexTree.init(mcodec, leaves) + return initCodexTreeAsync(tp, mcodec, leaves) + +proc init*( + _: type CodexTree, tp: Taskpool, leaves: seq[Cid] +): Future[?!CodexTree] {.async.} = + if leaves.len == 0: + return failure("Empty leaves") + + let + mcodec = (?leaves[0].mhash.mapFailure).mcodec + leaves = leaves.mapIt((?it.mhash.mapFailure).digestBytes) + + ?catch(await initCodexTreeAsync(tp, mcodec, leaves)) func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree = if leaves.len == 0: diff --git a/codex/merkletree/merkletree.nim b/codex/merkletree/merkletree.nim index f1905bec..1b420863 100644 --- a/codex/merkletree/merkletree.nim +++ b/codex/merkletree/merkletree.nim @@ -9,9 +9,11 @@ {.push raises: [].} -import std/bitops +import std/[bitops, atomics] import pkg/questionable/results +import pkg/taskpools +import pkg/chronos/threadsync import ../errors @@ -30,6 +32,13 @@ type compress*: CompressFn[H, K] # compress function zero*: H # zero value + MerkleTask*[H, K] = object + tree*: ptr MerkleTree[H, K] + leaves*: seq[H] + signal*: ThreadSignalPtr + layers*: Isolated[seq[seq[H]]] + success*: Atomic[bool] + func depth*[H, K](self: MerkleTree[H, K]): int = return self.layers.len - 1 @@ -151,3 +160,17 @@ func merkleTreeWorker*[H, K]( ys[halfn] = ?self.compress(xs[n], self.zero, key = key) success @[@xs] & ?self.merkleTreeWorker(ys, isBottomLayer = false) + +proc asyncMerkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} = + defer: + discard task[].signal.fireSync() + + let res = merkleTreeWorker(task[].tree[], task[].leaves, isBottomLayer = true) + + if res.isErr: + task[].success.store(false) + return + + var isolatedLayers = isolate(res.get()) + task[].layers = isolatedLayers + task[].success.store(true) diff --git a/codex/node.nim b/codex/node.nim index e010b085..e7a3b555 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -439,7 +439,7 @@ proc store*( finally: await stream.close() - without tree =? CodexTree.init(cids), err: + without tree =? (await CodexTree.init(self.taskpool, cids)), err: return failure(err) without treeCid =? tree.rootCid(CIDv1, dataCodec), err: diff --git a/tests/codex/merkletree/testasynctree.nim b/tests/codex/merkletree/testasynctree.nim new file mode 100644 index 00000000..604663df --- /dev/null +++ b/tests/codex/merkletree/testasynctree.nim @@ -0,0 +1,51 @@ +import std/sequtils + +import pkg/questionable/results +import pkg/stew/byteutils +import pkg/libp2p +import pkg/taskpools +import pkg/chronos + +import pkg/asynctest/chronos/unittest2 + +export unittest2 + +import pkg/codex/codextypes +import pkg/codex/merkletree +import pkg/codex/utils/digest + +import ./helpers + +# TODO: Generalize to other hashes + +const + data = [ + "00000000000000000000000000000001".toBytes, + "00000000000000000000000000000002".toBytes, + "00000000000000000000000000000003".toBytes, + "00000000000000000000000000000004".toBytes, + "00000000000000000000000000000005".toBytes, + "00000000000000000000000000000006".toBytes, + "00000000000000000000000000000007".toBytes, + "00000000000000000000000000000008".toBytes, + "00000000000000000000000000000009".toBytes, + "00000000000000000000000000000010".toBytes, + ] + sha256 = Sha256HashCodec + +suite "Test CodexTree": + var taskpool: Taskpool + + setup: + taskpool = Taskpool.new() + + teardown: + taskpool.shutdown() + + test "Should build tree from multihash leaves asyncronosly": + var t = await CodexTree.init(taskpool, sha256, leaves = data) + var tree = t.tryGet() + check: + tree.isOk + tree.get().leaves == data + tree.get().mcodec == sha256