threading support for tree building

This commit is contained in:
munna0908 2025-07-04 19:35:25 +05:30
parent b14307e341
commit 8aa51dfdd9
No known key found for this signature in database
GPG Key ID: 2FFCD637E937D3E6
6 changed files with 165 additions and 82 deletions

View File

@ -43,6 +43,8 @@ type
ByteTree* = MerkleTree[ByteHash, ByteTreeKey] ByteTree* = MerkleTree[ByteHash, ByteTreeKey]
ByteProof* = MerkleProof[ByteHash, ByteTreeKey] ByteProof* = MerkleProof[ByteHash, ByteTreeKey]
CodexTreeTask* = MerkleTask[ByteHash, ByteTreeKey]
CodexTree* = ref object of ByteTree CodexTree* = ref object of ByteTree
mcodec*: MultiCodec mcodec*: MultiCodec
@ -162,8 +164,11 @@ func init*(
self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true) self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true)
success self success self
proc initCodexTreeAsync*( proc init*(
tp: Taskpool, mcodec: MultiCodec = Sha256HashCodec, leaves: seq[ByteHash] _: type CodexTree,
tp: Taskpool,
mcodec: MultiCodec = Sha256HashCodec,
leaves: seq[ByteHash],
): Future[?!CodexTree] {.async: (raises: [CancelledError]).} = ): Future[?!CodexTree] {.async: (raises: [CancelledError]).} =
if leaves.len == 0: if leaves.len == 0:
return failure "Empty leaves" return failure "Empty leaves"
@ -177,26 +182,23 @@ proc initCodexTreeAsync*(
if mhash.size != leaves[0].len: if mhash.size != leaves[0].len:
return failure "Invalid hash length" return failure "Invalid hash length"
without threadPtr =? ThreadSignalPtr.new(): without signal =? ThreadSignalPtr.new():
return failure("Unable to create thread signal") return failure("Unable to create thread signal")
defer: defer:
threadPtr.close().expect("closing once works") signal.close().expect("closing once works")
var tree = CodexTree(compress: compressor, zero: Zero, mcodec: mcodec) var tree = CodexTree(compress: compressor, zero: Zero, mcodec: mcodec)
var task = MerkleTask[ByteHash, ByteTreeKey]( var task =
tree: cast[ptr MerkleTree[ByteHash, ByteTreeKey]](addr tree), CodexTreeTask(tree: cast[ptr ByteTree](addr tree), leaves: @leaves, signal: signal)
leaves: @leaves,
signal: threadPtr,
)
doAssert tp.numThreads > 1, doAssert tp.numThreads > 1,
"Must have at least one separate thread or signal will never be fired" "Must have at least one separate thread or signal will never be fired"
tp.spawn asyncMerkleTreeWorker(addr task) tp.spawn merkleTreeWorker(addr task)
let threadFut = threadPtr.wait() let threadFut = signal.wait()
if err =? catch(await threadFut.join()).errorOption: if err =? catch(await threadFut.join()).errorOption:
?catch(await noCancel threadFut) ?catch(await noCancel threadFut)
@ -213,9 +215,7 @@ proc initCodexTreeAsync*(
success tree success tree
proc init*( func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree =
_: type CodexTree, tp: Taskpool, leaves: openArray[MultiHash]
): Future[?!CodexTree] =
if leaves.len == 0: if leaves.len == 0:
return failure "Empty leaves" return failure "Empty leaves"
@ -223,19 +223,19 @@ proc init*(
mcodec = leaves[0].mcodec mcodec = leaves[0].mcodec
leaves = leaves.mapIt(it.digestBytes) leaves = leaves.mapIt(it.digestBytes)
return initCodexTreeAsync(tp, mcodec, leaves) CodexTree.init(mcodec, leaves)
proc init*( proc init*(
_: type CodexTree, tp: Taskpool, leaves: seq[Cid] _: type CodexTree, tp: Taskpool, leaves: seq[MultiHash]
): Future[?!CodexTree] {.async.} = ): Future[?!CodexTree] {.async: (raises: [CancelledError]).} =
if leaves.len == 0: if leaves.len == 0:
return failure("Empty leaves") return failure "Empty leaves"
let let
mcodec = (?leaves[0].mhash.mapFailure).mcodec mcodec = leaves[0].mcodec
leaves = leaves.mapIt((?it.mhash.mapFailure).digestBytes) leaves = leaves.mapIt(it.digestBytes)
?catch(await initCodexTreeAsync(tp, mcodec, leaves)) await CodexTree.init(tp, mcodec, leaves)
func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree = func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree =
if leaves.len == 0: if leaves.len == 0:
@ -247,6 +247,18 @@ func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree =
CodexTree.init(mcodec, leaves) CodexTree.init(mcodec, leaves)
proc init*(
    _: type CodexTree, tp: Taskpool, leaves: seq[Cid]
): Future[?!CodexTree] {.async: (raises: [CancelledError]).} =
  ## Builds a `CodexTree` from CID leaves by forwarding to the
  ## `CodexTree.init(tp, mcodec, leaves)` overload.
  ##
  ## The codec is read from the multihash of the first CID; the leaves
  ## handed on are the `digestBytes` of every CID's multihash.
  ## Returns a failure for an empty `leaves` or when any CID's `mhash`
  ## cannot be obtained.
  if leaves.len == 0:
    return failure("Empty leaves")

  # The first leaf decides the codec for the whole tree.
  let codec = (?leaves[0].mhash.mapFailure).mcodec

  # `?` returns early with the failure of the first CID whose multihash is bad.
  let digests = leaves.mapIt((?it.mhash.mapFailure).digestBytes)

  await CodexTree.init(tp, codec, digests)
proc fromNodes*( proc fromNodes*(
_: type CodexTree, _: type CodexTree,
mcodec: MultiCodec = Sha256HashCodec, mcodec: MultiCodec = Sha256HashCodec,

View File

@ -161,7 +161,7 @@ func merkleTreeWorker*[H, K](
success @[@xs] & ?self.merkleTreeWorker(ys, isBottomLayer = false) success @[@xs] & ?self.merkleTreeWorker(ys, isBottomLayer = false)
proc asyncMerkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} = proc merkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} =
defer: defer:
discard task[].signal.fireSync() discard task[].signal.fireSync()

View File

@ -9,9 +9,11 @@
{.push raises: [].} {.push raises: [].}
import std/sequtils import std/[sequtils, atomics]
import pkg/poseidon2 import pkg/poseidon2
import pkg/taskpools
import pkg/chronos/threadsync
import pkg/constantine/math/io/io_fields import pkg/constantine/math/io/io_fields
import pkg/constantine/platforms/abstractions import pkg/constantine/platforms/abstractions
import pkg/questionable/results import pkg/questionable/results
@ -44,6 +46,8 @@ type
Poseidon2Tree* = MerkleTree[Poseidon2Hash, PoseidonKeysEnum] Poseidon2Tree* = MerkleTree[Poseidon2Hash, PoseidonKeysEnum]
Poseidon2Proof* = MerkleProof[Poseidon2Hash, PoseidonKeysEnum] Poseidon2Proof* = MerkleProof[Poseidon2Hash, PoseidonKeysEnum]
Poseidon2TreeTask* = MerkleTask[Poseidon2Hash, PoseidonKeysEnum]
proc `$`*(self: Poseidon2Tree): string = proc `$`*(self: Poseidon2Tree): string =
let root = if self.root.isOk: self.root.get.toHex else: "none" let root = if self.root.isOk: self.root.get.toHex else: "none"
"Poseidon2Tree(" & " root: " & root & ", leavesCount: " & $self.leavesCount & "Poseidon2Tree(" & " root: " & root & ", leavesCount: " & $self.leavesCount &
@ -77,9 +81,58 @@ func init*(_: type Poseidon2Tree, leaves: openArray[Poseidon2Hash]): ?!Poseidon2
self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true) self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true)
success self success self
proc init*(
    _: type Poseidon2Tree, tp: Taskpool, leaves: seq[Poseidon2Hash]
): Future[?!Poseidon2Tree] {.async: (raises: [CancelledError]).} =
  ## Builds a `Poseidon2Tree` from `leaves` by spawning `merkleTreeWorker`
  ## on the taskpool `tp` and awaiting a thread signal for its completion.
  ##
  ## Returns a failure when:
  ## * `leaves` is empty ("Empty leaves"),
  ## * no `ThreadSignalPtr` could be created
  ##   ("Unable to create thread signal"),
  ## * the task's `success` flag is not set once the wait is over
  ##   ("merkle tree task failed").
  ##
  ## Asserts that `tp` has more than one thread. A `CancelledError` caught
  ## while waiting is re-raised after the signal future has been awaited
  ## with `noCancel`.
  if leaves.len == 0:
    return failure("Empty leaves")

  let compressFn = proc(
      x, y: Poseidon2Hash, key: PoseidonKeysEnum
  ): ?!Poseidon2Hash {.noSideEffect.} =
    success compress(x, y, key.toKey)

  without signal =? ThreadSignalPtr.new():
    return failure("Unable to create thread signal")

  # Close the signal on every exit path below.
  defer:
    signal.close().expect("closing once works")

  var tree = Poseidon2Tree(compress: compressFn, zero: Poseidon2Zero)

  # The task carries raw pointers/handles only: the tree, a copy of the
  # leaves and the signal used to report completion.
  var task = Poseidon2TreeTask(
    tree: cast[ptr Poseidon2Tree](addr tree), leaves: @leaves, signal: signal
  )

  doAssert tp.numThreads > 1,
    "Must have at least one separate thread or signal will never be fired"

  tp.spawn merkleTreeWorker(addr task)

  let signalFut = signal.wait()

  if waitErr =? catch(await signalFut.join()).errorOption:
    # The worker was given `addr task`; keep waiting (uncancellably) for its
    # signal before leaving -- presumably so `task` stays valid while the
    # worker may still touch it (NOTE(review): confirm).
    ?catch(await noCancel signalFut)
    if waitErr of CancelledError:
      raise (ref CancelledError) waitErr

  if not task.success.load():
    return failure("merkle tree task failed")

  # Reset the task's isolated layers when this scope is left.
  defer:
    task.layers = default(Isolated[seq[seq[Poseidon2Hash]]])

  tree.layers = task.layers.extract

  success tree
func init*(_: type Poseidon2Tree, leaves: openArray[array[31, byte]]): ?!Poseidon2Tree = func init*(_: type Poseidon2Tree, leaves: openArray[array[31, byte]]): ?!Poseidon2Tree =
Poseidon2Tree.init(leaves.mapIt(Poseidon2Hash.fromBytes(it))) Poseidon2Tree.init(leaves.mapIt(Poseidon2Hash.fromBytes(it)))
proc init*(
    _: type Poseidon2Tree, tp: Taskpool, leaves: seq[array[31, byte]]
): Future[?!Poseidon2Tree] {.async: (raises: [CancelledError]).} =
  ## Taskpool-based construction from raw 31-byte leaves: every leaf is
  ## converted with `Poseidon2Hash.fromBytes` and the result is passed to
  ## the `Poseidon2Tree.init(tp, leaves)` overload taking hashes.
  let hashes = leaves.mapIt(Poseidon2Hash.fromBytes(it))
  await Poseidon2Tree.init(tp, hashes)
proc fromNodes*( proc fromNodes*(
_: type Poseidon2Tree, nodes: openArray[Poseidon2Hash], nleaves: int _: type Poseidon2Tree, nodes: openArray[Poseidon2Hash], nleaves: int
): ?!Poseidon2Tree = ): ?!Poseidon2Tree =

View File

@ -1,51 +0,0 @@
import std/sequtils
import pkg/questionable/results
import pkg/stew/byteutils
import pkg/libp2p
import pkg/taskpools
import pkg/chronos
import pkg/asynctest/chronos/unittest2
export unittest2
import pkg/codex/codextypes
import pkg/codex/merkletree
import pkg/codex/utils/digest
import ./helpers
# TODO: Generalize to other hashes
# Ten fixed sample leaves, numbered 1 through 10, as raw bytes.
const data = [
  "00000000000000000000000000000001".toBytes,
  "00000000000000000000000000000002".toBytes,
  "00000000000000000000000000000003".toBytes,
  "00000000000000000000000000000004".toBytes,
  "00000000000000000000000000000005".toBytes,
  "00000000000000000000000000000006".toBytes,
  "00000000000000000000000000000007".toBytes,
  "00000000000000000000000000000008".toBytes,
  "00000000000000000000000000000009".toBytes,
  "00000000000000000000000000000010".toBytes,
]

# Codec handed to `CodexTree.init` by the tests below.
const sha256 = Sha256HashCodec
suite "Test CodexTree":
  ## Exercises the taskpool-based `CodexTree.init` overload.
  var taskpool: Taskpool

  setup:
    # The taskpool-based `init` asserts `numThreads > 1`, so do not rely on
    # the default pool size of the machine running the tests.
    taskpool = Taskpool.new(numThreads = 2)

  teardown:
    taskpool.shutdown()

  test "Should build tree from multihash leaves asyncronosly":
    # Keep the result wrapped: the checks below use `isOk`/`get()`, which
    # would not apply to a value already unwrapped with `tryGet()`.
    # `@data` turns the const array into the `seq` the overload declares.
    let tree = await CodexTree.init(taskpool, sha256, leaves = @data)

    check:
      tree.isOk
      tree.get().leaves == data
      tree.get().mcodec == sha256

View File

@ -1,6 +1,5 @@
import std/sequtils import std/sequtils
import pkg/unittest2
import pkg/questionable/results import pkg/questionable/results
import pkg/stew/byteutils import pkg/stew/byteutils
import pkg/libp2p import pkg/libp2p
@ -9,8 +8,11 @@ import pkg/codex/codextypes
import pkg/codex/merkletree import pkg/codex/merkletree
import pkg/codex/utils/digest import pkg/codex/utils/digest
import pkg/taskpools
import ./helpers import ./helpers
import ./generictreetests import ./generictreetests
import ../../asynctest
# TODO: Generalize to other hashes # TODO: Generalize to other hashes
@ -43,9 +45,22 @@ suite "Test CodexTree":
CodexTree.init(sha256, leaves = newSeq[ByteHash]()).isErr CodexTree.init(sha256, leaves = newSeq[ByteHash]()).isErr
test "Should build tree from multihash leaves": test "Should build tree from multihash leaves":
var expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) var
expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet())
tree = CodexTree.init(leaves = expectedLeaves)
check:
tree.isOk
tree.get().leaves == expectedLeaves.mapIt(it.digestBytes)
tree.get().mcodec == sha256
var tree = CodexTree.init(leaves = expectedLeaves) test "Should build tree from multihash leaves asynchronously":
var tp = Taskpool.new(numThreads = 2)
defer:
tp.shutdown()
let expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet())
let tree = (await CodexTree.init(tp, leaves = expectedLeaves))
check: check:
tree.isOk tree.isOk
tree.get().leaves == expectedLeaves.mapIt(it.digestBytes) tree.get().leaves == expectedLeaves.mapIt(it.digestBytes)
@ -63,6 +78,22 @@ suite "Test CodexTree":
tree.get().leaves == expectedLeaves.mapIt(it.mhash.tryGet.digestBytes) tree.get().leaves == expectedLeaves.mapIt(it.mhash.tryGet.digestBytes)
tree.get().mcodec == sha256 tree.get().mcodec == sha256
test "Should build tree from cid leaves asynchronously":
  # Two threads: the taskpool-based `init` asserts `numThreads > 1`.
  var pool = Taskpool.new(numThreads = 2)
  defer:
    pool.shutdown()

  # One CIDv1 per sample, each wrapping the sample's multihash digest.
  let cids = data.mapIt(
    Cid.init(CidVersion.CIDv1, BlockCodec, MultiHash.digest($sha256, it).tryGet).tryGet
  )

  let built = await CodexTree.init(pool, leaves = cids)

  check built.isOk
  check built.get().leaves == cids.mapIt(it.mhash.tryGet.digestBytes)
  check built.get().mcodec == sha256
test "Should build from raw digestbytes (should not hash leaves)": test "Should build from raw digestbytes (should not hash leaves)":
let tree = CodexTree.init(sha256, leaves = data).tryGet let tree = CodexTree.init(sha256, leaves = data).tryGet
@ -70,6 +101,18 @@ suite "Test CodexTree":
tree.mcodec == sha256 tree.mcodec == sha256
tree.leaves == data tree.leaves == data
test "Should build from raw digestbytes (should not hash leaves) asynchronously":
  # Two threads: the taskpool-based `init` asserts `numThreads > 1`.
  var pool = Taskpool.new(numThreads = 2)
  defer:
    pool.shutdown()

  # The raw sample bytes are passed as leaves, with the codec given explicitly.
  let built = await CodexTree.init(pool, sha256, leaves = @data)

  check built.isOk
  check built.get().mcodec == sha256
  check built.get().leaves == data
test "Should build from nodes": test "Should build from nodes":
let let
tree = CodexTree.init(sha256, leaves = data).tryGet tree = CodexTree.init(sha256, leaves = data).tryGet

View File

@ -1,6 +1,5 @@
import std/sequtils import std/sequtils
import pkg/unittest2
import pkg/poseidon2 import pkg/poseidon2
import pkg/poseidon2/io import pkg/poseidon2/io
import pkg/questionable/results import pkg/questionable/results
@ -9,9 +8,11 @@ import pkg/stew/byteutils
import pkg/stew/arrayops import pkg/stew/arrayops
import pkg/codex/merkletree import pkg/codex/merkletree
import pkg/taskpools
import ./generictreetests import ./generictreetests
import ./helpers import ./helpers
import ../../asynctest
const data = [ const data = [
"0000000000000000000000000000001".toBytes, "0000000000000000000000000000001".toBytes,
@ -36,13 +37,14 @@ suite "Test Poseidon2Tree":
check: check:
Poseidon2Tree.init(leaves = newSeq[Poseidon2Hash](0)).isErr Poseidon2Tree.init(leaves = newSeq[Poseidon2Hash](0)).isErr
test "Init tree from poseidon2 leaves": test "Build tree from poseidon2 leaves":
let tree = Poseidon2Tree.init(leaves = expectedLeaves).tryGet var taskpool = Taskpool.new(numThreads = 2)
let tree = (await Poseidon2Tree.init(taskpool, leaves = expectedLeaves)).tryGet()
check: check:
tree.leaves == expectedLeaves tree.leaves == expectedLeaves
test "Init tree from byte leaves": test "Build tree from byte leaves":
let tree = Poseidon2Tree.init( let tree = Poseidon2Tree.init(
leaves = expectedLeaves.mapIt(array[31, byte].initCopyFrom(it.toBytes)) leaves = expectedLeaves.mapIt(array[31, byte].initCopyFrom(it.toBytes))
).tryGet ).tryGet
@ -50,7 +52,7 @@ suite "Test Poseidon2Tree":
check: check:
tree.leaves == expectedLeaves tree.leaves == expectedLeaves
test "Should build from nodes": test "Build tree from nodes":
let let
tree = Poseidon2Tree.init(leaves = expectedLeaves).tryGet tree = Poseidon2Tree.init(leaves = expectedLeaves).tryGet
fromNodes = Poseidon2Tree.fromNodes( fromNodes = Poseidon2Tree.fromNodes(
@ -60,6 +62,30 @@ suite "Test Poseidon2Tree":
check: check:
tree == fromNodes tree == fromNodes
test "Build poseidon2 tree from poseidon2 leaves asynchronously":
  # Two threads: the taskpool-based `init` asserts `numThreads > 1`.
  var pool = Taskpool.new(numThreads = 2)
  defer:
    pool.shutdown()

  let built = await Poseidon2Tree.init(pool, leaves = expectedLeaves)
  let tree = built.tryGet()

  check tree.leaves == expectedLeaves
test "Build poseidon2 tree from byte leaves asynchronously":
  # Two threads: the taskpool-based `init` asserts `numThreads > 1`.
  var pool = Taskpool.new(numThreads = 2)
  defer:
    pool.shutdown()

  # Serialise every expected hash into a 31-byte array and build from those.
  let byteLeaves = expectedLeaves.mapIt(array[31, byte].initCopyFrom(it.toBytes))
  let built = await Poseidon2Tree.init(pool, leaves = byteLeaves)
  let tree = built.tryGet()

  check tree.leaves == expectedLeaves
let let
compressor = proc( compressor = proc(
x, y: Poseidon2Hash, key: PoseidonKeysEnum x, y: Poseidon2Hash, key: PoseidonKeysEnum