diff --git a/codex/codex.nim b/codex/codex.nim index 73575c53..3c28ec01 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -258,16 +258,16 @@ proc new*( var cache: CacheStore = nil - taskpool: Taskpool + taskPool: Taskpool try: if config.numThreads == ThreadCount(0): - taskpool = Taskpool.new(numThreads = min(countProcessors(), 16)) + taskPool = Taskpool.new(numThreads = min(countProcessors(), 16)) else: - taskpool = Taskpool.new(numThreads = int(config.numThreads)) - info "Threadpool started", numThreads = taskpool.numThreads + taskPool = Taskpool.new(numThreads = int(config.numThreads)) + info "Threadpool started", numThreads = taskPool.numThreads except CatchableError as exc: - raiseAssert("Failure in taskpool initialization:" & exc.msg) + raiseAssert("Failure in taskPool initialization:" & exc.msg) if config.cacheSize > 0'nb: cache = CacheStore.new(cacheSize = config.cacheSize) @@ -349,7 +349,7 @@ proc new*( if config.prover: let backend = config.initializeBackend().expect("Unable to create prover backend.") - some Prover.new(store, backend, config.numProofSamples) + some Prover.new(store, backend, config.numProofSamples, taskPool) else: none Prover @@ -359,7 +359,7 @@ proc new*( engine = engine, discovery = discovery, prover = prover, - taskPool = taskpool, + taskPool = taskPool, ) var restServer: RestServerRef = nil @@ -382,5 +382,5 @@ proc new*( restServer: restServer, repoStore: repoStore, maintenance: maintenance, - taskpool: taskpool, + taskPool: taskPool, ) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index c8afe440..c91968f9 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -425,7 +425,7 @@ proc encodeData( return failure("Unable to store block!") idx.inc(params.steps) - without tree =? CodexTree.init(cids[]), err: + without tree =? (await CodexTree.init(self.taskPool, cids[])), err: return failure(err) without treeCid =? tree.rootCid, err: @@ -646,7 +646,8 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = without (cids, recoveredIndices) =? (await self.decodeInternal(encoded)), err: return failure(err) - without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err: + without tree =? + (await CodexTree.init(self.taskPool, cids[0 ..< encoded.originalBlocksCount])), err: return failure(err) without treeCid =? tree.rootCid, err: @@ -677,7 +678,8 @@ proc repair*(self: Erasure, encoded: Manifest): Future[?!void] {.async.} = without (cids, _) =? (await self.decodeInternal(encoded)), err: return failure(err) - without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err: + without tree =? + (await CodexTree.init(self.taskPool, cids[0 ..< encoded.originalBlocksCount])), err: return failure(err) without treeCid =? 
tree.rootCid, err: diff --git a/codex/merkletree/codex/codex.nim b/codex/merkletree/codex/codex.nim index b3e3e18b..173befd0 100644 --- a/codex/merkletree/codex/codex.nim +++ b/codex/merkletree/codex/codex.nim @@ -10,16 +10,18 @@ {.push raises: [].} import std/bitops -import std/sequtils +import std/[atomics, sequtils] import pkg/questionable import pkg/questionable/results import pkg/libp2p/[cid, multicodec, multihash] import pkg/constantine/hashes +import pkg/taskpools +import pkg/chronos/threadsync import ../../utils import ../../rng import ../../errors -import ../../blocktype +import ../../codextypes from ../../utils/digest import digestBytes @@ -113,9 +115,7 @@ func compress*(x, y: openArray[byte], key: ByteTreeKey, codec: MultiCodec): ?!By let digest = ?MultiHash.digest(codec, input).mapFailure success digest.digestBytes -func init*( - _: type CodexTree, mcodec: MultiCodec = Sha256HashCodec, leaves: openArray[ByteHash] -): ?!CodexTree = +func initTree(mcodec: MultiCodec, leaves: openArray[ByteHash]): ?!CodexTree = if leaves.len == 0: return failure "Empty leaves" @@ -128,11 +128,27 @@ func init*( if digestSize != leaves[0].len: return failure "Invalid hash length" - var self = CodexTree(mcodec: mcodec, compress: compressor, zero: Zero) - - self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true) + var self = CodexTree(mcodec: mcodec) + ?self.prepare(compressor, Zero, leaves) success self +func init*( + _: type CodexTree, mcodec: MultiCodec = Sha256HashCodec, leaves: openArray[ByteHash] +): ?!CodexTree = + let tree = ?initTree(mcodec, leaves) + ?tree.compute() + success tree + +proc init*( + _: type CodexTree, + tp: Taskpool, + mcodec: MultiCodec = Sha256HashCodec, + leaves: seq[ByteHash], +): Future[?!CodexTree] {.async: (raises: [CancelledError]).} = + let tree = ?initTree(mcodec, leaves) + ?await tree.compute(tp) + success tree + func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree = if leaves.len == 0: return failure "Empty leaves" @@ -143,6 +159,18 @@ func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree = CodexTree.init(mcodec, leaves) +proc init*( + _: type CodexTree, tp: Taskpool, leaves: seq[MultiHash] +): Future[?!CodexTree] {.async: (raises: [CancelledError]).} = + if leaves.len == 0: + return failure "Empty leaves" + + let + mcodec = leaves[0].mcodec + leaves = leaves.mapIt(it.digestBytes) + + await CodexTree.init(tp, mcodec, leaves) + func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree = if leaves.len == 0: return failure "Empty leaves" @@ -153,6 +181,18 @@ func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree = CodexTree.init(mcodec, leaves) +proc init*( + _: type CodexTree, tp: Taskpool, leaves: seq[Cid] +): Future[?!CodexTree] {.async: (raises: [CancelledError]).} = + if leaves.len == 0: + return failure("Empty leaves") + + let + mcodec = (?leaves[0].mhash.mapFailure).mcodec + leaves = leaves.mapIt((?it.mhash.mapFailure).digestBytes) + + await CodexTree.init(tp, mcodec, leaves) + proc fromNodes*( _: type CodexTree, mcodec: MultiCodec = Sha256HashCodec, @@ -171,15 +211,8 @@ proc fromNodes*( if digestSize != nodes[0].len: return failure "Invalid hash length" - var - self = CodexTree(compress: compressor, zero: Zero, mcodec: mcodec) - layer = nleaves - pos = 0 - - while pos < nodes.len: - self.layers.add(nodes[pos ..< (pos + layer)]) - pos += layer - layer = divUp(layer, 2) + var self = CodexTree(mcodec: mcodec) + ?self.fromNodes(compressor, Zero, nodes, nleaves) let index = 
Rng.instance.rand(nleaves - 1) diff --git a/codex/merkletree/merkletree.nim b/codex/merkletree/merkletree.nim index 06859316..56491d84 100644 --- a/codex/merkletree/merkletree.nim +++ b/codex/merkletree/merkletree.nim @@ -9,19 +9,58 @@ {.push raises: [].} -import std/bitops +import std/[bitops, atomics, sequtils] +import stew/assign2 import pkg/questionable/results +import pkg/taskpools +import pkg/chronos +import pkg/chronos/threadsync import ../errors +import ../utils/sharedbuf + +export sharedbuf + +template nodeData( + data: openArray[byte], offsets: openArray[int], nodeSize, i, j: int +): openArray[byte] = + ## Bytes of the j'th entry of the i'th level in the tree, starting with the + ## leaves (at level 0). + let start = (offsets[i] + j) * nodeSize + data.toOpenArray(start, start + nodeSize - 1) type + # TODO hash functions don't fail - removing the ?! from this function would + # significantly simplify the flow below CompressFn*[H, K] = proc(x, y: H, key: K): ?!H {.noSideEffect, raises: [].} - MerkleTree*[H, K] = ref object of RootObj - layers*: seq[seq[H]] - compress*: CompressFn[H, K] - zero*: H + CompressData[H, K] = object + fn: CompressFn[H, K] + nodeSize: int + zero: H + + MerkleTreeObj*[H, K] = object of RootObj + store*: seq[byte] + ## Flattened merkle tree where hashes are assumed to be trivial bytes and + ## uniform in size. + ## + ## Each layer of the tree is stored serially starting with the leaves and + ## ending with the root. + ## + ## Because the tree might not be balanced, `layerOffsets` contains the + ## index of the starting point of each level, for easy lookup. + layerOffsets*: seq[int] + ## Starting point of each level in the tree, starting from the leaves - + ## multiplied by the entry size, this is the offset in the payload where + ## the entries of that level start + ## + ## For example, a tree with 4 leaves will have [0, 4, 6] stored here. 
+ ## + ## See the nodesPerLevel function, from which this sequence is derived compress*: CompressData[H, K] + + MerkleTree*[H, K] = ref MerkleTreeObj[H, K] MerkleProof*[H, K] = ref object of RootObj index*: int # linear index of the leaf, starting from 0 @@ -30,33 +69,99 @@ type compress*: CompressFn[H, K] # compress function zero*: H # zero value +func levels*[H, K](self: MerkleTree[H, K]): int = + return self.layerOffsets.len + func depth*[H, K](self: MerkleTree[H, K]): int = - return self.layers.len - 1 + return self.levels() - 1 + +func nodesInLayer(offsets: openArray[int], layer: int): int = + if layer == offsets.high: + 1 + else: + offsets[layer + 1] - offsets[layer] + +func nodesInLayer(self: MerkleTree | MerkleTreeObj, layer: int): int = + self.layerOffsets.nodesInLayer(layer) func leavesCount*[H, K](self: MerkleTree[H, K]): int = - return self.layers[0].len + return self.nodesInLayer(0) -func levels*[H, K](self: MerkleTree[H, K]): int = - return self.layers.len +func nodesPerLevel(nleaves: int): seq[int] = + ## Given a number of leaves, return a seq with the number of nodes at each + ## layer of the tree (from the bottom/leaves to the root) + ## + ## I.e. for a tree of 4 leaves, return `[4, 2, 1]` + if nleaves <= 0: + return @[] + elif nleaves == 1: + return @[1, 1] # leaf and root -func leaves*[H, K](self: MerkleTree[H, K]): seq[H] = - return self.layers[0] + var nodes: seq[int] = @[] + var m = nleaves + while true: + nodes.add(m) + if m == 1: + break + # Next layer size is ceil(m/2) + m = (m + 1) shr 1 -iterator layers*[H, K](self: MerkleTree[H, K]): seq[H] = - for layer in self.layers: - yield layer + nodes + +func layerOffsets(nleaves: int): seq[int] = + ## Given a number of leaves, return a seq of the starting offsets of each + ## layer in the node store that results from flattening the binary tree + ## + ## I.e. for a tree of 4 leaves, return `[0, 4, 6]` + let nodes = nodesPerLevel(nleaves) + var tot = 0 + let offsets = nodes.mapIt: + let cur = tot + tot += it + cur + offsets + +template nodeData(self: MerkleTreeObj, i, j: int): openArray[byte] = + ## Bytes of the j'th node of the i'th level in the tree, starting with the + ## leaves (at level 0). 
+ self.store.nodeData(self.layerOffsets, self.compress.nodeSize, i, j) + +func layer*[H, K]( + self: MerkleTree[H, K], layer: int +): seq[H] {.deprecated: "Expensive".} = + var nodes = newSeq[H](self.nodesInLayer(layer)) + for i, h in nodes.mpairs: + assign(h, self[].nodeData(layer, i)) + return nodes + +func leaves*[H, K](self: MerkleTree[H, K]): seq[H] {.deprecated: "Expensive".} = + self.layer(0) + +iterator layers*[H, K](self: MerkleTree[H, K]): seq[H] {.deprecated: "Expensive".} = + for i in 0 ..< self.layerOffsets.len: + yield self.layer(i) + +proc layers*[H, K](self: MerkleTree[H, K]): seq[seq[H]] {.deprecated: "Expensive".} = + for l in self.layers(): + result.add l iterator nodes*[H, K](self: MerkleTree[H, K]): H = - for layer in self.layers: - for node in layer: + ## Iterate over the nodes of each layer starting with the leaves + var node: H + for i in 0 ..< self.layerOffsets.len: + let nodesInLayer = self.nodesInLayer(i) + for j in 0 ..< nodesInLayer: + assign(node, self[].nodeData(i, j)) yield node func root*[H, K](self: MerkleTree[H, K]): ?!H = - let last = self.layers[^1] - if last.len != 1: + mixin assign + if self.layerOffsets.len == 0: return failure "invalid tree" - return success last[0] + var h: H + assign(h, self[].nodeData(self.layerOffsets.high(), 0)) + return success h func getProof*[H, K]( self: MerkleTree[H, K], index: int, proof: MerkleProof[H, K] @@ -72,18 +177,19 @@ func getProof*[H, K]( var m = nleaves for i in 0 ..< depth: let j = k xor 1 - path[i] = - if (j < m): - self.layers[i][j] - else: - self.zero + + if (j < m): + assign(path[i], self[].nodeData(i, j)) + else: + path[i] = self.compress.zero + k = k shr 1 m = (m + 1) shr 1 proof.index = index proof.path = path proof.nleaves = nleaves - proof.compress = self.compress + proof.compress = self.compress.fn success() @@ -122,32 +228,169 @@ func reconstructRoot*[H, K](proof: MerkleProof[H, K], leaf: H): ?!H = func verify*[H, K](proof: MerkleProof[H, K], leaf: H, root: H): ?!bool = success bool(root == ?proof.reconstructRoot(leaf)) -func merkleTreeWorker*[H, K]( - self: MerkleTree[H, K], xs: openArray[H], isBottomLayer: static bool -): ?!seq[seq[H]] = - let a = low(xs) - let b = high(xs) - let m = b - a + 1 +func fromNodes*[H, K]( + self: MerkleTree[H, K], + compressor: CompressFn, + zero: H, + nodes: openArray[H], + nleaves: int, +): ?!void = + mixin assign + + if nodes.len < 2: # At least leaf and root + return failure "Not enough nodes" + + if nleaves == 0: + return failure "No leaves" + + self.compress = CompressData[H, K](fn: compressor, nodeSize: nodes[0].len, zero: zero) + self.layerOffsets = layerOffsets(nleaves) + + if self.layerOffsets[^1] + 1 != nodes.len: + return failure "bad node count" + + self.store = newSeqUninit[byte](nodes.len * self.compress.nodeSize) + + for i in 0 ..< nodes.len: + assign( + self[].store.toOpenArray( + i * self.compress.nodeSize, (i + 1) * self.compress.nodeSize - 1 + ), + nodes[i], + ) + + success() + +func merkleTreeWorker[H, K]( + store: var openArray[byte], + offsets: openArray[int], + compress: CompressData[H, K], + layer: int, + isBottomLayer: static bool, +): ?!void = + ## Worker used to compute the merkle tree from the leaves that are assumed to + ## already be stored at the beginning of the `store`, as done by `prepare`. 
+ + # Throughout, we use `assign` to convert from H to bytes and back, assuming + # this assignment can be done somewhat efficiently (i.e. memcpy) - because + # the code must work with multihash, where len(H) can differ, we cannot + # simply use a fixed-size array here. + mixin assign + + template nodeData(i, j: int): openArray[byte] = + # Pick out the bytes of node j in layer i + store.nodeData(offsets, compress.nodeSize, i, j) + + let m = offsets.nodesInLayer(layer) when not isBottomLayer: if m == 1: - return success @[@xs] + return success() let halfn: int = m div 2 let n: int = 2 * halfn let isOdd: bool = (n != m) - var ys: seq[H] - if not isOdd: - ys = newSeq[H](halfn) - else: - ys = newSeq[H](halfn + 1) + # Because the compression function operates on H and not on raw bytes, + # we need to extract H from the raw data - a little abstraction tax that + # ensures that properties like alignment of H are respected. + var a, b, tmp: H for i in 0 ..< halfn: const key = when isBottomLayer: K.KeyBottomLayer else: K.KeyNone - ys[i] = ?self.compress(xs[a + 2 * i], xs[a + 2 * i + 1], key = key) + + assign(a, nodeData(layer, i * 2)) + assign(b, nodeData(layer, i * 2 + 1)) + + tmp = ?compress.fn(a, b, key = key) + + assign(nodeData(layer + 1, i), tmp) + if isOdd: const key = when isBottomLayer: K.KeyOddAndBottomLayer else: K.KeyOdd - ys[halfn] = ?self.compress(xs[n], self.zero, key = key) + assign(a, nodeData(layer, n)) + + tmp = ?compress.fn(a, compress.zero, key = key) + + assign(nodeData(layer + 1, halfn), tmp) + + merkleTreeWorker(store, offsets, compress, layer + 1, false) + +proc merkleTreeWorker[H, K]( + store: SharedBuf[byte], + offsets: SharedBuf[int], + compress: ptr CompressData[H, K], + signal: ThreadSignalPtr, +): bool = + defer: + discard signal.fireSync() + + let res = merkleTreeWorker( + store.toOpenArray(), offsets.toOpenArray(), compress[], 0, isBottomLayer = true + ) + + return res.isOk() + +func prepare*[H, K]( + self: MerkleTree[H, K], compressor: CompressFn, zero: H, leaves: openArray[H] +): ?!void = + ## Prepare the instance for computing the merkle tree of the given leaves using + ## the given compression function. After preparation, `compute` should be + ## called to perform the actual computation. `leaves` will be copied into the + ## tree so they can be freed after the call. + + if leaves.len == 0: + return failure "No leaves" + + self.compress = + CompressData[H, K](fn: compressor, nodeSize: leaves[0].len, zero: zero) + self.layerOffsets = layerOffsets(leaves.len) + + self.store = newSeqUninit[byte]((self.layerOffsets[^1] + 1) * self.compress.nodeSize) + + for j in 0 ..< leaves.len: + assign(self[].nodeData(0, j), leaves[j]) + + return success() + +proc compute*[H, K](self: MerkleTree[H, K]): ?!void = + merkleTreeWorker( + self.store, self.layerOffsets, self.compress, 0, isBottomLayer = true + ) +
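To make the flattened layout concrete: a standalone sketch (not part of the patch) that mirrors the `nodesPerLevel` and `layerOffsets` helpers introduced above and checks the offsets quoted in their doc comments:

proc nodesPerLevel(nleaves: int): seq[int] =
  # Layer sizes from the leaves up to the root; each layer is
  # ceil(m/2) the size of the one below it.
  if nleaves <= 0:
    return @[]
  elif nleaves == 1:
    return @[1, 1] # leaf and root
  var m = nleaves
  while true:
    result.add(m)
    if m == 1:
      break
    m = (m + 1) shr 1

proc layerOffsets(nleaves: int): seq[int] =
  # A running sum of the layer sizes gives where each layer starts in the
  # flattened store.
  var tot = 0
  for n in nodesPerLevel(nleaves):
    result.add(tot)
    tot += n

when isMainModule:
  doAssert nodesPerLevel(4) == @[4, 2, 1]
  doAssert layerOffsets(4) == @[0, 4, 6] # as quoted in the doc comment above
  doAssert nodesPerLevel(5) == @[5, 3, 2, 1]
  doAssert layerOffsets(5) == @[0, 5, 8, 10]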
+proc compute*[H, K]( + self: MerkleTree[H, K], tp: Taskpool +): Future[?!void] {.async: (raises: []).} = + if tp.numThreads == 1: + # With a single thread, there's no point creating a separate task + return self.compute() + + # TODO this signal would benefit from reuse across computations + without signal =? ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + + defer: + signal.close().expect("closing once works") + + let res = tp.spawn merkleTreeWorker( + SharedBuf.view(self.store), + SharedBuf.view(self.layerOffsets), + addr self.compress, + signal, + ) + + # To support cancellation, we'd have to ensure the task we posted to taskpools + # exits early - since we're not doing that, block cancellation attempts + try: + await noCancel signal.wait() + except AsyncError as exc: + # Since we initialized the signal, the OS or chronos is misbehaving. In any + # case, it would mean the task is still running, which would cause a memory + # violation if we let it run - panic instead + raiseAssert "Could not wait for signal, was it initialized? " & exc.msg + + if not res.sync(): + return failure("merkle tree task failed") + + return success() diff --git a/codex/merkletree/poseidon2.nim b/codex/merkletree/poseidon2.nim index 65b3c4dd..436cd273 100644 --- a/codex/merkletree/poseidon2.nim +++ b/codex/merkletree/poseidon2.nim @@ -9,9 +9,11 @@ {.push raises: [].} -import std/sequtils +import std/[sequtils, atomics] import pkg/poseidon2 +import pkg/taskpools +import pkg/chronos/threadsync import pkg/constantine/math/io/io_fields import pkg/constantine/platforms/abstractions import pkg/questionable/results @@ -44,6 +46,17 @@ type Poseidon2Tree* = MerkleTree[Poseidon2Hash, PoseidonKeysEnum] Poseidon2Proof* = MerkleProof[Poseidon2Hash, PoseidonKeysEnum] +proc len*(v: Poseidon2Hash): int = + sizeof(v) + +proc assign*(v: var openArray[byte], h: Poseidon2Hash) = + doAssert v.len == sizeof(h) + copyMem(addr v[0], addr h, sizeof(h)) + +proc assign*(h: var Poseidon2Hash, v: openArray[byte]) = + doAssert v.len == sizeof(h) + copyMem(addr h, addr v[0], sizeof(h)) + proc `$`*(self: Poseidon2Tree): string = let root = if self.root.isOk: self.root.get.toHex else: "none" "Poseidon2Tree(" & " root: " & root & ", leavesCount: " & $self.leavesCount & @@ -63,7 +76,7 @@ converter toKey*(key: PoseidonKeysEnum): Poseidon2Hash = of KeyOdd: KeyOddF of KeyOddAndBottomLayer: KeyOddAndBottomLayerF -func init*(_: type Poseidon2Tree, leaves: openArray[Poseidon2Hash]): ?!Poseidon2Tree = +proc initTree(leaves: openArray[Poseidon2Hash]): ?!Poseidon2Tree = if leaves.len == 0: return failure "Empty leaves" @@ -72,34 +85,43 @@ func init*(_: type Poseidon2Tree, leaves: openArray[Poseidon2Hash]): ?!Poseidon2 ): ?!Poseidon2Hash {.noSideEffect.} = success compress(x, y, key.toKey) - var self = Poseidon2Tree(compress: compressor, zero: Poseidon2Zero) + var self = Poseidon2Tree() + ?self.prepare(compressor, Poseidon2Zero, leaves) + success self + +func init*(_: type Poseidon2Tree, leaves: openArray[Poseidon2Hash]): ?!Poseidon2Tree = + let self = ?initTree(leaves) + ?self.compute() + + success self + +proc init*( + _: type Poseidon2Tree, tp: Taskpool, leaves: seq[Poseidon2Hash] +): Future[?!Poseidon2Tree] {.async: (raises: [CancelledError]).} = + let self = ?initTree(leaves) + + ?await self.compute(tp) - self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true) success self func init*(_: type Poseidon2Tree, leaves: openArray[array[31, byte]]): ?!Poseidon2Tree = Poseidon2Tree.init(leaves.mapIt(Poseidon2Hash.fromBytes(it))) +proc init*( + _: type Poseidon2Tree, tp: Taskpool, leaves: seq[array[31, byte]] +): Future[?!Poseidon2Tree] {.async: (raises: [CancelledError]).} = + await Poseidon2Tree.init(tp, leaves.mapIt(Poseidon2Hash.fromBytes(it))) + proc fromNodes*( _: type Poseidon2Tree, nodes: 
openArray[Poseidon2Hash], nleaves: int ): ?!Poseidon2Tree = - if nodes.len == 0: - return failure "Empty nodes" - let compressor = proc( x, y: Poseidon2Hash, key: PoseidonKeysEnum ): ?!Poseidon2Hash {.noSideEffect.} = success compress(x, y, key.toKey) - var - self = Poseidon2Tree(compress: compressor, zero: zero) - layer = nleaves - pos = 0 - - while pos < nodes.len: - self.layers.add(nodes[pos ..< (pos + layer)]) - pos += layer - layer = divUp(layer, 2) + let self = Poseidon2Tree() + ?self.fromNodes(compressor, Poseidon2Zero, nodes, nleaves) let index = Rng.instance.rand(nleaves - 1) diff --git a/codex/node.nim b/codex/node.nim index e5c8926e..3d249c4b 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -75,7 +75,7 @@ type contracts*: Contracts clock*: Clock storage*: Contracts - taskpool: Taskpool + taskPool: Taskpool trackedFutures: TrackedFutures CodexNodeRef* = ref CodexNode @@ -325,7 +325,7 @@ proc streamEntireDataset( try: # Spawn an erasure decoding job let erasure = Erasure.new( - self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskPool ) without _ =? (await erasure.decode(manifest)), error: error "Unable to erasure decode manifest", manifestCid, exc = error.msg @@ -474,7 +474,7 @@ proc store*( finally: await stream.close() - without tree =? CodexTree.init(cids), err: + without tree =? (await CodexTree.init(self.taskPool, cids)), err: return failure(err) without treeCid =? tree.rootCid(CIDv1, dataCodec), err: @@ -568,14 +568,15 @@ proc setupRequest( # Erasure code the dataset according to provided parameters let erasure = Erasure.new( - self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskPool ) without encoded =? (await erasure.encode(manifest, ecK, ecM)), error: trace "Unable to erasure code dataset" return failure(error) - without builder =? Poseidon2Builder.new(self.networkStore.localStore, encoded), err: + without builder =? + Poseidon2Builder.new(self.networkStore.localStore, encoded, self.taskPool), err: trace "Unable to create slot builder" return failure(err) @@ -679,7 +680,9 @@ proc onStore( return failure(err) without builder =? - Poseidon2Builder.new(self.networkStore, manifest, manifest.verifiableStrategy), err: + Poseidon2Builder.new( + self.networkStore, manifest, self.taskPool, manifest.verifiableStrategy + ), err: trace "Unable to create slots builder", err = err.msg return failure(err) @@ -714,7 +717,7 @@ proc onStore( trace "start repairing slot", slotIdx try: let erasure = Erasure.new( - self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskPool ) if err =? 
(await erasure.repair(manifest)).errorOption: error "Unable to erasure decode repairing manifest", @@ -916,7 +919,7 @@ proc new*( networkStore: NetworkStore, engine: BlockExcEngine, discovery: Discovery, - taskpool: Taskpool, + taskPool: Taskpool, prover = Prover.none, contracts = Contracts.default, ): CodexNodeRef = @@ -929,7 +932,7 @@ proc new*( engine: engine, prover: prover, discovery: discovery, - taskPool: taskpool, + taskPool: taskPool, contracts: contracts, trackedFutures: TrackedFutures(), ) diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index 633e19ea..b1ca502f 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -18,18 +18,20 @@ import pkg/chronos import pkg/questionable import pkg/questionable/results import pkg/constantine/math/io/io_fields +import pkg/taskpools import ../../logutils import ../../utils import ../../stores import ../../manifest import ../../merkletree +import ../../utils/poseidon2digest import ../../utils/asynciter import ../../indexingstrategy import ../converters -export converters, asynciter +export converters, asynciter, poseidon2digest logScope: topics = "codex slotsbuilder" @@ -45,6 +47,7 @@ type SlotsBuilder*[T, H] = ref object of RootObj emptyBlock: seq[byte] # empty block verifiableTree: ?T # verification tree (dataset tree) emptyDigestTree: T # empty digest tree for empty blocks + taskPool: Taskpool func verifiable*[T, H](self: SlotsBuilder[T, H]): bool {.inline.} = ## Returns true if the slots are verifiable. @@ -165,6 +168,35 @@ proc buildBlockTree*[T, H]( success (blk.data, tree) +proc getBlockDigest*[T, H]( + self: SlotsBuilder[T, H], blkIdx: Natural, slotPos: Natural +): Future[?!H] {.async: (raises: [CancelledError]).} = + logScope: + blkIdx = blkIdx + slotPos = slotPos + numSlotBlocks = self.manifest.numSlotBlocks + cellSize = self.cellSize + + trace "Computing block digest" + + if slotPos > (self.manifest.numSlotBlocks - 1): + # pad blocks are 0 byte blocks + trace "Returning empty digest tree for pad block" + return self.emptyDigestTree.root + + without blk =? await self.store.getBlock(self.manifest.treeCid, blkIdx), err: + error "Failed to get block for tree at index", err = err.msg + return failure(err) + + if blk.isEmpty: + return self.emptyDigestTree.root + + without dg =? (await T.digest(self.taskPool, blk.data, self.cellSize.int)), err: + error "Failed to create digest for block", err = err.msg + return failure(err) + + return success dg + proc getCellHashes*[T, H]( self: SlotsBuilder[T, H], slotIndex: Natural ): Future[?!seq[H]] {.async: (raises: [CancelledError, IndexingError]).} = @@ -190,8 +222,7 @@ proc getCellHashes*[T, H]( pos = i trace "Getting block CID for tree at index" - without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and digest =? tree.root, - err: + without digest =? 
(await self.getBlockDigest(blkIdx, i)), err: error "Failed to get block CID for tree at index", err = err.msg return failure(err) @@ -310,6 +341,7 @@ proc new*[T, H]( _: type SlotsBuilder[T, H], store: BlockStore, manifest: Manifest, + taskPool: Taskpool, strategy = LinearStrategy, cellSize = DefaultCellSize, ): ?!SlotsBuilder[T, H] = @@ -383,6 +415,7 @@ proc new*[T, H]( emptyBlock: emptyBlock, numSlotBlocks: numSlotBlocksTotal, emptyDigestTree: emptyDigestTree, + taskPool: taskPool, ) if manifest.verifiable: diff --git a/codex/slots/proofs/prover.nim b/codex/slots/proofs/prover.nim index b3e83d7c..eeb8cbe3 100644 --- a/codex/slots/proofs/prover.nim +++ b/codex/slots/proofs/prover.nim @@ -13,6 +13,7 @@ import pkg/chronicles import pkg/circomcompat import pkg/poseidon2 import pkg/questionable/results +import pkg/taskpools import pkg/libp2p/cid @@ -47,6 +48,7 @@ type backend: AnyBackend store: BlockStore nSamples: int + taskPool: Taskpool proc prove*( self: Prover, slotIdx: int, manifest: Manifest, challenge: ProofChallenge @@ -61,7 +63,7 @@ proc prove*( trace "Received proof challenge" - without builder =? AnyBuilder.new(self.store, manifest), err: + without builder =? AnyBuilder.new(self.store, manifest, self.taskPool), err: error "Unable to create slots builder", err = err.msg return failure(err) @@ -88,6 +90,6 @@ proc verify*( self.backend.verify(proof, inputs) proc new*( - _: type Prover, store: BlockStore, backend: AnyBackend, nSamples: int + _: type Prover, store: BlockStore, backend: AnyBackend, nSamples: int, tp: Taskpool ): Prover = - Prover(store: store, backend: backend, nSamples: nSamples) + Prover(store: store, backend: backend, nSamples: nSamples, taskPool: tp) diff --git a/codex/utils/poseidon2digest.nim b/codex/utils/poseidon2digest.nim index 31f07b26..321dbe12 100644 --- a/codex/utils/poseidon2digest.nim +++ b/codex/utils/poseidon2digest.nim @@ -7,13 +7,27 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +import std/[atomics] import pkg/poseidon2 import pkg/questionable/results import pkg/libp2p/multihash import pkg/stew/byteutils +import pkg/taskpools +import pkg/chronos +import pkg/chronos/threadsync +import ./uniqueptr import ../merkletree +type DigestTask* = object + signal: ThreadSignalPtr + bytes: seq[byte] + chunkSize: int + success: Atomic[bool] + digest: UniquePtr[Poseidon2Hash] + +export DigestTask + func spongeDigest*( _: type Poseidon2Hash, bytes: openArray[byte], rate: static int = 2 ): ?!Poseidon2Hash = @@ -30,7 +44,7 @@ func spongeDigest*( success Sponge.digest(bytes, rate) -func digestTree*( +proc digestTree*( _: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int ): ?!Poseidon2Tree = ## Hashes chunks of data with a sponge of rate 2, and combines the @@ -44,6 +58,7 @@ func digestTree*( var index = 0 var leaves: seq[Poseidon2Hash] + while index < bytes.len: let start = index let finish = min(index + chunkSize, bytes.len) @@ -61,6 +76,46 @@ func digest*( (?Poseidon2Tree.digestTree(bytes, chunkSize)).root +proc digestWorker(tp: Taskpool, task: ptr DigestTask) {.gcsafe.} = + defer: + discard task[].signal.fireSync() + + var res = Poseidon2Tree.digest(task[].bytes, task[].chunkSize) + + if res.isErr: + task[].success.store(false) + return + + task[].digest = newUniquePtr(res.get()) + task[].success.store(true) + +proc digest*( + _: type Poseidon2Tree, tp: Taskpool, bytes: seq[byte], chunkSize: int +): Future[?!Poseidon2Hash] {.async: (raises: [CancelledError]).} = + without signal =? 
ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + defer: + signal.close().expect("closing once works") + + doAssert tp.numThreads > 1, + "Must have at least one separate thread or signal will never be fired" + + var task = DigestTask(signal: signal, bytes: bytes, chunkSize: chunkSize) + + tp.spawn digestWorker(tp, addr task) + + let signalFut = signal.wait() + + if err =? catch(await signalFut.join()).errorOption: + ?catch(await noCancel signalFut) + if err of CancelledError: + raise (ref CancelledError) err + + if not task.success.load(): + return failure("digest task failed") + + success extractValue(task.digest) + func digestMhash*( _: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int ): ?!MultiHash = diff --git a/codex/utils/sharedbuf.nim b/codex/utils/sharedbuf.nim new file mode 100644 index 00000000..186d7126 --- /dev/null +++ b/codex/utils/sharedbuf.nim @@ -0,0 +1,24 @@ +import stew/ptrops + +type SharedBuf*[T] = object + payload*: ptr UncheckedArray[T] + len*: int + +proc view*[T](_: type SharedBuf, v: openArray[T]): SharedBuf[T] = + if v.len > 0: + SharedBuf[T](payload: makeUncheckedArray(addr v[0]), len: v.len) + else: + default(SharedBuf[T]) + +template checkIdx(v: SharedBuf, i: int) = + doAssert i >= 0 and i < v.len + +proc `[]`*[T](v: SharedBuf[T], i: int): var T = + v.checkIdx(i) + v.payload[i] + +template toOpenArray*[T](v: SharedBuf[T]): var openArray[T] = + v.payload.toOpenArray(0, v.len - 1) + +template toOpenArray*[T](v: SharedBuf[T], s, e: int): var openArray[T] = + v.toOpenArray().toOpenArray(s, e) diff --git a/codex/utils/uniqueptr.nim b/codex/utils/uniqueptr.nim new file mode 100644 index 00000000..2aec0d38 --- /dev/null +++ b/codex/utils/uniqueptr.nim @@ -0,0 +1,52 @@ +import std/isolation +type UniquePtr*[T] = object + ## A unique pointer to a value of type T in shared memory + ## Can only be moved, not copied + data: ptr T + +proc newUniquePtr*[T](data: sink Isolated[T]): UniquePtr[T] = + ## Creates a new unique value in shared memory + ## The memory is automatically freed when the object is destroyed + result.data = cast[ptr T](allocShared0(sizeof(T))) + result.data[] = extract(data) + +template newUniquePtr*[T](data: T): UniquePtr[T] = + newUniquePtr(isolate(data)) + +proc `=destroy`*[T](p: var UniquePtr[T]) = + ## Destructor for UniquePtr + if p.data != nil: + deallocShared(p.data) + p.data = nil + +proc `=copy`*[T]( + dest: var UniquePtr[T], src: UniquePtr[T] +) {.error: "UniquePtr cannot be copied, only moved".} + +proc `=sink`*[T](dest: var UniquePtr[T], src: UniquePtr[T]) = + if dest.data != nil: + `=destroy`(dest) + dest.data = src.data + # We need to nil out the source data to prevent double-free + # This is handled by Nim's destructive move semantics + +proc `[]`*[T](p: UniquePtr[T]): lent T = + ## Access the data (read-only) + if p.data == nil: + raise newException(NilAccessDefect, "accessing nil UniquePtr") + p.data[] + +proc isNil*[T](p: UniquePtr[T]): bool = + ## Check if the UniquePtr is nil + p.data == nil + +proc extractValue*[T](p: var UniquePtr[T]): T = + ## Extract the value from the UniquePtr and release the memory + if p.data == nil: + raise newException(NilAccessDefect, "extracting from nil UniquePtr") + # Move the value out + var isolated = isolate(p.data[]) + result = extract(isolated) + # Free the shared memory + deallocShared(p.data) + p.data = nil
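For orientation, a hedged usage sketch of the new `UniquePtr` helper (import path assumed; not part of the patch): the value lives in shared memory so it can cross thread boundaries, and ownership can only be moved, never copied:

import codex/utils/uniqueptr # assumed import path for the module above

var p = newUniquePtr(@[1, 2, 3])
doAssert not p.isNil
doAssert p[] == @[1, 2, 3] # read-only access through `[]`

# extractValue moves the value out and frees the shared allocation;
# afterwards the pointer is nil.
let v = extractValue(p)
doAssert p.isNil
doAssert v == @[1, 2, 3]

# let q = p # would not compile: `=copy` is marked {.error.}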
diff --git a/tests/codex/merkletree/generictreetests.nim b/tests/codex/merkletree/generictreetests.nim index 6244bc1c..e24cbad1 100644 --- a/tests/codex/merkletree/generictreetests.nim +++ b/tests/codex/merkletree/generictreetests.nim @@ -12,6 +12,13 @@ proc testGenericTree*[H, K, U]( let data = @data suite "Correctness tests - " & name: + test "Should build correct tree for single leaf": + let expectedRoot = compress(data[0], zero, K.KeyOddAndBottomLayer) + + let tree = makeTree(data[0 .. 0]) + check: + tree.root.tryGet == expectedRoot + test "Should build correct tree for even bottom layer": let expectedRoot = compress( compress( diff --git a/tests/codex/merkletree/testcodextree.nim b/tests/codex/merkletree/testcodextree.nim index 91ccd7f5..16765dbb 100644 --- a/tests/codex/merkletree/testcodextree.nim +++ b/tests/codex/merkletree/testcodextree.nim @@ -1,6 +1,6 @@ import std/sequtils +import std/times -import pkg/unittest2 import pkg/questionable/results import pkg/stew/byteutils import pkg/libp2p @@ -9,8 +9,11 @@ import pkg/codex/codextypes import pkg/codex/merkletree import pkg/codex/utils/digest +import pkg/taskpools + import ./helpers import ./generictreetests +import ../../asynctest # TODO: Generalize to other hashes @@ -43,9 +46,23 @@ suite "Test CodexTree": CodexTree.init(sha256, leaves = newSeq[ByteHash]()).isErr test "Should build tree from multihash leaves": - var expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) + var + expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) + tree = CodexTree.init(leaves = expectedLeaves) - var tree = CodexTree.init(leaves = expectedLeaves) check: tree.isOk tree.get().leaves == expectedLeaves.mapIt(it.digestBytes) tree.get().mcodec == sha256 + + test "Should build tree from multihash leaves asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) + + let tree = (await CodexTree.init(tp, leaves = expectedLeaves)) check: tree.isOk tree.get().leaves == expectedLeaves.mapIt(it.digestBytes) tree.get().mcodec == sha256 @@ -63,6 +80,48 @@ suite "Test CodexTree": tree.get().leaves == expectedLeaves.mapIt(it.mhash.tryGet.digestBytes) tree.get().mcodec == sha256 + test "Should build tree from cid leaves asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let expectedLeaves = data.mapIt( + Cid.init(CidVersion.CIDv1, BlockCodec, MultiHash.digest($sha256, it).tryGet).tryGet + ) + + let tree = (await CodexTree.init(tp, leaves = expectedLeaves)) + + check: + tree.isOk + tree.get().leaves == expectedLeaves.mapIt(it.mhash.tryGet.digestBytes) + tree.get().mcodec == sha256 + + test "Should build the same tree sync and async": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let expectedLeaves = data.mapIt( + Cid.init(CidVersion.CIDv1, BlockCodec, MultiHash.digest($sha256, it).tryGet).tryGet + ) + + let + atree = (await CodexTree.init(tp, leaves = expectedLeaves)) + stree = CodexTree.init(leaves = expectedLeaves) + + check: + toSeq(atree.get().nodes) == toSeq(stree.get().nodes) + atree.get().root == stree.get().root + + # Single-leaf trees have their root separately computed + let + atree1 = (await CodexTree.init(tp, leaves = expectedLeaves[0 .. 0])) + stree1 = CodexTree.init(leaves = expectedLeaves[0 .. 
0]) + + check: + toSeq(atree1.get().nodes) == toSeq(stree1.get().nodes) + atree1.get().root == stree1.get().root + test "Should build from raw digestbytes (should not hash leaves)": let tree = CodexTree.init(sha256, leaves = data).tryGet @@ -70,6 +129,18 @@ suite "Test CodexTree": tree.mcodec == sha256 tree.leaves == data + test "Should build from raw digestbytes (should not hash leaves) asynchronously": + var tp = Taskpool.new(numThreads = 2) + defer: + tp.shutdown() + + let tree = (await CodexTree.init(tp, sha256, leaves = @data)) + + check: + tree.isOk + tree.get().mcodec == sha256 + tree.get().leaves == data + test "Should build from nodes": let tree = CodexTree.init(sha256, leaves = data).tryGet diff --git a/tests/codex/merkletree/testposeidon2tree.nim b/tests/codex/merkletree/testposeidon2tree.nim index e12751b7..8e7ce34d 100644 --- a/tests/codex/merkletree/testposeidon2tree.nim +++ b/tests/codex/merkletree/testposeidon2tree.nim @@ -1,6 +1,5 @@ import std/sequtils -import pkg/unittest2 import pkg/poseidon2 import pkg/poseidon2/io import pkg/questionable/results @@ -9,9 +8,11 @@ import pkg/stew/byteutils import pkg/stew/arrayops import pkg/codex/merkletree +import pkg/taskpools import ./generictreetests import ./helpers +import ../../asynctest const data = [ "0000000000000000000000000000001".toBytes, @@ -36,13 +37,14 @@ suite "Test Poseidon2Tree": check: Poseidon2Tree.init(leaves = newSeq[Poseidon2Hash](0)).isErr - test "Init tree from poseidon2 leaves": - let tree = Poseidon2Tree.init(leaves = expectedLeaves).tryGet + test "Build tree from poseidon2 leaves": + var taskpool = Taskpool.new(numThreads = 2) + let tree = (await Poseidon2Tree.init(taskpool, leaves = expectedLeaves)).tryGet() check: tree.leaves == expectedLeaves - test "Init tree from byte leaves": + test "Build tree from byte leaves": let tree = Poseidon2Tree.init( leaves = expectedLeaves.mapIt(array[31, byte].initCopyFrom(it.toBytes)) ).tryGet @@ -50,7 +52,7 @@ suite "Test Poseidon2Tree": check: tree.leaves == expectedLeaves - test "Should build from nodes": + test "Build tree from nodes": let tree = Poseidon2Tree.init(leaves = expectedLeaves).tryGet fromNodes = Poseidon2Tree.fromNodes( @@ -60,6 +62,29 @@ suite "Test Poseidon2Tree": check: tree == fromNodes + test "Build poseidon2 tree from poseidon2 leaves asynchronously": + var tp = Taskpool.new() + defer: + tp.shutdown() + + let tree = (await Poseidon2Tree.init(tp, leaves = expectedLeaves)).tryGet() + check: + tree.leaves == expectedLeaves + + test "Build poseidon2 tree from byte leaves asynchronously": + var tp = Taskpool.new() + defer: + tp.shutdown() + + let tree = ( + await Poseidon2Tree.init( + tp, leaves = expectedLeaves.mapIt(array[31, byte].initCopyFrom(it.toBytes)) + ) + ).tryGet() + + check: + tree.leaves == expectedLeaves + let compressor = proc( x, y: Poseidon2Hash, key: PoseidonKeysEnum diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim index e8d9c743..fcb91e8f 100644 --- a/tests/codex/node/testcontracts.nim +++ b/tests/codex/node/testcontracts.nim @@ -56,12 +56,13 @@ asyncchecksuite "Test Node - Host contracts": verifiable: Manifest verifiableBlock: bt.Block protected: Manifest + taskPool: Taskpool setup: # Setup Host Contracts and dependencies market = MockMarket.new() sales = Sales.new(market, clock, localStore) - + taskPool = Taskpool.new() node.contracts = ( none ClientInteractions, some HostInteractions.new(clock, sales), @@ -75,20 +76,23 @@ asyncchecksuite "Test Node - Host contracts": let manifestBlock = 
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() - erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new) + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskPool) manifestCid = manifestBlock.cid (await localStore.putBlock(manifestBlock)).tryGet() protected = (await erasure.encode(manifest, 3, 2)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() (await localStore.putBlock(verifiableBlock)).tryGet() + teardown: + taskPool.shutdown() + test "onExpiryUpdate callback is set": check sales.onExpiryUpdate.isSome diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index 09ed7ca7..81460c79 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -47,10 +47,15 @@ privateAccess(CodexNodeRef) # enable access to private fields asyncchecksuite "Test Node - Basic": setupAndTearDown() + var taskPool: Taskpool setup: + taskPool = Taskpool.new() await node.start() + teardown: + taskPool.shutdown() + test "Fetch Manifest": let manifest = await storeDataGetManifest(localStore, chunker) @@ -173,14 +178,13 @@ asyncchecksuite "Test Node - Basic": check string.fromBytes(data) == testString test "Setup purchase request": let - erasure = - Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new()) + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskPool) manifest = await storeDataGetManifest(localStore, chunker) manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() protected = (await erasure.encode(manifest, 3, 2)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() diff --git a/tests/codex/node/testslotrepair.nim b/tests/codex/node/testslotrepair.nim index 2c778f44..f06602e8 100644 --- a/tests/codex/node/testslotrepair.nim +++ b/tests/codex/node/testslotrepair.nim @@ -13,6 +13,7 @@ import pkg/codex/contracts import pkg/codex/slots import pkg/codex/manifest import pkg/codex/erasure +import pkg/taskpools import pkg/codex/blocktype as bt import pkg/chronos/transports/stream @@ -101,7 +102,7 @@ asyncchecksuite "Test Node - Slot Repair": (await localStore.putBlock(manifestBlock)).tryGet() protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + builder = Poseidon2Builder.new(localStore, protected, cluster.taskpool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() @@ -119,6 +120,7 @@ asyncchecksuite "Test Node - Slot Repair": await nodes[1].switch.stop() # slot 0 missing now # repair missing slot + (await nodes[4].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() await nodes[2].switch.stop() # slot 1 missing now @@ -132,16 +134,19 @@ await nodes[4].switch.stop() # slot 0 missing now # repair missing slot from repaired slots + (await nodes[7].onStore(request, expiry, 0.uint64, 
nil, isRepairing = true)).tryGet() await nodes[5].switch.stop() # slot 1 missing now # repair missing slot from repaired slots + (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet() await nodes[6].switch.stop() # slot 2 missing now # repair missing slot from repaired slots + (await nodes[9].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet() let @@ -180,7 +185,7 @@ asyncchecksuite "Test Node - Slot Repair": (await localStore.putBlock(manifestBlock)).tryGet() protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + builder = Poseidon2Builder.new(localStore, protected, cluster.taskpool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() @@ -199,19 +204,24 @@ asyncchecksuite "Test Node - Slot Repair": await nodes[3].switch.stop() # slot 2 missing now # repair missing slots + (await nodes[6].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() + (await nodes[7].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet() await nodes[2].switch.stop() # slot 1 missing now await nodes[4].switch.stop() # slot 3 missing now # repair missing slots from repaired slots + (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet() + (await nodes[9].onStore(request, expiry, 3.uint64, nil, isRepairing = true)).tryGet() await nodes[5].switch.stop() # slot 4 missing now # repair missing slot from repaired slots + (await nodes[10].onStore(request, expiry, 4.uint64, nil, isRepairing = true)).tryGet() let diff --git a/tests/codex/slots/backends/testcircomcompat.nim b/tests/codex/slots/backends/testcircomcompat.nim index b61d4f18..637ee36b 100644 --- a/tests/codex/slots/backends/testcircomcompat.nim +++ b/tests/codex/slots/backends/testcircomcompat.nim @@ -3,6 +3,7 @@ import std/options import ../../../asynctest import pkg/chronos +import pkg/taskpools import pkg/poseidon2 import pkg/serde/json @@ -77,6 +78,7 @@ suite "Test Circom Compat Backend": challenge: array[32, byte] builder: Poseidon2Builder sampler: Poseidon2Sampler + taskPool: Taskpool setup: let @@ -85,11 +87,13 @@ suite "Test Circom Compat Backend": store = RepoStore.new(repoDs, metaDs) + taskPool = Taskpool.new() + (manifest, protected, verifiable) = await createVerifiableManifest( - store, numDatasetBlocks, ecK, ecM, blockSize, cellSize + store, numDatasetBlocks, ecK, ecM, blockSize, cellSize, taskPool ) - builder = Poseidon2Builder.new(store, verifiable).tryGet + builder = Poseidon2Builder.new(store, verifiable, taskPool).tryGet sampler = Poseidon2Sampler.new(slotId, store, builder).tryGet circom = CircomCompat.init(r1cs, wasm, zkey) @@ -101,6 +105,7 @@ suite "Test Circom Compat Backend": circom.release() # this comes from the rust FFI await repoTmp.destroyDb() await metaTmp.destroyDb() + taskPool.shutdown() test "Should verify with correct input": var proof = circom.prove(proofInputs).tryGet diff --git a/tests/codex/slots/helpers.nim b/tests/codex/slots/helpers.nim index fced1f1c..01159c21 100644 --- a/tests/codex/slots/helpers.nim +++ b/tests/codex/slots/helpers.nim @@ -12,6 +12,7 @@ import pkg/codex/chunker import pkg/codex/indexingstrategy import pkg/codex/slots import pkg/codex/rng +import pkg/taskpools import ../helpers @@ -145,6 +146,7 @@ proc createVerifiableManifest*( ecM: int, blockSize: NBytes, cellSize: NBytes, + taskPool: Taskpool, ): 
Future[tuple[manifest: Manifest, protected: Manifest, verifiable: Manifest]] {. async .} = @@ -165,7 +167,9 @@ proc createVerifiableManifest*( totalDatasetSize, ) - builder = Poseidon2Builder.new(store, protectedManifest, cellSize = cellSize).tryGet + builder = Poseidon2Builder.new( + store, protectedManifest, cellSize = cellSize, taskPool = taskPool + ).tryGet verifiableManifest = (await builder.buildManifest()).tryGet # build the slots and manifest diff --git a/tests/codex/slots/sampler/testsampler.nim b/tests/codex/slots/sampler/testsampler.nim index 78b245a3..bf7277a3 100644 --- a/tests/codex/slots/sampler/testsampler.nim +++ b/tests/codex/slots/sampler/testsampler.nim @@ -5,6 +5,7 @@ import ../../../asynctest import pkg/questionable/results +import pkg/taskpools import pkg/codex/stores import pkg/codex/merkletree import pkg/codex/utils/json @@ -26,11 +27,16 @@ suite "Test Sampler - control samples": inputData: string inputJson: JsonNode proofInput: ProofInputs[Poseidon2Hash] + taskpool: Taskpool setup: inputData = readFile("tests/circuits/fixtures/input.json") inputJson = !JsonNode.parse(inputData) proofInput = Poseidon2Hash.jsonToProofInput(inputJson) + taskpool = Taskpool.new() + + teardown: + taskpool.shutdown() test "Should verify control samples": let @@ -87,25 +93,29 @@ suite "Test Sampler": manifest: Manifest protected: Manifest verifiable: Manifest + taskpool: Taskpool setup: let repoDs = repoTmp.newDb() metaDs = metaTmp.newDb() + taskpool = Taskpool.new() + store = RepoStore.new(repoDs, metaDs) (manifest, protected, verifiable) = await createVerifiableManifest( - store, datasetBlocks, ecK, ecM, blockSize, cellSize + store, datasetBlocks, ecK, ecM, blockSize, cellSize, taskpool ) # create sampler - builder = Poseidon2Builder.new(store, verifiable).tryGet + builder = Poseidon2Builder.new(store, verifiable, taskpool).tryGet teardown: await store.close() await repoTmp.destroyDb() await metaTmp.destroyDb() + taskpool.shutdown() test "Should fail instantiating for invalid slot index": let sampler = Poseidon2Sampler.new(builder.slotRoots.len, store, builder) @@ -114,7 +124,7 @@ suite "Test Sampler": test "Should fail instantiating for non verifiable builder": let - nonVerifiableBuilder = Poseidon2Builder.new(store, protected).tryGet + nonVerifiableBuilder = Poseidon2Builder.new(store, protected, taskpool).tryGet sampler = Poseidon2Sampler.new(slotIndex, store, nonVerifiableBuilder) check sampler.isErr diff --git a/tests/codex/slots/testprover.nim b/tests/codex/slots/testprover.nim index c567db55..34ff96ba 100644 --- a/tests/codex/slots/testprover.nim +++ b/tests/codex/slots/testprover.nim @@ -4,6 +4,7 @@ import pkg/chronos import pkg/libp2p/cid import pkg/codex/merkletree +import pkg/taskpools import pkg/codex/chunker import pkg/codex/blocktype as bt import pkg/codex/slots @@ -29,6 +30,7 @@ suite "Test Prover": var store: BlockStore prover: Prover + taskPool: Taskpool setup: let @@ -45,13 +47,14 @@ suite "Test Prover": numProofSamples: samples, ) backend = config.initializeBackend().tryGet() - + taskPool = Taskpool.new() store = RepoStore.new(repoDs, metaDs) - prover = Prover.new(store, backend, config.numProofSamples) + prover = Prover.new(store, backend, config.numProofSamples, taskPool) teardown: await repoTmp.destroyDb() await metaTmp.destroyDb() + taskPool.shutdown() test "Should sample and prove a slot": let (_, _, verifiable) = await createVerifiableManifest( @@ -61,6 +64,7 @@ suite "Test Prover": 3, # ecM blockSize, cellSize, + taskPool, ) let (inputs, proof) = (await 
prover.prove(1, verifiable, challenge)).tryGet @@ -80,6 +84,7 @@ suite "Test Prover": 1, # ecM blockSize, cellSize, + taskPool, ) let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet diff --git a/tests/codex/slots/testslotbuilder.nim b/tests/codex/slots/testslotbuilder.nim index fc3c7bd5..55f917ef 100644 --- a/tests/codex/slots/testslotbuilder.nim +++ b/tests/codex/slots/testslotbuilder.nim @@ -15,6 +15,7 @@ import pkg/codex/utils import pkg/codex/utils/digest import pkg/poseidon2 import pkg/poseidon2/io +import pkg/taskpools import ./helpers import ../helpers @@ -72,12 +73,13 @@ suite "Slot builder": protectedManifest: Manifest builder: Poseidon2Builder chunker: Chunker + taskPool: Taskpool setup: let repoDs = repoTmp.newDb() metaDs = metaTmp.newDb() - + taskPool = Taskpool.new() localStore = RepoStore.new(repoDs, metaDs) chunker = RandomChunker.new(Rng.instance(), size = totalDatasetSize, chunkSize = blockSize) @@ -92,6 +94,7 @@ suite "Slot builder": await localStore.close() await repoTmp.destroyDb() await metaTmp.destroyDb() + taskPool.shutdown() # TODO: THIS IS A BUG IN asynctest, because it doesn't release the # objects after the test is done, so we need to do it manually @@ -113,8 +116,9 @@ suite "Slot builder": ) check: - Poseidon2Builder.new(localStore, unprotectedManifest, cellSize = cellSize).error.msg == - "Manifest is not protected." + Poseidon2Builder.new( + localStore, unprotectedManifest, taskPool, cellSize = cellSize + ).error.msg == "Manifest is not protected." test "Number of blocks must be devisable by number of slots": let mismatchManifest = Manifest.new( @@ -131,7 +135,7 @@ suite "Slot builder": ) check: - Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg == + Poseidon2Builder.new(localStore, mismatchManifest, taskPool, cellSize = cellSize).error.msg == "Number of blocks must be divisible by number of slots." test "Block size must be divisable by cell size": @@ -149,12 +153,13 @@ suite "Slot builder": ) check: - Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg == + Poseidon2Builder.new(localStore, mismatchManifest, taskPool, cellSize = cellSize).error.msg == "Block size must be divisible by cell size." 
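The builder tests above and below all thread a Taskpool through `Poseidon2Builder`. As a minimal standalone illustration of the async tree construction this enables, a sketch using the `Poseidon2Tree.init(tp, ...)` overload added in this patch (imports assumed to match what the test files use):

import pkg/chronos
import pkg/taskpools
import pkg/questionable/results
import pkg/codex/merkletree

proc main() {.async.} =
  var tp = Taskpool.new(numThreads = 2)
  defer:
    tp.shutdown()

  # Zeroed 31-byte leaves; the overload taking array[31, byte] lifts them
  # into field elements before building the tree on the taskpool.
  let leaves = @[default(array[31, byte]), default(array[31, byte])]
  let tree = (await Poseidon2Tree.init(tp, leaves = leaves)).tryGet()
  doAssert tree.root.isOk

waitFor main()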
test "Should build correct slot builder": - builder = - Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet() + builder = Poseidon2Builder + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) + .tryGet() check: builder.cellSize == cellSize @@ -171,7 +176,7 @@ suite "Slot builder": ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() for i in 0 ..< numSlots: @@ -196,7 +201,7 @@ suite "Slot builder": ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() for i in 0 ..< numSlots: @@ -215,8 +220,9 @@ suite "Slot builder": slotTree.root().tryGet() == expectedRoot test "Should persist trees for all slots": - let builder = - Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet() + let builder = Poseidon2Builder + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) + .tryGet() for i in 0 ..< numSlots: let @@ -242,7 +248,7 @@ suite "Slot builder": 0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() (await builder.buildSlots()).tryGet @@ -270,7 +276,7 @@ suite "Slot builder": 0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() slotsHashes = collect(newSeq): @@ -296,45 +302,53 @@ suite "Slot builder": test "Should not build from verifiable manifest with 0 slots": var builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() verifyManifest = (await builder.buildManifest()).tryGet() verifyManifest.slotRoots = @[] - check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr + check Poseidon2Builder.new( + localStore, verifyManifest, taskPool, cellSize = cellSize + ).isErr test "Should not build from verifiable manifest with incorrect number of slots": var builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() verifyManifest = (await builder.buildManifest()).tryGet() verifyManifest.slotRoots.del(verifyManifest.slotRoots.len - 1) - check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr + check Poseidon2Builder.new( + localStore, verifyManifest, taskPool, cellSize = cellSize + ).isErr test "Should not build from verifiable manifest with invalid verify root": - let builder = - Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet() + let builder = Poseidon2Builder + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) + .tryGet() var verifyManifest = (await builder.buildManifest()).tryGet() rng.shuffle(Rng.instance, verifyManifest.verifyRoot.data.buffer) - check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr + check Poseidon2Builder.new( + localStore, verifyManifest, taskPool, cellSize = cellSize + ).isErr test "Should build from verifiable manifest": let builder = Poseidon2Builder - 
.new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() verifyManifest = (await builder.buildManifest()).tryGet() - verificationBuilder = - Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).tryGet() + verificationBuilder = Poseidon2Builder + .new(localStore, verifyManifest, taskPool, cellSize = cellSize) + .tryGet() check: builder.slotRoots == verificationBuilder.slotRoots diff --git a/tests/codex/utils/testPoseidon.nim b/tests/codex/utils/testPoseidon.nim new file mode 100644 index 00000000..aedf5fcf --- /dev/null +++ b/tests/codex/utils/testPoseidon.nim @@ -0,0 +1,38 @@ +{.push raises: [].} + +import std/[times, strformat, random] +import pkg/questionable/results + +import pkg/codex/merkletree/poseidon2 + +import pkg/codex/utils/poseidon2digest +import ../../asynctest + +test "Benchmark poseidon2 digestTree": + randomize(42) + const + dataSize = 64 * 1024 # 64KB + chunkSize = 2 * 1024 # 2KB + iterations = 10 # Number of iterations + + echo &"Benchmarking digestTree with data size: {dataSize} bytes, chunk size: {chunkSize} bytes" + + # Generate random data + var data = newSeq[byte](dataSize) + for i in 0 ..< dataSize: + data[i] = byte(rand(255)) + + # Actual benchmark + let startTime = cpuTime() + + for i in 1 .. iterations: + discard Poseidon2Tree.digestTree(data, chunkSize).tryGet() + + let endTime = cpuTime() + let totalTime = endTime - startTime + let avgTime = totalTime / iterations.float + + echo &"Results:" + echo &" Total time for {iterations} iterations: {totalTime:.6f} seconds" + echo &" Average time per iteration: {avgTime:.6f} seconds" + echo &" Iterations per second: {iterations.float / totalTime:.2f}" diff --git a/vendor/nim-taskpools b/vendor/nim-taskpools index 4acdc6ef..97f76fae 160000 --- a/vendor/nim-taskpools +++ b/vendor/nim-taskpools @@ -1 +1 @@ -Subproject commit 4acdc6ef005a93dba09f902ed75197548cf7b451 +Subproject commit 97f76faef6ba64bc77d9808c27ec5e9917e7cfde
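Taken together, `merkletree.nim` and `poseidon2digest.nim` apply the same offloading pattern: spawn a worker on the taskpool, have it fire a ThreadSignalPtr when it finishes, and await that signal from chronos while blocking cancellation. A condensed standalone sketch of that pattern (hypothetical names; the real workers additionally marshal their inputs via `SharedBuf` and raw pointers as shown above):

import pkg/results
import pkg/chronos
import pkg/chronos/threadsync
import pkg/taskpools

proc worker(x: int, signal: ThreadSignalPtr): bool {.gcsafe.} =
  # Fire the signal no matter how the computation ends so that the
  # awaiting side never hangs.
  defer:
    discard signal.fireSync()
  x mod 2 == 0 # stand-in for the real computation

proc offload(tp: Taskpool, x: int): Future[bool] {.async.} =
  let signal = ThreadSignalPtr.new().expect("signal created")
  defer:
    signal.close().expect("closing once works")

  let res = tp.spawn worker(x, signal)

  # The spawned task cannot be stopped early, so block cancellation
  # until the signal arrives.
  await noCancel signal.wait()
  return res.sync()

when isMainModule:
  var tp = Taskpool.new(numThreads = 2)
  echo waitFor offload(tp, 4) # true
  tp.shutdown()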