add mutlithreading support for codex tree

This commit is contained in:
munna0908 2025-07-02 18:26:59 +05:30
parent 7eb2fb12cc
commit b14307e341
No known key found for this signature in database
GPG Key ID: 2FFCD637E937D3E6
5 changed files with 152 additions and 9 deletions

View File

@ -428,7 +428,7 @@ proc encodeData(
return failure("Unable to store block!")
idx.inc(params.steps)
without tree =? CodexTree.init(cids[]), err:
without tree =? (await CodexTree.init(self.taskPool, cids[])), err:
return failure(err)
without treeCid =? tree.rootCid, err:
@ -649,7 +649,8 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
without (cids, recoveredIndices) =? (await self.decodeInternal(encoded)), err:
return failure(err)
without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err:
without tree =?
(await CodexTree.init(self.taskPool, cids[0 ..< encoded.originalBlocksCount])), err:
return failure(err)
without treeCid =? tree.rootCid, err:
@ -680,7 +681,8 @@ proc repair*(self: Erasure, encoded: Manifest): Future[?!void] {.async.} =
without (cids, _) =? (await self.decodeInternal(encoded)), err:
return failure(err)
without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err:
without tree =?
(await CodexTree.init(self.taskPool, cids[0 ..< encoded.originalBlocksCount])), err:
return failure(err)
without treeCid =? tree.rootCid, err:

View File

@ -10,12 +10,14 @@
{.push raises: [].}
import std/bitops
import std/sequtils
import std/[atomics, sequtils]
import pkg/questionable
import pkg/questionable/results
import pkg/libp2p/[cid, multicodec, multihash]
import pkg/constantine/hashes
import pkg/taskpools
import pkg/chronos/threadsync
import ../../utils
import ../../rng
import ../../errors
@ -48,7 +50,7 @@ type
mcodec*: MultiCodec
# CodeHashes is not exported from libp2p
# So we need to recreate it instead of
# So we need to recreate it instead of
proc initMultiHashCodeTable(): Table[MultiCodec, MHash] {.compileTime.} =
for item in HashesList:
result[item.mcodec] = item
@ -160,7 +162,60 @@ func init*(
self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true)
success self
func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree =
proc initCodexTreeAsync*(
tp: Taskpool, mcodec: MultiCodec = Sha256HashCodec, leaves: seq[ByteHash]
): Future[?!CodexTree] {.async: (raises: [CancelledError]).} =
if leaves.len == 0:
return failure "Empty leaves"
let
mhash = ?mcodec.mhash()
compressor = proc(x, y: seq[byte], key: ByteTreeKey): ?!ByteHash {.noSideEffect.} =
compress(x, y, key, mhash)
Zero: ByteHash = newSeq[byte](mhash.size)
if mhash.size != leaves[0].len:
return failure "Invalid hash length"
without threadPtr =? ThreadSignalPtr.new():
return failure("Unable to create thread signal")
defer:
threadPtr.close().expect("closing once works")
var tree = CodexTree(compress: compressor, zero: Zero, mcodec: mcodec)
var task = MerkleTask[ByteHash, ByteTreeKey](
tree: cast[ptr MerkleTree[ByteHash, ByteTreeKey]](addr tree),
leaves: @leaves,
signal: threadPtr,
)
doAssert tp.numThreads > 1,
"Must have at least one separate thread or signal will never be fired"
tp.spawn asyncMerkleTreeWorker(addr task)
let threadFut = threadPtr.wait()
if err =? catch(await threadFut.join()).errorOption:
?catch(await noCancel threadFut)
if err of CancelledError:
raise (ref CancelledError) err
if not task.success.load():
return failure("merkle tree task failed")
defer:
task.layers = default(Isolated[seq[seq[ByteHash]]])
tree.layers = task.layers.extract
success tree
proc init*(
_: type CodexTree, tp: Taskpool, leaves: openArray[MultiHash]
): Future[?!CodexTree] =
if leaves.len == 0:
return failure "Empty leaves"
@ -168,7 +223,19 @@ func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree =
mcodec = leaves[0].mcodec
leaves = leaves.mapIt(it.digestBytes)
CodexTree.init(mcodec, leaves)
return initCodexTreeAsync(tp, mcodec, leaves)
proc init*(
_: type CodexTree, tp: Taskpool, leaves: seq[Cid]
): Future[?!CodexTree] {.async.} =
if leaves.len == 0:
return failure("Empty leaves")
let
mcodec = (?leaves[0].mhash.mapFailure).mcodec
leaves = leaves.mapIt((?it.mhash.mapFailure).digestBytes)
?catch(await initCodexTreeAsync(tp, mcodec, leaves))
func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree =
if leaves.len == 0:

View File

@ -9,9 +9,11 @@
{.push raises: [].}
import std/bitops
import std/[bitops, atomics]
import pkg/questionable/results
import pkg/taskpools
import pkg/chronos/threadsync
import ../errors
@ -30,6 +32,13 @@ type
compress*: CompressFn[H, K] # compress function
zero*: H # zero value
MerkleTask*[H, K] = object
tree*: ptr MerkleTree[H, K]
leaves*: seq[H]
signal*: ThreadSignalPtr
layers*: Isolated[seq[seq[H]]]
success*: Atomic[bool]
func depth*[H, K](self: MerkleTree[H, K]): int =
return self.layers.len - 1
@ -151,3 +160,17 @@ func merkleTreeWorker*[H, K](
ys[halfn] = ?self.compress(xs[n], self.zero, key = key)
success @[@xs] & ?self.merkleTreeWorker(ys, isBottomLayer = false)
proc asyncMerkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} =
defer:
discard task[].signal.fireSync()
let res = merkleTreeWorker(task[].tree[], task[].leaves, isBottomLayer = true)
if res.isErr:
task[].success.store(false)
return
var isolatedLayers = isolate(res.get())
task[].layers = isolatedLayers
task[].success.store(true)

View File

@ -439,7 +439,7 @@ proc store*(
finally:
await stream.close()
without tree =? CodexTree.init(cids), err:
without tree =? (await CodexTree.init(self.taskpool, cids)), err:
return failure(err)
without treeCid =? tree.rootCid(CIDv1, dataCodec), err:

View File

@ -0,0 +1,51 @@
import std/sequtils
import pkg/questionable/results
import pkg/stew/byteutils
import pkg/libp2p
import pkg/taskpools
import pkg/chronos
import pkg/asynctest/chronos/unittest2
export unittest2
import pkg/codex/codextypes
import pkg/codex/merkletree
import pkg/codex/utils/digest
import ./helpers
# TODO: Generalize to other hashes
const
data = [
"00000000000000000000000000000001".toBytes,
"00000000000000000000000000000002".toBytes,
"00000000000000000000000000000003".toBytes,
"00000000000000000000000000000004".toBytes,
"00000000000000000000000000000005".toBytes,
"00000000000000000000000000000006".toBytes,
"00000000000000000000000000000007".toBytes,
"00000000000000000000000000000008".toBytes,
"00000000000000000000000000000009".toBytes,
"00000000000000000000000000000010".toBytes,
]
sha256 = Sha256HashCodec
suite "Test CodexTree":
var taskpool: Taskpool
setup:
taskpool = Taskpool.new()
teardown:
taskpool.shutdown()
test "Should build tree from multihash leaves asyncronosly":
var t = await CodexTree.init(taskpool, sha256, leaves = data)
var tree = t.tryGet()
check:
tree.isOk
tree.get().leaves == data
tree.get().mcodec == sha256