diff --git a/codex/merkletree/codex/codex.nim b/codex/merkletree/codex/codex.nim index b35de0b2..b0892dda 100644 --- a/codex/merkletree/codex/codex.nim +++ b/codex/merkletree/codex/codex.nim @@ -43,6 +43,8 @@ type ByteTree* = MerkleTree[ByteHash, ByteTreeKey] ByteProof* = MerkleProof[ByteHash, ByteTreeKey] + CodexTreeTask* = MerkleTask[ByteHash, ByteTreeKey] + CodexTree* = ref object of ByteTree mcodec*: MultiCodec @@ -162,8 +164,11 @@ func init*( self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true) success self -proc initCodexTreeAsync*( - tp: Taskpool, mcodec: MultiCodec = Sha256HashCodec, leaves: seq[ByteHash] +proc init*( + _: type CodexTree, + tp: Taskpool, + mcodec: MultiCodec = Sha256HashCodec, + leaves: seq[ByteHash], ): Future[?!CodexTree] {.async: (raises: [CancelledError]).} = if leaves.len == 0: return failure "Empty leaves" @@ -177,26 +182,23 @@ proc initCodexTreeAsync*( if mhash.size != leaves[0].len: return failure "Invalid hash length" - without threadPtr =? ThreadSignalPtr.new(): + without signal =? ThreadSignalPtr.new(): return failure("Unable to create thread signal") defer: - threadPtr.close().expect("closing once works") + signal.close().expect("closing once works") var tree = CodexTree(compress: compressor, zero: Zero, mcodec: mcodec) - var task = MerkleTask[ByteHash, ByteTreeKey]( - tree: cast[ptr MerkleTree[ByteHash, ByteTreeKey]](addr tree), - leaves: @leaves, - signal: threadPtr, - ) + var task = + CodexTreeTask(tree: cast[ptr ByteTree](addr tree), leaves: @leaves, signal: signal) doAssert tp.numThreads > 1, "Must have at least one separate thread or signal will never be fired" - tp.spawn asyncMerkleTreeWorker(addr task) + tp.spawn merkleTreeWorker(addr task) - let threadFut = threadPtr.wait() + let threadFut = signal.wait() if err =? catch(await threadFut.join()).errorOption: ?catch(await noCancel threadFut) @@ -213,9 +215,7 @@ proc initCodexTreeAsync*( success tree -proc init*( - _: type CodexTree, tp: Taskpool, leaves: openArray[MultiHash] -): Future[?!CodexTree] = +func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree = if leaves.len == 0: return failure "Empty leaves" @@ -223,19 +223,19 @@ proc init*( mcodec = leaves[0].mcodec leaves = leaves.mapIt(it.digestBytes) - return initCodexTreeAsync(tp, mcodec, leaves) + CodexTree.init(mcodec, leaves) proc init*( - _: type CodexTree, tp: Taskpool, leaves: seq[Cid] -): Future[?!CodexTree] {.async.} = + _: type CodexTree, tp: Taskpool, leaves: seq[MultiHash] +): Future[?!CodexTree] {.async: (raises: [CancelledError]).} = if leaves.len == 0: - return failure("Empty leaves") + return failure "Empty leaves" let - mcodec = (?leaves[0].mhash.mapFailure).mcodec - leaves = leaves.mapIt((?it.mhash.mapFailure).digestBytes) + mcodec = leaves[0].mcodec + leaves = leaves.mapIt(it.digestBytes) - ?catch(await initCodexTreeAsync(tp, mcodec, leaves)) + await CodexTree.init(tp, mcodec, leaves) func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree = if leaves.len == 0: @@ -247,6 +247,18 @@ func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree = CodexTree.init(mcodec, leaves) +proc init*( + _: type CodexTree, tp: Taskpool, leaves: seq[Cid] +): Future[?!CodexTree] {.async: (raises: [CancelledError]).} = + if leaves.len == 0: + return failure("Empty leaves") + + let + mcodec = (?leaves[0].mhash.mapFailure).mcodec + leaves = leaves.mapIt((?it.mhash.mapFailure).digestBytes) + + await CodexTree.init(tp, mcodec, leaves) + proc fromNodes*( _: type CodexTree, mcodec: MultiCodec = Sha256HashCodec, diff --git a/codex/merkletree/merkletree.nim b/codex/merkletree/merkletree.nim index 1b420863..c973a7e1 100644 --- a/codex/merkletree/merkletree.nim +++ b/codex/merkletree/merkletree.nim @@ -161,7 +161,7 @@ func merkleTreeWorker*[H, K]( success @[@xs] & ?self.merkleTreeWorker(ys, isBottomLayer = false) -proc asyncMerkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} = +proc merkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} = defer: discard task[].signal.fireSync() diff --git a/codex/merkletree/poseidon2.nim b/codex/merkletree/poseidon2.nim index 56ad1e4d..0b6fdd77 100644 --- a/codex/merkletree/poseidon2.nim +++ b/codex/merkletree/poseidon2.nim @@ -9,9 +9,11 @@ {.push raises: [].} -import std/sequtils +import std/[sequtils, atomics] import pkg/poseidon2 +import pkg/taskpools +import pkg/chronos/threadsync import pkg/constantine/math/io/io_fields import pkg/constantine/platforms/abstractions import pkg/questionable/results @@ -44,6 +46,8 @@ type Poseidon2Tree* = MerkleTree[Poseidon2Hash, PoseidonKeysEnum] Poseidon2Proof* = MerkleProof[Poseidon2Hash, PoseidonKeysEnum] + Poseidon2TreeTask* = MerkleTask[Poseidon2Hash, PoseidonKeysEnum] + proc `$`*(self: Poseidon2Tree): string = let root = if self.root.isOk: self.root.get.toHex else: "none" "Poseidon2Tree(" & " root: " & root & ", leavesCount: " & $self.leavesCount & @@ -77,9 +81,58 @@ func init*(_: type Poseidon2Tree, leaves: openArray[Poseidon2Hash]): ?!Poseidon2 self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true) success self +proc init*( + _: type Poseidon2Tree, tp: Taskpool, leaves: seq[Poseidon2Hash] +): Future[?!Poseidon2Tree] {.async: (raises: [CancelledError]).} = + if leaves.len == 0: + return failure "Empty leaves" + + let compressor = proc( + x, y: Poseidon2Hash, key: PoseidonKeysEnum + ): ?!Poseidon2Hash {.noSideEffect.} = + success compress(x, y, key.toKey) + + without signal =? ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + + defer: + signal.close().expect("closing once works") + + var tree = Poseidon2Tree(compress: compressor, zero: Poseidon2Zero) + var task = Poseidon2TreeTask( + tree: cast[ptr Poseidon2Tree](addr tree), leaves: @leaves, signal: signal + ) + + doAssert tp.numThreads > 1, + "Must have at least one separate thread or signal will never be fired" + + tp.spawn merkleTreeWorker(addr task) + + let threadFut = signal.wait() + + if err =? catch(await threadFut.join()).errorOption: + ?catch(await noCancel threadFut) + if err of CancelledError: + raise (ref CancelledError) err + + if not task.success.load(): + return failure("merkle tree task failed") + + defer: + task.layers = default(Isolated[seq[seq[Poseidon2Hash]]]) + + tree.layers = task.layers.extract + + success tree + func init*(_: type Poseidon2Tree, leaves: openArray[array[31, byte]]): ?!Poseidon2Tree = Poseidon2Tree.init(leaves.mapIt(Poseidon2Hash.fromBytes(it))) +proc init*( + _: type Poseidon2Tree, tp: Taskpool, leaves: seq[array[31, byte]] +): Future[?!Poseidon2Tree] {.async: (raises: [CancelledError]).} = + await Poseidon2Tree.init(tp, leaves.mapIt(Poseidon2Hash.fromBytes(it))) + proc fromNodes*( _: type Poseidon2Tree, nodes: openArray[Poseidon2Hash], nleaves: int ): ?!Poseidon2Tree = diff --git a/tests/codex/merkletree/testasynctree.nim b/tests/codex/merkletree/testasynctree.nim deleted file mode 100644 index 604663df..00000000 --- a/tests/codex/merkletree/testasynctree.nim +++ /dev/null @@ -1,51 +0,0 @@ -import std/sequtils - -import pkg/questionable/results -import pkg/stew/byteutils -import pkg/libp2p -import pkg/taskpools -import pkg/chronos - -import pkg/asynctest/chronos/unittest2 - -export unittest2 - -import pkg/codex/codextypes -import pkg/codex/merkletree -import pkg/codex/utils/digest - -import ./helpers - -# TODO: Generalize to other hashes - -const - data = [ - "00000000000000000000000000000001".toBytes, - "00000000000000000000000000000002".toBytes, - "00000000000000000000000000000003".toBytes, - "00000000000000000000000000000004".toBytes, - "00000000000000000000000000000005".toBytes, - "00000000000000000000000000000006".toBytes, - "00000000000000000000000000000007".toBytes, - "00000000000000000000000000000008".toBytes, - "00000000000000000000000000000009".toBytes, - "00000000000000000000000000000010".toBytes, - ] - sha256 = Sha256HashCodec - -suite "Test CodexTree": - var taskpool: Taskpool - - setup: - taskpool = Taskpool.new() - - teardown: - taskpool.shutdown() - - test "Should build tree from multihash leaves asyncronosly": - var t = await CodexTree.init(taskpool, sha256, leaves = data) - var tree = t.tryGet() - check: - tree.isOk - tree.get().leaves == data - tree.get().mcodec == sha256 diff --git a/tests/codex/merkletree/testcodextree.nim b/tests/codex/merkletree/testcodextree.nim index 29390c16..e8cc537a 100644 --- a/tests/codex/merkletree/testcodextree.nim +++ b/tests/codex/merkletree/testcodextree.nim @@ -1,6 +1,5 @@ import std/sequtils -import pkg/unittest2 import pkg/questionable/results import pkg/stew/byteutils import pkg/libp2p @@ -9,8 +8,11 @@ import pkg/codex/codextypes import pkg/codex/merkletree import pkg/codex/utils/digest +import pkg/taskpools + import ./helpers import ./generictreetests +import ../../asynctest # TODO: Generalize to other hashes @@ -43,9 +45,22 @@ suite "Test CodexTree": CodexTree.init(sha256, leaves = newSeq[ByteHash]()).isErr test "Should build tree from multihash leaves": - var expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) + var + expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) + tree = CodexTree.init(leaves = expectedLeaves) + check: + tree.isOk + tree.get().leaves == expectedLeaves.mapIt(it.digestBytes) + tree.get().mcodec == sha256 - var tree = CodexTree.init(leaves = expectedLeaves) + test "Should build tree from multihash leaves asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) + + let tree = (await CodexTree.init(tp, leaves = expectedLeaves)) check: tree.isOk tree.get().leaves == expectedLeaves.mapIt(it.digestBytes) @@ -63,6 +78,22 @@ suite "Test CodexTree": tree.get().leaves == expectedLeaves.mapIt(it.mhash.tryGet.digestBytes) tree.get().mcodec == sha256 + test "Should build tree from cid leaves asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let expectedLeaves = data.mapIt( + Cid.init(CidVersion.CIDv1, BlockCodec, MultiHash.digest($sha256, it).tryGet).tryGet + ) + + let tree = (await CodexTree.init(tp, leaves = expectedLeaves)) + + check: + tree.isOk + tree.get().leaves == expectedLeaves.mapIt(it.mhash.tryGet.digestBytes) + tree.get().mcodec == sha256 + test "Should build from raw digestbytes (should not hash leaves)": let tree = CodexTree.init(sha256, leaves = data).tryGet @@ -70,6 +101,18 @@ suite "Test CodexTree": tree.mcodec == sha256 tree.leaves == data + test "Should build from raw digestbytes (should not hash leaves) asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let tree = (await CodexTree.init(tp, sha256, leaves = @data)) + + check: + tree.isOk + tree.get().mcodec == sha256 + tree.get().leaves == data + test "Should build from nodes": let tree = CodexTree.init(sha256, leaves = data).tryGet diff --git a/tests/codex/merkletree/testposeidon2tree.nim b/tests/codex/merkletree/testposeidon2tree.nim index e12751b7..0a3e48ac 100644 --- a/tests/codex/merkletree/testposeidon2tree.nim +++ b/tests/codex/merkletree/testposeidon2tree.nim @@ -1,6 +1,5 @@ import std/sequtils -import pkg/unittest2 import pkg/poseidon2 import pkg/poseidon2/io import pkg/questionable/results @@ -9,9 +8,11 @@ import pkg/stew/byteutils import pkg/stew/arrayops import pkg/codex/merkletree +import pkg/taskpools import ./generictreetests import ./helpers +import ../../asynctest const data = [ "0000000000000000000000000000001".toBytes, @@ -36,13 +37,14 @@ suite "Test Poseidon2Tree": check: Poseidon2Tree.init(leaves = newSeq[Poseidon2Hash](0)).isErr - test "Init tree from poseidon2 leaves": - let tree = Poseidon2Tree.init(leaves = expectedLeaves).tryGet + test "Build tree from poseidon2 leaves": + var taskpool = Taskpool.new(numThreads = 2) + let tree = (await Poseidon2Tree.init(taskpool, leaves = expectedLeaves)).tryGet() check: tree.leaves == expectedLeaves - test "Init tree from byte leaves": + test "Build tree from byte leaves": let tree = Poseidon2Tree.init( leaves = expectedLeaves.mapIt(array[31, byte].initCopyFrom(it.toBytes)) ).tryGet @@ -50,7 +52,7 @@ suite "Test Poseidon2Tree": check: tree.leaves == expectedLeaves - test "Should build from nodes": + test "Build tree from nodes": let tree = Poseidon2Tree.init(leaves = expectedLeaves).tryGet fromNodes = Poseidon2Tree.fromNodes( @@ -60,6 +62,30 @@ suite "Test Poseidon2Tree": check: tree == fromNodes + test "Build poseidon2 tree from poseidon2 leaves asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let tree = (await Poseidon2Tree.init(tp, leaves = expectedLeaves)).tryGet() + + check: + tree.leaves == expectedLeaves + + test "Build poseidon2 tree from byte leaves asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let tree = ( + await Poseidon2Tree.init( + tp, leaves = expectedLeaves.mapIt(array[31, byte].initCopyFrom(it.toBytes)) + ) + ).tryGet() + + check: + tree.leaves == expectedLeaves + let compressor = proc( x, y: Poseidon2Hash, key: PoseidonKeysEnum