Mirror of https://github.com/logos-storage/logos-storage-nim.git, synced 2026-01-15 20:03:09 +00:00

feat: async tree building v2 (#1360)

Signed-off-by: Giuliano Mega <giuliano.mega@gmail.com>
Co-authored-by: munna0908 <munnasitu0908@gmail.com>
Co-authored-by: gmega <giuliano.mega@gmail.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

This commit is contained in:
parent 50bd183984
commit cce002fcbf
@@ -258,16 +258,16 @@ proc new*(
   var
     cache: CacheStore = nil
-    taskpool: Taskpool
+    taskPool: Taskpool

   try:
     if config.numThreads == ThreadCount(0):
-      taskpool = Taskpool.new(numThreads = min(countProcessors(), 16))
+      taskPool = Taskpool.new(numThreads = min(countProcessors(), 16))
     else:
-      taskpool = Taskpool.new(numThreads = int(config.numThreads))
-    info "Threadpool started", numThreads = taskpool.numThreads
+      taskPool = Taskpool.new(numThreads = int(config.numThreads))
+    info "Threadpool started", numThreads = taskPool.numThreads
   except CatchableError as exc:
-    raiseAssert("Failure in taskpool initialization:" & exc.msg)
+    raiseAssert("Failure in taskPool initialization:" & exc.msg)

   if config.cacheSize > 0'nb:
     cache = CacheStore.new(cacheSize = config.cacheSize)
@@ -349,7 +349,7 @@ proc new*(
   if config.prover:
     let backend =
       config.initializeBackend().expect("Unable to create prover backend.")
-    some Prover.new(store, backend, config.numProofSamples)
+    some Prover.new(store, backend, config.numProofSamples, taskPool)
   else:
     none Prover
@@ -359,7 +359,7 @@ proc new*(
       engine = engine,
       discovery = discovery,
       prover = prover,
-      taskPool = taskpool,
+      taskPool = taskPool,
     )

   var restServer: RestServerRef = nil
@@ -382,5 +382,5 @@ proc new*(
     restServer: restServer,
     repoStore: repoStore,
     maintenance: maintenance,
-    taskpool: taskpool,
+    taskPool: taskPool,
   )
@@ -425,7 +425,7 @@ proc encodeData(
         return failure("Unable to store block!")
       idx.inc(params.steps)

-  without tree =? CodexTree.init(cids[]), err:
+  without tree =? (await CodexTree.init(self.taskPool, cids[])), err:
     return failure(err)

   without treeCid =? tree.rootCid, err:
@@ -646,7 +646,8 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
   without (cids, recoveredIndices) =? (await self.decodeInternal(encoded)), err:
     return failure(err)

-  without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err:
+  without tree =?
+    (await CodexTree.init(self.taskPool, cids[0 ..< encoded.originalBlocksCount])), err:
     return failure(err)

   without treeCid =? tree.rootCid, err:
@@ -677,7 +678,8 @@ proc repair*(self: Erasure, encoded: Manifest): Future[?!void] {.async.} =
   without (cids, _) =? (await self.decodeInternal(encoded)), err:
     return failure(err)

-  without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err:
+  without tree =?
+    (await CodexTree.init(self.taskPool, cids[0 ..< encoded.originalBlocksCount])), err:
     return failure(err)

   without treeCid =? tree.rootCid, err:
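Note on the call sites above: the tree constructors are now both async and Result-typed, so each call awaits inside `without`, with the `await` parenthesized. A minimal standalone sketch of that pattern (the `work` proc is a hypothetical stand-in, not part of this commit):

  import pkg/chronos
  import pkg/questionable
  import pkg/questionable/results

  proc work(): Future[?!int] {.async.} =
    success 7

  proc caller() {.async.} =
    # `without` binds the success value, or binds the error and runs the body
    without v =? (await work()), err:
      echo "work failed: ", err.msg
      return
    doAssert v == 7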
@@ -10,16 +10,18 @@
 {.push raises: [].}

 import std/bitops
-import std/sequtils
+import std/[atomics, sequtils]

 import pkg/questionable
 import pkg/questionable/results
 import pkg/libp2p/[cid, multicodec, multihash]
 import pkg/constantine/hashes
+import pkg/taskpools
+import pkg/chronos/threadsync

 import ../../utils
 import ../../rng
 import ../../errors
 import ../../blocktype
 import ../../codextypes

 from ../../utils/digest import digestBytes
@@ -113,9 +115,7 @@ func compress*(x, y: openArray[byte], key: ByteTreeKey, codec: MultiCodec): ?!ByteHash =
   let digest = ?MultiHash.digest(codec, input).mapFailure
   success digest.digestBytes

-func init*(
-  _: type CodexTree, mcodec: MultiCodec = Sha256HashCodec, leaves: openArray[ByteHash]
-): ?!CodexTree =
+func initTree(mcodec: MultiCodec, leaves: openArray[ByteHash]): ?!CodexTree =
   if leaves.len == 0:
     return failure "Empty leaves"
@@ -128,11 +128,27 @@ func init*(
   if digestSize != leaves[0].len:
     return failure "Invalid hash length"

-  var self = CodexTree(mcodec: mcodec, compress: compressor, zero: Zero)
-
-  self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true)
+  var self = CodexTree(mcodec: mcodec)
+  ?self.prepare(compressor, Zero, leaves)
   success self

+func init*(
+    _: type CodexTree, mcodec: MultiCodec = Sha256HashCodec, leaves: openArray[ByteHash]
+): ?!CodexTree =
+  let tree = ?initTree(mcodec, leaves)
+  ?tree.compute()
+  success tree
+
+proc init*(
+    _: type CodexTree,
+    tp: Taskpool,
+    mcodec: MultiCodec = Sha256HashCodec,
+    leaves: seq[ByteHash],
+): Future[?!CodexTree] {.async: (raises: [CancelledError]).} =
+  let tree = ?initTree(mcodec, leaves)
+  ?await tree.compute(tp)
+  success tree
+
 func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree =
   if leaves.len == 0:
     return failure "Empty leaves"
@@ -143,6 +159,18 @@ func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree =

   CodexTree.init(mcodec, leaves)

+proc init*(
+    _: type CodexTree, tp: Taskpool, leaves: seq[MultiHash]
+): Future[?!CodexTree] {.async: (raises: [CancelledError]).} =
+  if leaves.len == 0:
+    return failure "Empty leaves"
+
+  let
+    mcodec = leaves[0].mcodec
+    leaves = leaves.mapIt(it.digestBytes)
+
+  await CodexTree.init(tp, mcodec, leaves)
+
 func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree =
   if leaves.len == 0:
     return failure "Empty leaves"
@@ -153,6 +181,18 @@ func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree =

   CodexTree.init(mcodec, leaves)

+proc init*(
+    _: type CodexTree, tp: Taskpool, leaves: seq[Cid]
+): Future[?!CodexTree] {.async: (raises: [CancelledError]).} =
+  if leaves.len == 0:
+    return failure("Empty leaves")
+
+  let
+    mcodec = (?leaves[0].mhash.mapFailure).mcodec
+    leaves = leaves.mapIt((?it.mhash.mapFailure).digestBytes)
+
+  await CodexTree.init(tp, mcodec, leaves)
+
 proc fromNodes*(
   _: type CodexTree,
   mcodec: MultiCodec = Sha256HashCodec,
@@ -171,15 +211,8 @@ proc fromNodes*(
   if digestSize != nodes[0].len:
     return failure "Invalid hash length"

-  var
-    self = CodexTree(compress: compressor, zero: Zero, mcodec: mcodec)
-    layer = nleaves
-    pos = 0
-
-  while pos < nodes.len:
-    self.layers.add(nodes[pos ..< (pos + layer)])
-    pos += layer
-    layer = divUp(layer, 2)
+  var self = CodexTree(mcodec: mcodec)
+  ?self.fromNodes(compressor, Zero, nodes, nleaves)

   let
     index = Rng.instance.rand(nleaves - 1)
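The refactor above follows one shape, reused for Poseidon2Tree further down: a private `initTree` validates the leaves and calls `prepare`, then thin sync/async `init` fronts run `compute`. A condensed sketch of that shape under stand-in types (this is not the real API):

  import pkg/chronos
  import pkg/taskpools
  import pkg/questionable/results

  type Tree = ref object
    computed: bool

  proc prepareTree(): ?!Tree =
    # validation and leaf copying would happen here
    success Tree()

  proc compute(t: Tree): ?!void =
    t.computed = true # stand-in for the in-place, layer-by-layer build
    success()

  proc compute(t: Tree, tp: Taskpool): Future[?!void] {.async.} =
    return t.compute() # the real version offloads to `tp`; see merkletree.nim below

  proc initSync(): ?!Tree =
    let t = ?prepareTree()
    ?t.compute()
    success t

  proc initAsync(tp: Taskpool): Future[?!Tree] {.async.} =
    let t = ?prepareTree()
    ?await t.compute(tp)
    success t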
@@ -9,19 +9,58 @@

 {.push raises: [].}

-import std/bitops
+import std/[bitops, atomics, sequtils]
+import stew/assign2

 import pkg/questionable/results
+import pkg/taskpools
+import pkg/chronos
+import pkg/chronos/threadsync

 import ../errors
+import ../utils/sharedbuf
+
+export sharedbuf
+
+template nodeData(
+    data: openArray[byte], offsets: openArray[int], nodeSize, i, j: int
+): openArray[byte] =
+  ## Bytes of the j'th entry of the i'th level in the tree, starting with the
+  ## leaves (at level 0).
+  let start = (offsets[i] + j) * nodeSize
+  data.toOpenArray(start, start + nodeSize - 1)

 type
+  # TODO hash functions don't fail - removing the ?! from this function would
+  # significantly simplify the flow below
   CompressFn*[H, K] = proc(x, y: H, key: K): ?!H {.noSideEffect, raises: [].}

-  MerkleTree*[H, K] = ref object of RootObj
-    layers*: seq[seq[H]]
-    compress*: CompressFn[H, K]
-    zero*: H
+  CompressData[H, K] = object
+    fn: CompressFn[H, K]
+    nodeSize: int
+    zero: H
+
+  MerkleTreeObj*[H, K] = object of RootObj
+    store*: seq[byte]
+      ## Flattened merkle tree where hashes are assumed to be trivial bytes and
+      ## uniform in size.
+      ##
+      ## Each layer of the tree is stored serially, starting with the leaves and
+      ## ending with the root.
+      ##
+      ## Because the tree might not be balanced, `layerOffsets` contains the
+      ## index of the starting point of each level, for easy lookup.
+    layerOffsets*: seq[int]
+      ## Starting point of each level in the tree, starting from the leaves -
+      ## multiplied by the entry size, this is the offset in the payload where
+      ## the entries of that level start.
+      ##
+      ## For example, a tree with 4 leaves will have [0, 4, 6] stored here.
+      ##
+      ## See the nodesPerLevel function, from which this sequence is derived.
+    compress*: CompressData[H, K]
+
+  MerkleTree*[H, K] = ref MerkleTreeObj[H, K]

   MerkleProof*[H, K] = ref object of RootObj
     index*: int # linear index of the leaf, starting from 0
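To make the flattened layout concrete: for 4 leaves the level sizes are [4, 2, 1] and the level offsets [0, 4, 6], so node j of level i starts at byte (layerOffsets[i] + j) * nodeSize. A standalone sketch of that bookkeeping with hypothetical names (`levelSizes`/`levelStarts` mirror nodesPerLevel/layerOffsets below; the single-leaf special case, [1, 1], is omitted):

  proc levelSizes(nleaves: int): seq[int] =
    var m = nleaves
    while true:
      result.add m
      if m == 1:
        break
      m = (m + 1) shr 1 # next level holds ceil(m / 2) nodes

  proc levelStarts(nleaves: int): seq[int] =
    var tot = 0
    for n in levelSizes(nleaves):
      result.add tot
      tot += n

  when isMainModule:
    doAssert levelSizes(4) == @[4, 2, 1]
    doAssert levelStarts(4) == @[0, 4, 6]
    # second node of level 1, with 32-byte hashes: (4 + 1) * 32
    doAssert (levelStarts(4)[1] + 1) * 32 == 160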
@@ -30,33 +69,99 @@ type
     compress*: CompressFn[H, K] # compress function
     zero*: H # zero value

+func levels*[H, K](self: MerkleTree[H, K]): int =
+  return self.layerOffsets.len
+
 func depth*[H, K](self: MerkleTree[H, K]): int =
-  return self.layers.len - 1
+  return self.levels() - 1
+
+func nodesInLayer(offsets: openArray[int], layer: int): int =
+  if layer == offsets.high:
+    1
+  else:
+    offsets[layer + 1] - offsets[layer]
+
+func nodesInLayer(self: MerkleTree | MerkleTreeObj, layer: int): int =
+  self.layerOffsets.nodesInLayer(layer)

 func leavesCount*[H, K](self: MerkleTree[H, K]): int =
-  return self.layers[0].len
-
-func levels*[H, K](self: MerkleTree[H, K]): int =
-  return self.layers.len
-
-func leaves*[H, K](self: MerkleTree[H, K]): seq[H] =
-  return self.layers[0]
-
-iterator layers*[H, K](self: MerkleTree[H, K]): seq[H] =
-  for layer in self.layers:
-    yield layer
+  return self.nodesInLayer(0)
+
+func nodesPerLevel(nleaves: int): seq[int] =
+  ## Given a number of leaves, return a seq with the number of nodes at each
+  ## layer of the tree (from the bottom/leaves to the root).
+  ##
+  ## I.e. for a tree of 4 leaves, return `[4, 2, 1]`.
+  if nleaves <= 0:
+    return @[]
+  elif nleaves == 1:
+    return @[1, 1] # leaf and root
+
+  var nodes: seq[int] = @[]
+  var m = nleaves
+  while true:
+    nodes.add(m)
+    if m == 1:
+      break
+    # Next layer size is ceil(m/2)
+    m = (m + 1) shr 1
+
+  nodes
+
+func layerOffsets(nleaves: int): seq[int] =
+  ## Given a number of leaves, return a seq of the starting offsets of each
+  ## layer in the node store that results from flattening the binary tree.
+  ##
+  ## I.e. for a tree of 4 leaves, return `[0, 4, 6]`.
+  let nodes = nodesPerLevel(nleaves)
+  var tot = 0
+  let offsets = nodes.mapIt:
+    let cur = tot
+    tot += it
+    cur
+  offsets
+
+template nodeData(self: MerkleTreeObj, i, j: int): openArray[byte] =
+  ## Bytes of the j'th node of the i'th level in the tree, starting with the
+  ## leaves (at level 0).
+  self.store.nodeData(self.layerOffsets, self.compress.nodeSize, i, j)
+
+func layer*[H, K](
+    self: MerkleTree[H, K], layer: int
+): seq[H] {.deprecated: "Expensive".} =
+  var nodes = newSeq[H](self.nodesInLayer(layer))
+  for i, h in nodes.mpairs:
+    assign(h, self[].nodeData(layer, i))
+  return nodes
+
+func leaves*[H, K](self: MerkleTree[H, K]): seq[H] {.deprecated: "Expensive".} =
+  self.layer(0)
+
+iterator layers*[H, K](self: MerkleTree[H, K]): seq[H] {.deprecated: "Expensive".} =
+  for i in 0 ..< self.layerOffsets.len:
+    yield self.layer(i)
+
+proc layers*[H, K](self: MerkleTree[H, K]): seq[seq[H]] {.deprecated: "Expensive".} =
+  for l in self.layers():
+    result.add l

 iterator nodes*[H, K](self: MerkleTree[H, K]): H =
-  for layer in self.layers:
-    for node in layer:
-      yield node
+  ## Iterate over the nodes of each layer starting with the leaves
+  var node: H
+  for i in 0 ..< self.layerOffsets.len:
+    let nodesInLayer = self.nodesInLayer(i)
+    for j in 0 ..< nodesInLayer:
+      assign(node, self[].nodeData(i, j))
+      yield node

 func root*[H, K](self: MerkleTree[H, K]): ?!H =
-  let last = self.layers[^1]
-  if last.len != 1:
+  mixin assign
+  if self.layerOffsets.len == 0:
     return failure "invalid tree"

-  return success last[0]
+  var h: H
+  assign(h, self[].nodeData(self.layerOffsets.high(), 0))
+  return success h
@@ -72,18 +177,19 @@ func getProof*[H, K](
   var m = nleaves
   for i in 0 ..< depth:
     let j = k xor 1
-    path[i] =
-      if (j < m):
-        self.layers[i][j]
-      else:
-        self.zero
+
+    if (j < m):
+      assign(path[i], self[].nodeData(i, j))
+    else:
+      path[i] = self.compress.zero

     k = k shr 1
     m = (m + 1) shr 1

   proof.index = index
   proof.path = path
   proof.nleaves = nleaves
-  proof.compress = self.compress
+  proof.compress = self.compress.fn

   success()
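getProof walks the usual binary-heap indices: the sibling of k is k xor 1, the parent is k shr 1, and a missing sibling in an odd layer is replaced by the zero value. A standalone sketch of that walk for a 6-leaf tree (plain ints only, not part of the commit):

  when isMainModule:
    var k = 5 # leaf index
    var m = 6 # nodes in the current layer
    var path: seq[(int, int)] # (layer, sibling index) pairs actually taken
    for layer in 0 ..< 3: # a 6-leaf tree has depth 3
      let j = k xor 1 # sibling of k: 4 <-> 5, 2 <-> 3, ...
      if j < m:
        path.add((layer, j))
      # else: the sibling is missing and the zero value is used instead
      k = k shr 1 # parent index in the next layer up
      m = (m + 1) shr 1 # nodes in the next layer: ceil(m / 2)
    # layer 1 has 3 nodes, so node 2's sibling (index 3) falls back to zero
    doAssert path == @[(0, 4), (2, 0)]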
@@ -122,32 +228,169 @@ func reconstructRoot*[H, K](proof: MerkleProof[H, K], leaf: H): ?!H =
 func verify*[H, K](proof: MerkleProof[H, K], leaf: H, root: H): ?!bool =
   success bool(root == ?proof.reconstructRoot(leaf))

-func merkleTreeWorker*[H, K](
-  self: MerkleTree[H, K], xs: openArray[H], isBottomLayer: static bool
-): ?!seq[seq[H]] =
-  let a = low(xs)
-  let b = high(xs)
-  let m = b - a + 1
+func fromNodes*[H, K](
+    self: MerkleTree[H, K],
+    compressor: CompressFn,
+    zero: H,
+    nodes: openArray[H],
+    nleaves: int,
+): ?!void =
+  mixin assign
+
+  if nodes.len < 2: # At least leaf and root
+    return failure "Not enough nodes"
+
+  if nleaves == 0:
+    return failure "No leaves"
+
+  self.compress = CompressData[H, K](fn: compressor, nodeSize: nodes[0].len, zero: zero)
+  self.layerOffsets = layerOffsets(nleaves)
+
+  if self.layerOffsets[^1] + 1 != nodes.len:
+    return failure "bad node count"
+
+  self.store = newSeqUninit[byte](nodes.len * self.compress.nodeSize)
+
+  for i in 0 ..< nodes.len:
+    assign(
+      self[].store.toOpenArray(
+        i * self.compress.nodeSize, (i + 1) * self.compress.nodeSize - 1
+      ),
+      nodes[i],
+    )
+
+  success()
+
+func merkleTreeWorker[H, K](
+    store: var openArray[byte],
+    offsets: openArray[int],
+    compress: CompressData[H, K],
+    layer: int,
+    isBottomLayer: static bool,
+): ?!void =
+  ## Worker used to compute the merkle tree from the leaves that are assumed to
+  ## already be stored at the beginning of the `store`, as done by `prepare`.
+
+  # Throughout, we use `assign` to convert from H to bytes and back, assuming
+  # this assignment can be done somewhat efficiently (ie memcpy) - because
+  # the code must work with multihash, where len(H) can differ, we cannot
+  # simply use a fixed-size array here.
+  mixin assign
+
+  template nodeData(i, j: int): openArray[byte] =
+    # Pick out the bytes of node j in layer i
+    store.nodeData(offsets, compress.nodeSize, i, j)
+
+  let m = offsets.nodesInLayer(layer)

   when not isBottomLayer:
     if m == 1:
-      return success @[@xs]
+      return success()

   let halfn: int = m div 2
   let n: int = 2 * halfn
   let isOdd: bool = (n != m)

-  var ys: seq[H]
-  if not isOdd:
-    ys = newSeq[H](halfn)
-  else:
-    ys = newSeq[H](halfn + 1)
+  # Because the compression function works with H and not bytes, we need to
+  # extract H from the raw data - a little abstraction tax that ensures that
+  # properties like alignment of H are respected.
+  var a, b, tmp: H

   for i in 0 ..< halfn:
     const key = when isBottomLayer: K.KeyBottomLayer else: K.KeyNone
-    ys[i] = ?self.compress(xs[a + 2 * i], xs[a + 2 * i + 1], key = key)
+
+    assign(a, nodeData(layer, i * 2))
+    assign(b, nodeData(layer, i * 2 + 1))
+
+    tmp = ?compress.fn(a, b, key = key)
+
+    assign(nodeData(layer + 1, i), tmp)

   if isOdd:
     const key = when isBottomLayer: K.KeyOddAndBottomLayer else: K.KeyOdd
-    ys[halfn] = ?self.compress(xs[n], self.zero, key = key)
+    assign(a, nodeData(layer, n))
+
+    tmp = ?compress.fn(a, compress.zero, key = key)
+
+    assign(nodeData(layer + 1, halfn), tmp)

-  success @[@xs] & ?self.merkleTreeWorker(ys, isBottomLayer = false)
+  merkleTreeWorker(store, offsets, compress, layer + 1, false)
+
+proc merkleTreeWorker[H, K](
+    store: SharedBuf[byte],
+    offsets: SharedBuf[int],
+    compress: ptr CompressData[H, K],
+    signal: ThreadSignalPtr,
+): bool =
+  defer:
+    discard signal.fireSync()
+
+  let res = merkleTreeWorker(
+    store.toOpenArray(), offsets.toOpenArray(), compress[], 0, isBottomLayer = true
+  )
+
+  return res.isOk()
+
+func prepare*[H, K](
+    self: MerkleTree[H, K], compressor: CompressFn, zero: H, leaves: openArray[H]
+): ?!void =
+  ## Prepare the instance for computing the merkle tree of the given leaves using
+  ## the given compression function. After preparation, `compute` should be
+  ## called to perform the actual computation. `leaves` will be copied into the
+  ## tree so they can be freed after the call.
+
+  if leaves.len == 0:
+    return failure "No leaves"
+
+  self.compress =
+    CompressData[H, K](fn: compressor, nodeSize: leaves[0].len, zero: zero)
+  self.layerOffsets = layerOffsets(leaves.len)
+
+  self.store = newSeqUninit[byte]((self.layerOffsets[^1] + 1) * self.compress.nodeSize)
+
+  for j in 0 ..< leaves.len:
+    assign(self[].nodeData(0, j), leaves[j])
+
+  return success()
+
+proc compute*[H, K](self: MerkleTree[H, K]): ?!void =
+  merkleTreeWorker(
+    self.store, self.layerOffsets, self.compress, 0, isBottomLayer = true
+  )
+
+proc compute*[H, K](
+    self: MerkleTree[H, K], tp: Taskpool
+): Future[?!void] {.async: (raises: []).} =
+  if tp.numThreads == 1:
+    # With a single thread, there's no point creating a separate task
+    return self.compute()
+
+  # TODO this signal would benefit from reuse across computations
+  without signal =? ThreadSignalPtr.new():
+    return failure("Unable to create thread signal")
+
+  defer:
+    signal.close().expect("closing once works")
+
+  let res = tp.spawn merkleTreeWorker(
+    SharedBuf.view(self.store),
+    SharedBuf.view(self.layerOffsets),
+    addr self.compress,
+    signal,
+  )
+
+  # To support cancellation, we'd have to ensure the task we posted to taskpools
+  # exits early - since we're not doing that, block cancellation attempts
+  try:
+    await noCancel signal.wait()
+  except AsyncError as exc:
+    # Since we initialized the signal, the OS or chronos is misbehaving. In any
+    # case, it would mean the task is still running, which would cause a memory
+    # violation if we let it run - panic instead
+    raiseAssert "Could not wait for signal, was it initialized? " & exc.msg
+
+  if not res.sync():
+    return failure("merkle tree task failed")
+
+  return success()
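The async `compute` above is an instance of a spawn-and-signal pattern: post a job to the Taskpool, then await a ThreadSignalPtr that the worker always fires. A minimal sketch of the same pattern with a hypothetical worker and payload (error handling mirrors `compute(tp)` above):

  import pkg/chronos
  import pkg/chronos/threadsync
  import pkg/taskpools

  proc worker(x: int, signal: ThreadSignalPtr): bool =
    defer:
      discard signal.fireSync() # always wake the waiter, even on failure
    x > 0

  proc offload(tp: Taskpool, x: int): Future[bool] {.async: (raises: []).} =
    let signal = ThreadSignalPtr.new().expect("signal")
    defer:
      signal.close().expect("closing once works")

    let res = tp.spawn worker(x, signal)
    try:
      await noCancel signal.wait() # block cancellation, as compute(tp) does
    except AsyncError as exc:
      raiseAssert "could not wait for signal: " & exc.msg
    return res.sync()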
@@ -9,9 +9,11 @@

 {.push raises: [].}

-import std/sequtils
+import std/[sequtils, atomics]

 import pkg/poseidon2
+import pkg/taskpools
+import pkg/chronos/threadsync
 import pkg/constantine/math/io/io_fields
 import pkg/constantine/platforms/abstractions
 import pkg/questionable/results
@@ -44,6 +46,17 @@ type
   Poseidon2Tree* = MerkleTree[Poseidon2Hash, PoseidonKeysEnum]
   Poseidon2Proof* = MerkleProof[Poseidon2Hash, PoseidonKeysEnum]

+proc len*(v: Poseidon2Hash): int =
+  sizeof(v)
+
+proc assign*(v: var openArray[byte], h: Poseidon2Hash) =
+  doAssert v.len == sizeof(h)
+  copyMem(addr v[0], addr h, sizeof(h))
+
+proc assign*(h: var Poseidon2Hash, v: openArray[byte]) =
+  doAssert v.len == sizeof(h)
+  copyMem(addr h, addr v[0], sizeof(h))
+
 proc `$`*(self: Poseidon2Tree): string =
   let root = if self.root.isOk: self.root.get.toHex else: "none"
   "Poseidon2Tree(" & " root: " & root & ", leavesCount: " & $self.leavesCount &
@@ -63,7 +76,7 @@ converter toKey*(key: PoseidonKeysEnum): Poseidon2Hash =
   of KeyOdd: KeyOddF
   of KeyOddAndBottomLayer: KeyOddAndBottomLayerF

-func init*(_: type Poseidon2Tree, leaves: openArray[Poseidon2Hash]): ?!Poseidon2Tree =
+proc initTree(leaves: openArray[Poseidon2Hash]): ?!Poseidon2Tree =
   if leaves.len == 0:
     return failure "Empty leaves"
@@ -72,34 +85,43 @@ func init*(_: type Poseidon2Tree, leaves: openArray[Poseidon2Hash]): ?!Poseidon2Tree =
   ): ?!Poseidon2Hash {.noSideEffect.} =
     success compress(x, y, key.toKey)

-  var self = Poseidon2Tree(compress: compressor, zero: Poseidon2Zero)
+  var self = Poseidon2Tree()
+  ?self.prepare(compressor, Poseidon2Zero, leaves)
+  success self
+
+func init*(_: type Poseidon2Tree, leaves: openArray[Poseidon2Hash]): ?!Poseidon2Tree =
+  let self = ?initTree(leaves)
+  ?self.compute()
+
+  success self
+
+proc init*(
+    _: type Poseidon2Tree, tp: Taskpool, leaves: seq[Poseidon2Hash]
+): Future[?!Poseidon2Tree] {.async: (raises: [CancelledError]).} =
+  let self = ?initTree(leaves)
+
+  ?await self.compute(tp)

-  self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true)
   success self

 func init*(_: type Poseidon2Tree, leaves: openArray[array[31, byte]]): ?!Poseidon2Tree =
   Poseidon2Tree.init(leaves.mapIt(Poseidon2Hash.fromBytes(it)))

+proc init*(
+    _: type Poseidon2Tree, tp: Taskpool, leaves: seq[array[31, byte]]
+): Future[?!Poseidon2Tree] {.async: (raises: [CancelledError]).} =
+  await Poseidon2Tree.init(tp, leaves.mapIt(Poseidon2Hash.fromBytes(it)))
+
 proc fromNodes*(
   _: type Poseidon2Tree, nodes: openArray[Poseidon2Hash], nleaves: int
 ): ?!Poseidon2Tree =
   if nodes.len == 0:
     return failure "Empty nodes"

   let compressor = proc(
     x, y: Poseidon2Hash, key: PoseidonKeysEnum
   ): ?!Poseidon2Hash {.noSideEffect.} =
     success compress(x, y, key.toKey)

-  var
-    self = Poseidon2Tree(compress: compressor, zero: zero)
-    layer = nleaves
-    pos = 0
-
-  while pos < nodes.len:
-    self.layers.add(nodes[pos ..< (pos + layer)])
-    pos += layer
-    layer = divUp(layer, 2)
+  let self = Poseidon2Tree()
+  ?self.fromNodes(compressor, Poseidon2Zero, nodes, nleaves)

   let
     index = Rng.instance.rand(nleaves - 1)
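A usage sketch for the two Poseidon2Tree entry points; it assumes the import path used by the tests further down, and relies on the commit's guarantee (tested below) that sync and async builds produce the same tree:

  import pkg/chronos
  import pkg/taskpools
  import pkg/questionable/results
  import pkg/codex/merkletree # assumed import path, as in the tests

  proc buildBoth(leaves: seq[array[31, byte]]) {.async.} =
    let stree = Poseidon2Tree.init(leaves).tryGet # synchronous build

    var tp = Taskpool.new(numThreads = 2)
    defer:
      tp.shutdown()

    # async build: `prepare` copies the leaves, `compute` runs on the pool
    let atree = (await Poseidon2Tree.init(tp, leaves)).tryGet
    doAssert stree.root.tryGet == atree.root.tryGet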
@@ -75,7 +75,7 @@ type
     contracts*: Contracts
     clock*: Clock
     storage*: Contracts
-    taskpool: Taskpool
+    taskPool: Taskpool
     trackedFutures: TrackedFutures

   CodexNodeRef* = ref CodexNode
@@ -325,7 +325,7 @@ proc streamEntireDataset(
     try:
       # Spawn an erasure decoding job
       let erasure = Erasure.new(
-        self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
+        self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskPool
       )
       without _ =? (await erasure.decode(manifest)), error:
         error "Unable to erasure decode manifest", manifestCid, exc = error.msg
@@ -474,7 +474,7 @@ proc store*(
   finally:
     await stream.close()

-  without tree =? CodexTree.init(cids), err:
+  without tree =? (await CodexTree.init(self.taskPool, cids)), err:
     return failure(err)

   without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
@@ -568,14 +568,15 @@ proc setupRequest(

   # Erasure code the dataset according to provided parameters
   let erasure = Erasure.new(
-    self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
+    self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskPool
   )

   without encoded =? (await erasure.encode(manifest, ecK, ecM)), error:
     trace "Unable to erasure code dataset"
     return failure(error)

-  without builder =? Poseidon2Builder.new(self.networkStore.localStore, encoded), err:
+  without builder =?
+    Poseidon2Builder.new(self.networkStore.localStore, encoded, self.taskPool), err:
     trace "Unable to create slot builder"
     return failure(err)
@@ -679,7 +680,9 @@ proc onStore(
     return failure(err)

   without builder =?
-    Poseidon2Builder.new(self.networkStore, manifest, manifest.verifiableStrategy), err:
+    Poseidon2Builder.new(
+      self.networkStore, manifest, self.taskPool, manifest.verifiableStrategy
+    ), err:
     trace "Unable to create slots builder", err = err.msg
     return failure(err)
@@ -714,7 +717,7 @@ proc onStore(
     trace "start repairing slot", slotIdx
     try:
       let erasure = Erasure.new(
-        self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
+        self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskPool
       )
       if err =? (await erasure.repair(manifest)).errorOption:
         error "Unable to erasure decode repairing manifest",
@@ -916,7 +919,7 @@ proc new*(
   networkStore: NetworkStore,
   engine: BlockExcEngine,
   discovery: Discovery,
-  taskpool: Taskpool,
+  taskPool: Taskpool,
   prover = Prover.none,
   contracts = Contracts.default,
 ): CodexNodeRef =
@@ -929,7 +932,7 @@ proc new*(
     engine: engine,
     prover: prover,
     discovery: discovery,
-    taskPool: taskpool,
+    taskPool: taskPool,
     contracts: contracts,
     trackedFutures: TrackedFutures(),
   )
@@ -18,18 +18,20 @@ import pkg/chronos
 import pkg/questionable
 import pkg/questionable/results
 import pkg/constantine/math/io/io_fields
+import pkg/taskpools

 import ../../logutils
 import ../../utils
 import ../../stores
 import ../../manifest
 import ../../merkletree
 import ../../utils/poseidon2digest
 import ../../utils/asynciter
 import ../../indexingstrategy

 import ../converters

-export converters, asynciter
+export converters, asynciter, poseidon2digest

 logScope:
   topics = "codex slotsbuilder"
@@ -45,6 +47,7 @@ type SlotsBuilder*[T, H] = ref object of RootObj
   emptyBlock: seq[byte] # empty block
   verifiableTree: ?T # verification tree (dataset tree)
   emptyDigestTree: T # empty digest tree for empty blocks
+  taskPool: Taskpool

 func verifiable*[T, H](self: SlotsBuilder[T, H]): bool {.inline.} =
   ## Returns true if the slots are verifiable.
@@ -165,6 +168,35 @@ proc buildBlockTree*[T, H](

   success (blk.data, tree)

+proc getBlockDigest*[T, H](
+    self: SlotsBuilder[T, H], blkIdx: Natural, slotPos: Natural
+): Future[?!H] {.async: (raises: [CancelledError]).} =
+  logScope:
+    blkIdx = blkIdx
+    slotPos = slotPos
+    numSlotBlocks = self.manifest.numSlotBlocks
+    cellSize = self.cellSize
+
+  trace "Building block tree"
+
+  if slotPos > (self.manifest.numSlotBlocks - 1):
+    # pad blocks are 0 byte blocks
+    trace "Returning empty digest tree for pad block"
+    return self.emptyDigestTree.root
+
+  without blk =? await self.store.getBlock(self.manifest.treeCid, blkIdx), err:
+    error "Failed to get block CID for tree at index", err = err.msg
+    return failure(err)
+
+  if blk.isEmpty:
+    return self.emptyDigestTree.root
+
+  without dg =? (await T.digest(self.taskPool, blk.data, self.cellSize.int)), err:
+    error "Failed to create digest for block", err = err.msg
+    return failure(err)
+
+  return success dg
+
 proc getCellHashes*[T, H](
   self: SlotsBuilder[T, H], slotIndex: Natural
 ): Future[?!seq[H]] {.async: (raises: [CancelledError, IndexingError]).} =
@@ -190,8 +222,7 @@ proc getCellHashes*[T, H](
       pos = i

     trace "Getting block CID for tree at index"
-    without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and digest =? tree.root,
-      err:
+    without digest =? (await self.getBlockDigest(blkIdx, i)), err:
       error "Failed to get block CID for tree at index", err = err.msg
       return failure(err)
@@ -310,6 +341,7 @@ proc new*[T, H](
   _: type SlotsBuilder[T, H],
   store: BlockStore,
   manifest: Manifest,
+  taskPool: Taskpool,
   strategy = LinearStrategy,
   cellSize = DefaultCellSize,
 ): ?!SlotsBuilder[T, H] =
@@ -383,6 +415,7 @@ proc new*[T, H](
     emptyBlock: emptyBlock,
     numSlotBlocks: numSlotBlocksTotal,
     emptyDigestTree: emptyDigestTree,
+    taskPool: taskPool,
   )

   if manifest.verifiable:
@@ -13,6 +13,7 @@ import pkg/chronicles
 import pkg/circomcompat
 import pkg/poseidon2
 import pkg/questionable/results
+import pkg/taskpools

 import pkg/libp2p/cid

@@ -47,6 +48,7 @@ type
     backend: AnyBackend
     store: BlockStore
     nSamples: int
+    taskPool: Taskpool

 proc prove*(
   self: Prover, slotIdx: int, manifest: Manifest, challenge: ProofChallenge
@@ -61,7 +63,7 @@ proc prove*(

   trace "Received proof challenge"

-  without builder =? AnyBuilder.new(self.store, manifest), err:
+  without builder =? AnyBuilder.new(self.store, manifest, self.taskPool), err:
     error "Unable to create slots builder", err = err.msg
     return failure(err)
@@ -88,6 +90,6 @@ proc verify*(
   self.backend.verify(proof, inputs)

 proc new*(
-  _: type Prover, store: BlockStore, backend: AnyBackend, nSamples: int
+  _: type Prover, store: BlockStore, backend: AnyBackend, nSamples: int, tp: Taskpool
 ): Prover =
-  Prover(store: store, backend: backend, nSamples: nSamples)
+  Prover(store: store, backend: backend, nSamples: nSamples, taskPool: tp)
@@ -7,13 +7,27 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.

+import std/[atomics]
 import pkg/poseidon2
 import pkg/questionable/results
 import pkg/libp2p/multihash
 import pkg/stew/byteutils
+import pkg/taskpools
+import pkg/chronos
+import pkg/chronos/threadsync
+import ./uniqueptr

 import ../merkletree

+type DigestTask* = object
+  signal: ThreadSignalPtr
+  bytes: seq[byte]
+  chunkSize: int
+  success: Atomic[bool]
+  digest: UniquePtr[Poseidon2Hash]
+
+export DigestTask
+
 func spongeDigest*(
   _: type Poseidon2Hash, bytes: openArray[byte], rate: static int = 2
 ): ?!Poseidon2Hash =
@@ -30,7 +44,7 @@ func spongeDigest*(

   success Sponge.digest(bytes, rate)

-func digestTree*(
+proc digestTree*(
   _: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int
 ): ?!Poseidon2Tree =
   ## Hashes chunks of data with a sponge of rate 2, and combines the
@@ -44,6 +58,7 @@ func digestTree*(

   var index = 0
   var leaves: seq[Poseidon2Hash]
+
   while index < bytes.len:
     let start = index
     let finish = min(index + chunkSize, bytes.len)
@@ -61,6 +76,46 @@ func digest*(

   (?Poseidon2Tree.digestTree(bytes, chunkSize)).root

+proc digestWorker(tp: Taskpool, task: ptr DigestTask) {.gcsafe.} =
+  defer:
+    discard task[].signal.fireSync()
+
+  var res = Poseidon2Tree.digest(task[].bytes, task[].chunkSize)
+
+  if res.isErr:
+    task[].success.store(false)
+    return
+
+  task[].digest = newUniquePtr(res.get())
+  task[].success.store(true)
+
+proc digest*(
+    _: type Poseidon2Tree, tp: Taskpool, bytes: seq[byte], chunkSize: int
+): Future[?!Poseidon2Hash] {.async: (raises: [CancelledError]).} =
+  without signal =? ThreadSignalPtr.new():
+    return failure("Unable to create thread signal")
+  defer:
+    signal.close().expect("closing once works")
+
+  doAssert tp.numThreads > 1,
+    "Must have at least one separate thread or signal will never be fired"
+
+  var task = DigestTask(signal: signal, bytes: bytes, chunkSize: chunkSize)
+
+  tp.spawn digestWorker(tp, addr task)
+
+  let signalFut = signal.wait()
+
+  if err =? catch(await signalFut.join()).errorOption:
+    ?catch(await noCancel signalFut)
+    if err of CancelledError:
+      raise (ref CancelledError) err
+
+  if not task.success.load():
+    return failure("digest task failed")
+
+  success extractValue(task.digest)
+
 func digestMhash*(
   _: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int
 ): ?!MultiHash =
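A usage sketch for the new task-pool digest; the import path and chunk size below are assumptions. Note the doAssert above: the pool must have more than one thread, or the signal would never fire:

  import pkg/chronos
  import pkg/taskpools
  import pkg/questionable
  import pkg/questionable/results
  import pkg/codex/utils/poseidon2digest # assumed import path

  proc digestBlock(tp: Taskpool, data: seq[byte]) {.async.} =
    without dg =? (await Poseidon2Tree.digest(tp, data, chunkSize = 2048)), err:
      echo "digest failed: ", err.msg
      return
    discard dg # the Poseidon2 root hash of the chunked data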
codex/utils/sharedbuf.nim (new file, 24 lines)
@@ -0,0 +1,24 @@
+import stew/ptrops
+
+type SharedBuf*[T] = object
+  payload*: ptr UncheckedArray[T]
+  len*: int
+
+proc view*[T](_: type SharedBuf, v: openArray[T]): SharedBuf[T] =
+  if v.len > 0:
+    SharedBuf[T](payload: makeUncheckedArray(addr v[0]), len: v.len)
+  else:
+    default(SharedBuf[T])
+
+template checkIdx(v: SharedBuf, i: int) =
+  doAssert i >= 0 and i < v.len
+
+proc `[]`*[T](v: SharedBuf[T], i: int): var T =
+  v.checkIdx(i)
+  v.payload[i]
+
+template toOpenArray*[T](v: SharedBuf[T]): var openArray[T] =
+  v.payload.toOpenArray(0, v.len - 1)
+
+template toOpenArray*[T](v: SharedBuf[T], s, e: int): var openArray[T] =
+  v.toOpenArray().toOpenArray(s, e)
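A sketch of how SharedBuf is meant to be used: it is a non-owning view over existing memory, so the underlying seq must outlive the view (as `self.store` does while `compute(tp)` awaits). The import path is an assumption:

  import codex/utils/sharedbuf # assumed import path

  when isMainModule:
    var data = @[1, 2, 3, 4]
    let buf = SharedBuf.view(data)
    doAssert buf.len == 4
    buf[2] = 42 # writes through to the underlying seq
    doAssert data[2] == 42
    doAssert @(buf.toOpenArray(1, 2)) == @[2, 42]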
codex/utils/uniqueptr.nim (new file, 58 lines)
@@ -0,0 +1,58 @@
+import std/isolation
+
+type UniquePtr*[T] = object
+  ## A unique pointer to a value of type T in shared memory.
+  ## Can only be moved, not copied.
+  data: ptr T
+
+proc newUniquePtr*[T](data: sink Isolated[T]): UniquePtr[T] =
+  ## Creates a new unique value in shared memory.
+  ## The memory is automatically freed when the object is destroyed.
+  result.data = cast[ptr T](allocShared0(sizeof(T)))
+  result.data[] = extract(data)
+
+template newUniquePtr*[T](data: T): UniquePtr[T] =
+  newUniquePtr(isolate(data))
+
+proc `=destroy`*[T](p: var UniquePtr[T]) =
+  ## Destructor for UniquePtr
+  if p.data != nil:
+    deallocShared(p.data)
+    p.data = nil
+
+proc `=copy`*[T](
+  dest: var UniquePtr[T], src: UniquePtr[T]
+) {.error: "UniquePtr cannot be copied, only moved".}
+
+proc `=sink`*[T](dest: var UniquePtr[T], src: UniquePtr[T]) =
+  if dest.data != nil:
+    `=destroy`(dest)
+  dest.data = src.data
+  # No need to nil out the source here: double-free is prevented by Nim's
+  # destructive move semantics, which zero the source after a sink.
+
+proc `[]`*[T](p: UniquePtr[T]): lent T =
+  ## Access the data (read-only)
+  if p.data == nil:
+    raise newException(NilAccessDefect, "accessing nil UniquePtr")
+  p.data[]
+
+# proc `[]`*[T](p: var UniquePtr[T]): var T =
+#   ## Access the data (mutable)
+#   if p.data == nil:
+#     raise newException(NilAccessDefect, "accessing nil UniquePtr")
+#   p.data[]
+
+proc isNil*[T](p: UniquePtr[T]): bool =
+  ## Check if the UniquePtr is nil
+  p.data == nil
+
+proc extractValue*[T](p: var UniquePtr[T]): T =
+  ## Extract the value from the UniquePtr and release the memory.
+  if p.data == nil:
+    raise newException(NilAccessDefect, "extracting from nil UniquePtr")
+  # Move the value out
+  var isolated = isolate(p.data[])
+  result = extract(isolated)
+  # Free the shared memory
+  deallocShared(p.data)
+  p.data = nil
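A sketch of the move-only discipline UniquePtr enforces; attempting a plain copy is rejected at compile time by the `{.error.}` on `=copy`. The import path is an assumption:

  import codex/utils/uniqueptr # assumed import path

  when isMainModule:
    var p = newUniquePtr(@[1, 2, 3])
    doAssert p[] == @[1, 2, 3]

    var q = move(p) # ownership moves; the source is zeroed
    doAssert p.isNil and not q.isNil
    # var r = q # would not compile: `=copy` is marked {.error.}

    doAssert extractValue(q) == @[1, 2, 3] # frees the shared allocation
    doAssert q.isNil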
@@ -12,6 +12,13 @@ proc testGenericTree*[H, K, U](
   let data = @data

   suite "Correctness tests - " & name:
+    test "Should build correct tree for single leaf":
+      let expectedRoot = compress(data[0], zero, K.KeyOddAndBottomLayer)
+
+      let tree = makeTree(data[0 .. 0])
+      check:
+        tree.root.tryGet == expectedRoot
+
     test "Should build correct tree for even bottom layer":
       let expectedRoot = compress(
         compress(
@@ -1,6 +1,6 @@
 import std/sequtils
-import std/times

+import pkg/unittest2
 import pkg/questionable/results
 import pkg/stew/byteutils
 import pkg/libp2p
@@ -9,8 +9,11 @@ import pkg/codex/codextypes
 import pkg/codex/merkletree
 import pkg/codex/utils/digest

+import pkg/taskpools
+
 import ./helpers
 import ./generictreetests
+import ../../asynctest

 # TODO: Generalize to other hashes
@@ -43,9 +46,23 @@ suite "Test CodexTree":
       CodexTree.init(sha256, leaves = newSeq[ByteHash]()).isErr

   test "Should build tree from multihash leaves":
-    var expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet())
+    var
+      expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet())
+      tree = CodexTree.init(leaves = expectedLeaves)

-    var tree = CodexTree.init(leaves = expectedLeaves)
     check:
       tree.isOk
       tree.get().leaves == expectedLeaves.mapIt(it.digestBytes)
       tree.get().mcodec == sha256

+  test "Should build tree from multihash leaves asynchronously":
+    var tp = Taskpool.new(numThreads = 2)
+    defer:
+      tp.shutdown()
+
+    let expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet())
+
+    let tree = (await CodexTree.init(tp, leaves = expectedLeaves))
+    check:
+      tree.isOk
+      tree.get().leaves == expectedLeaves.mapIt(it.digestBytes)
@@ -63,6 +80,48 @@ suite "Test CodexTree":
       tree.get().leaves == expectedLeaves.mapIt(it.mhash.tryGet.digestBytes)
       tree.get().mcodec == sha256

+  test "Should build tree from cid leaves asynchronously":
+    var tp = Taskpool.new(numThreads = 2)
+    defer:
+      tp.shutdown()
+
+    let expectedLeaves = data.mapIt(
+      Cid.init(CidVersion.CIDv1, BlockCodec, MultiHash.digest($sha256, it).tryGet).tryGet
+    )
+
+    let tree = (await CodexTree.init(tp, leaves = expectedLeaves))
+
+    check:
+      tree.isOk
+      tree.get().leaves == expectedLeaves.mapIt(it.mhash.tryGet.digestBytes)
+      tree.get().mcodec == sha256
+
+  test "Should build the same tree sync and async":
+    var tp = Taskpool.new(numThreads = 2)
+    defer:
+      tp.shutdown()
+
+    let expectedLeaves = data.mapIt(
+      Cid.init(CidVersion.CIDv1, BlockCodec, MultiHash.digest($sha256, it).tryGet).tryGet
+    )
+
+    let
+      atree = (await CodexTree.init(tp, leaves = expectedLeaves))
+      stree = CodexTree.init(leaves = expectedLeaves)
+
+    check:
+      toSeq(atree.get().nodes) == toSeq(stree.get().nodes)
+      atree.get().root == stree.get().root
+
+    # Single-leaf trees have their root separately computed
+    let
+      atree1 = (await CodexTree.init(tp, leaves = expectedLeaves[0 .. 0]))
+      stree1 = CodexTree.init(leaves = expectedLeaves[0 .. 0])
+
+    check:
+      toSeq(atree1.get().nodes) == toSeq(stree1.get().nodes)
+      atree1.get().root == stree1.get().root
+
   test "Should build from raw digestbytes (should not hash leaves)":
     let tree = CodexTree.init(sha256, leaves = data).tryGet
@@ -70,6 +129,18 @@ suite "Test CodexTree":
     tree.mcodec == sha256
     tree.leaves == data

+  test "Should build from raw digestbytes (should not hash leaves) asynchronously":
+    var tp = Taskpool.new(numThreads = 2)
+    defer:
+      tp.shutdown()
+
+    let tree = (await CodexTree.init(tp, sha256, leaves = @data))
+
+    check:
+      tree.isOk
+      tree.get().mcodec == sha256
+      tree.get().leaves == data
+
   test "Should build from nodes":
     let
       tree = CodexTree.init(sha256, leaves = data).tryGet
@@ -1,6 +1,5 @@
 import std/sequtils
-
 import pkg/unittest2
 import pkg/poseidon2
 import pkg/poseidon2/io
 import pkg/questionable/results
@@ -9,9 +8,11 @@ import pkg/stew/byteutils
 import pkg/stew/arrayops

 import pkg/codex/merkletree
+import pkg/taskpools

 import ./generictreetests
 import ./helpers
+import ../../asynctest

 const data = [
   "0000000000000000000000000000001".toBytes,
@@ -36,13 +37,14 @@ suite "Test Poseidon2Tree":
     check:
       Poseidon2Tree.init(leaves = newSeq[Poseidon2Hash](0)).isErr

-  test "Init tree from poseidon2 leaves":
-    let tree = Poseidon2Tree.init(leaves = expectedLeaves).tryGet
+  test "Build tree from poseidon2 leaves":
+    var taskpool = Taskpool.new(numThreads = 2)
+    let tree = (await Poseidon2Tree.init(taskpool, leaves = expectedLeaves)).tryGet()

     check:
       tree.leaves == expectedLeaves

-  test "Init tree from byte leaves":
+  test "Build tree from byte leaves":
     let tree = Poseidon2Tree.init(
       leaves = expectedLeaves.mapIt(array[31, byte].initCopyFrom(it.toBytes))
     ).tryGet
@@ -50,7 +52,7 @@ suite "Test Poseidon2Tree":
     check:
       tree.leaves == expectedLeaves

-  test "Should build from nodes":
+  test "Build tree from nodes":
     let
       tree = Poseidon2Tree.init(leaves = expectedLeaves).tryGet
       fromNodes = Poseidon2Tree.fromNodes(
@@ -60,6 +62,29 @@ suite "Test Poseidon2Tree":
     check:
       tree == fromNodes

+  test "Build poseidon2 tree from poseidon2 leaves asynchronously":
+    var tp = Taskpool.new()
+    defer:
+      tp.shutdown()
+
+    let tree = (await Poseidon2Tree.init(tp, leaves = expectedLeaves)).tryGet()
+    check:
+      tree.leaves == expectedLeaves
+
+  test "Build poseidon2 tree from byte leaves asynchronously":
+    var tp = Taskpool.new()
+    defer:
+      tp.shutdown()
+
+    let tree = (
+      await Poseidon2Tree.init(
+        tp, leaves = expectedLeaves.mapIt(array[31, byte].initCopyFrom(it.toBytes))
+      )
+    ).tryGet()
+
+    check:
+      tree.leaves == expectedLeaves
+
   let
     compressor = proc(
       x, y: Poseidon2Hash, key: PoseidonKeysEnum
@@ -56,12 +56,13 @@ asyncchecksuite "Test Node - Host contracts":
     verifiable: Manifest
     verifiableBlock: bt.Block
     protected: Manifest
+    taskPool: Taskpool

   setup:
     # Setup Host Contracts and dependencies
     market = MockMarket.new()
     sales = Sales.new(market, clock, localStore)

+    taskPool = Taskpool.new()
     node.contracts = (
       none ClientInteractions,
       some HostInteractions.new(clock, sales),
@@ -75,20 +76,23 @@ asyncchecksuite "Test Node - Host contracts":
     let
       manifestBlock =
         bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
-      erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new)
+      erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskPool)
       manifestCid = manifestBlock.cid

     (await localStore.putBlock(manifestBlock)).tryGet()

     protected = (await erasure.encode(manifest, 3, 2)).tryGet()
-    builder = Poseidon2Builder.new(localStore, protected).tryGet()
+    builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet()
     verifiable = (await builder.buildManifest()).tryGet()
     verifiableBlock =
       bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()

     (await localStore.putBlock(verifiableBlock)).tryGet()

+  teardown:
+    taskPool.shutdown()
+
   test "onExpiryUpdate callback is set":
     check sales.onExpiryUpdate.isSome
@@ -47,10 +47,15 @@ privateAccess(CodexNodeRef) # enable access to private fields

 asyncchecksuite "Test Node - Basic":
   setupAndTearDown()
+  var taskPool: Taskpool

   setup:
+    taskPool = Taskpool.new()
     await node.start()

+  teardown:
+    taskPool.shutdown()
+
   test "Fetch Manifest":
     let
       manifest = await storeDataGetManifest(localStore, chunker)
@@ -173,14 +178,15 @@ asyncchecksuite "Test Node - Basic":
     check string.fromBytes(data) == testString

   test "Setup purchase request":
     let
-      erasure =
-        Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new())
+      erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskPool)
       manifest = await storeDataGetManifest(localStore, chunker)
       manifestBlock =
         bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
       protected = (await erasure.encode(manifest, 3, 2)).tryGet()
-      builder = Poseidon2Builder.new(localStore, protected).tryGet()
+    let
+      builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet()
       verifiable = (await builder.buildManifest()).tryGet()
       verifiableBlock =
         bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()
@@ -13,6 +13,7 @@ import pkg/codex/contracts
 import pkg/codex/slots
 import pkg/codex/manifest
 import pkg/codex/erasure
+import pkg/taskpools
 import pkg/codex/blocktype as bt
 import pkg/chronos/transports/stream
@@ -101,7 +102,7 @@ asyncchecksuite "Test Node - Slot Repair":
     (await localStore.putBlock(manifestBlock)).tryGet()

     protected = (await erasure.encode(manifest, ecK, ecM)).tryGet()
-    builder = Poseidon2Builder.new(localStore, protected).tryGet()
+    builder = Poseidon2Builder.new(localStore, protected, cluster.taskpool).tryGet()
     verifiable = (await builder.buildManifest()).tryGet()
     verifiableBlock =
       bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()
@@ -119,6 +120,7 @@ asyncchecksuite "Test Node - Slot Repair":
     await nodes[1].switch.stop() # slot 0 missing now

     # repair missing slot
+
     (await nodes[4].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet()

     await nodes[2].switch.stop() # slot 1 missing now
@@ -132,16 +134,19 @@ asyncchecksuite "Test Node - Slot Repair":
     await nodes[4].switch.stop() # slot 0 missing now

     # repair missing slot from repaired slots
+
     (await nodes[7].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet()

     await nodes[5].switch.stop() # slot 1 missing now

     # repair missing slot from repaired slots
+
     (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet()

     await nodes[6].switch.stop() # slot 2 missing now

     # repair missing slot from repaired slots
+
     (await nodes[9].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet()

     let
@@ -180,7 +185,7 @@ asyncchecksuite "Test Node - Slot Repair":
     (await localStore.putBlock(manifestBlock)).tryGet()

     protected = (await erasure.encode(manifest, ecK, ecM)).tryGet()
-    builder = Poseidon2Builder.new(localStore, protected).tryGet()
+    builder = Poseidon2Builder.new(localStore, protected, cluster.taskpool).tryGet()
     verifiable = (await builder.buildManifest()).tryGet()
     verifiableBlock =
       bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()
@@ -199,19 +204,24 @@ asyncchecksuite "Test Node - Slot Repair":
     await nodes[3].switch.stop() # slot 2 missing now

     # repair missing slots
+
     (await nodes[6].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet()
+
     (await nodes[7].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet()

     await nodes[2].switch.stop() # slot 1 missing now
     await nodes[4].switch.stop() # slot 3 missing now

     # repair missing slots from repaired slots
+
     (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet()
+
     (await nodes[9].onStore(request, expiry, 3.uint64, nil, isRepairing = true)).tryGet()

     await nodes[5].switch.stop() # slot 4 missing now

     # repair missing slot from repaired slots
+
     (await nodes[10].onStore(request, expiry, 4.uint64, nil, isRepairing = true)).tryGet()

     let
@@ -3,6 +3,7 @@ import std/options
 import ../../../asynctest

 import pkg/chronos
+import pkg/taskpools
 import pkg/poseidon2
 import pkg/serde/json
@@ -77,6 +78,7 @@ suite "Test Circom Compat Backend":
     challenge: array[32, byte]
     builder: Poseidon2Builder
     sampler: Poseidon2Sampler
+    taskPool: Taskpool

   setup:
     let
@@ -85,11 +87,13 @@ suite "Test Circom Compat Backend":

     store = RepoStore.new(repoDs, metaDs)

+    taskPool = Taskpool.new()
+
     (manifest, protected, verifiable) = await createVerifiableManifest(
-      store, numDatasetBlocks, ecK, ecM, blockSize, cellSize
+      store, numDatasetBlocks, ecK, ecM, blockSize, cellSize, taskPool
     )

-    builder = Poseidon2Builder.new(store, verifiable).tryGet
+    builder = Poseidon2Builder.new(store, verifiable, taskPool).tryGet
     sampler = Poseidon2Sampler.new(slotId, store, builder).tryGet

     circom = CircomCompat.init(r1cs, wasm, zkey)
@@ -101,6 +105,7 @@ suite "Test Circom Compat Backend":
     circom.release() # this comes from the rust FFI
     await repoTmp.destroyDb()
     await metaTmp.destroyDb()
+    taskPool.shutdown()

   test "Should verify with correct input":
     var proof = circom.prove(proofInputs).tryGet
@@ -12,6 +12,7 @@ import pkg/codex/chunker
 import pkg/codex/indexingstrategy
 import pkg/codex/slots
 import pkg/codex/rng
+import pkg/taskpools

 import ../helpers
@@ -145,6 +146,7 @@ proc createVerifiableManifest*(
   ecM: int,
   blockSize: NBytes,
   cellSize: NBytes,
+  taskPool: Taskpool,
 ): Future[tuple[manifest: Manifest, protected: Manifest, verifiable: Manifest]] {.
   async
 .} =
@@ -165,7 +167,9 @@ proc createVerifiableManifest*(
       totalDatasetSize,
     )

-    builder = Poseidon2Builder.new(store, protectedManifest, cellSize = cellSize).tryGet
+    builder = Poseidon2Builder.new(
+      store, protectedManifest, cellSize = cellSize, taskPool = taskPool
+    ).tryGet
     verifiableManifest = (await builder.buildManifest()).tryGet

   # build the slots and manifest
@@ -5,6 +5,7 @@ import ../../../asynctest

 import pkg/questionable/results

+import pkg/taskpools
 import pkg/codex/stores
 import pkg/codex/merkletree
 import pkg/codex/utils/json
@@ -26,11 +27,16 @@ suite "Test Sampler - control samples":
     inputData: string
     inputJson: JsonNode
     proofInput: ProofInputs[Poseidon2Hash]
+    taskpool: Taskpool

   setup:
     inputData = readFile("tests/circuits/fixtures/input.json")
     inputJson = !JsonNode.parse(inputData)
     proofInput = Poseidon2Hash.jsonToProofInput(inputJson)
+    taskpool = Taskpool.new()
+
+  teardown:
+    taskpool.shutdown()

   test "Should verify control samples":
     let
@@ -87,25 +93,29 @@ suite "Test Sampler":
     manifest: Manifest
     protected: Manifest
     verifiable: Manifest
+    taskpool: Taskpool

   setup:
     let
       repoDs = repoTmp.newDb()
       metaDs = metaTmp.newDb()

+    taskpool = Taskpool.new()
+
     store = RepoStore.new(repoDs, metaDs)

     (manifest, protected, verifiable) = await createVerifiableManifest(
-      store, datasetBlocks, ecK, ecM, blockSize, cellSize
+      store, datasetBlocks, ecK, ecM, blockSize, cellSize, taskpool
     )

     # create sampler
-    builder = Poseidon2Builder.new(store, verifiable).tryGet
+    builder = Poseidon2Builder.new(store, verifiable, taskpool).tryGet

   teardown:
     await store.close()
     await repoTmp.destroyDb()
     await metaTmp.destroyDb()
+    taskpool.shutdown()

   test "Should fail instantiating for invalid slot index":
     let sampler = Poseidon2Sampler.new(builder.slotRoots.len, store, builder)
@@ -114,7 +124,7 @@ suite "Test Sampler":

   test "Should fail instantiating for non verifiable builder":
     let
-      nonVerifiableBuilder = Poseidon2Builder.new(store, protected).tryGet
+      nonVerifiableBuilder = Poseidon2Builder.new(store, protected, taskpool).tryGet
       sampler = Poseidon2Sampler.new(slotIndex, store, nonVerifiableBuilder)

     check sampler.isErr
@ -4,6 +4,7 @@ import pkg/chronos
|
||||
import pkg/libp2p/cid
|
||||
|
||||
import pkg/codex/merkletree
|
||||
import pkg/taskpools
|
||||
import pkg/codex/chunker
|
||||
import pkg/codex/blocktype as bt
|
||||
import pkg/codex/slots
|
||||
@@ -29,6 +30,7 @@ suite "Test Prover":
var
store: BlockStore
prover: Prover
taskPool: Taskpool

setup:
let

@@ -45,13 +47,14 @@ suite "Test Prover":
numProofSamples: samples,
)
backend = config.initializeBackend().tryGet()

taskPool = Taskpool.new()
store = RepoStore.new(repoDs, metaDs)
prover = Prover.new(store, backend, config.numProofSamples)
prover = Prover.new(store, backend, config.numProofSamples, taskPool)

teardown:
await repoTmp.destroyDb()
await metaTmp.destroyDb()
taskPool.shutdown()

test "Should sample and prove a slot":
let (_, _, verifiable) = await createVerifiableManifest(

@@ -61,6 +64,7 @@ suite "Test Prover":
3, # ecM
blockSize,
cellSize,
taskPool,
)

let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet

@@ -80,6 +84,7 @@ suite "Test Prover":
1, # ecM
blockSize,
cellSize,
taskPool,
)

let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet

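Prover.new now takes the task pool as a fourth argument, mirroring the builder change. A condensed sketch of the prover flow assembled from the hunks above, where config, repoDs, metaDs, verifiable, and challenge are this suite's fixtures:

# Sketch of the wiring exercised by "Should sample and prove a slot";
# awaits belong inside an async test body.
let
  backend = config.initializeBackend().tryGet()
  taskPool = Taskpool.new()
  store = RepoStore.new(repoDs, metaDs)
  prover = Prover.new(store, backend, config.numProofSamples, taskPool)

# prove() returns both the public inputs and the proof.
let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet
taskPool.shutdown()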
@@ -15,6 +15,7 @@ import pkg/codex/utils
import pkg/codex/utils/digest
import pkg/poseidon2
import pkg/poseidon2/io
import pkg/taskpools

import ./helpers
import ../helpers
@@ -72,12 +73,13 @@ suite "Slot builder":
protectedManifest: Manifest
builder: Poseidon2Builder
chunker: Chunker
taskPool: Taskpool

setup:
let
repoDs = repoTmp.newDb()
metaDs = metaTmp.newDb()

taskPool = Taskpool.new()
localStore = RepoStore.new(repoDs, metaDs)
chunker =
RandomChunker.new(Rng.instance(), size = totalDatasetSize, chunkSize = blockSize)

@@ -92,6 +94,7 @@ suite "Slot builder":
await localStore.close()
await repoTmp.destroyDb()
await metaTmp.destroyDb()
taskPool.shutdown()

# TODO: THIS IS A BUG IN asynctest, because it doesn't release the
# objects after the test is done, so we need to do it manually
@@ -113,8 +116,9 @@ suite "Slot builder":
)

check:
Poseidon2Builder.new(localStore, unprotectedManifest, cellSize = cellSize).error.msg ==
"Manifest is not protected."
Poseidon2Builder.new(
localStore, unprotectedManifest, taskPool, cellSize = cellSize
).error.msg == "Manifest is not protected."

test "Number of blocks must be divisible by number of slots":
let mismatchManifest = Manifest.new(

@@ -131,7 +135,7 @@ suite "Slot builder":
)

check:
Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg ==
Poseidon2Builder.new(localStore, mismatchManifest, taskPool, cellSize = cellSize).error.msg ==
"Number of blocks must be divisible by number of slots."

test "Block size must be divisible by cell size":

@@ -149,12 +153,13 @@ suite "Slot builder":
)

check:
Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg ==
Poseidon2Builder.new(localStore, mismatchManifest, taskPool, cellSize = cellSize).error.msg ==
"Block size must be divisible by cell size."

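The two divisibility checks compose: the dataset must split evenly into slots, and each block must split evenly into cells. A worked example with assumed sizes (illustration values, not this suite's fixtures):

const
  blocksCount = 8
  numSlots = 4 # 8 mod 4 == 0 -> 2 blocks per slot, accepted
  blockSize = 64 * 1024
  cellSize = 2 * 1024 # 64 KiB mod 2 KiB == 0 -> 32 cells per block, accepted

assert blocksCount mod numSlots == 0
assert blockSize mod cellSize == 0
# blockSize = 65 * 1024 would be rejected with
# "Block size must be divisible by cell size."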
test "Should build correct slot builder":
|
||||
builder =
|
||||
Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet()
|
||||
builder = Poseidon2Builder
|
||||
.new(localStore, protectedManifest, taskPool, cellSize = cellSize)
|
||||
.tryGet()
|
||||
|
||||
check:
|
||||
builder.cellSize == cellSize
|
||||
@ -171,7 +176,7 @@ suite "Slot builder":
|
||||
)
|
||||
|
||||
builder = Poseidon2Builder
|
||||
.new(localStore, protectedManifest, cellSize = cellSize)
|
||||
.new(localStore, protectedManifest, taskPool, cellSize = cellSize)
|
||||
.tryGet()
|
||||
|
||||
for i in 0 ..< numSlots:
|
||||
@ -196,7 +201,7 @@ suite "Slot builder":
|
||||
)
|
||||
|
||||
builder = Poseidon2Builder
|
||||
.new(localStore, protectedManifest, cellSize = cellSize)
|
||||
.new(localStore, protectedManifest, taskPool, cellSize = cellSize)
|
||||
.tryGet()
|
||||
|
||||
for i in 0 ..< numSlots:
|
||||
@ -215,8 +220,9 @@ suite "Slot builder":
|
||||
slotTree.root().tryGet() == expectedRoot
|
||||
|
||||
test "Should persist trees for all slots":
|
||||
let builder =
|
||||
Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet()
|
||||
let builder = Poseidon2Builder
|
||||
.new(localStore, protectedManifest, taskPool, cellSize = cellSize)
|
||||
.tryGet()
|
||||
|
||||
for i in 0 ..< numSlots:
|
||||
let
|
||||
@ -242,7 +248,7 @@ suite "Slot builder":
|
||||
0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks
|
||||
)
|
||||
builder = Poseidon2Builder
|
||||
.new(localStore, protectedManifest, cellSize = cellSize)
|
||||
.new(localStore, protectedManifest, taskPool, cellSize = cellSize)
|
||||
.tryGet()
|
||||
|
||||
(await builder.buildSlots()).tryGet
|
||||
@ -270,7 +276,7 @@ suite "Slot builder":
|
||||
0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks
|
||||
)
|
||||
builder = Poseidon2Builder
|
||||
.new(localStore, protectedManifest, cellSize = cellSize)
|
||||
.new(localStore, protectedManifest, taskPool, cellSize = cellSize)
|
||||
.tryGet()
|
||||
|
||||
slotsHashes = collect(newSeq):
|
||||
@@ -296,45 +302,53 @@ suite "Slot builder":
test "Should not build from verifiable manifest with 0 slots":
var
builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize)
.new(localStore, protectedManifest, taskPool, cellSize = cellSize)
.tryGet()
verifyManifest = (await builder.buildManifest()).tryGet()

verifyManifest.slotRoots = @[]
check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr
check Poseidon2Builder.new(
localStore, verifyManifest, taskPool, cellSize = cellSize
).isErr

test "Should not build from verifiable manifest with incorrect number of slots":
var
builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize)
.new(localStore, protectedManifest, taskPool, cellSize = cellSize)
.tryGet()

verifyManifest = (await builder.buildManifest()).tryGet()

verifyManifest.slotRoots.del(verifyManifest.slotRoots.len - 1)

check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr
check Poseidon2Builder.new(
localStore, verifyManifest, taskPool, cellSize = cellSize
).isErr

test "Should not build from verifiable manifest with invalid verify root":
let builder =
Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet()
let builder = Poseidon2Builder
.new(localStore, protectedManifest, taskPool, cellSize = cellSize)
.tryGet()

var verifyManifest = (await builder.buildManifest()).tryGet()

rng.shuffle(Rng.instance, verifyManifest.verifyRoot.data.buffer)

check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr
check Poseidon2Builder.new(
localStore, verifyManifest, taskPool, cellSize = cellSize
).isErr

test "Should build from verifiable manifest":
let
builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize)
.new(localStore, protectedManifest, taskPool, cellSize = cellSize)
.tryGet()

verifyManifest = (await builder.buildManifest()).tryGet()

verificationBuilder =
Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).tryGet()
verificationBuilder = Poseidon2Builder
.new(localStore, verifyManifest, taskPool, cellSize = cellSize)
.tryGet()

check:
builder.slotRoots == verificationBuilder.slotRoots

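The four tests above pin down what construction from a verifiable manifest validates: the slot roots must be present, match the expected slot count, and be committed to by the verify root; when they are, a second builder reproduces the same roots. A condensed sketch of the failure cases, reusing this suite's fixtures:

# Sketch reusing localStore, protectedManifest, taskPool, cellSize from above;
# awaits belong inside an async test body.
var verifyManifest = (
  await Poseidon2Builder
  .new(localStore, protectedManifest, taskPool, cellSize = cellSize)
  .tryGet()
  .buildManifest()
).tryGet()

# Any one of these mutations must make construction fail:
verifyManifest.slotRoots = @[] # no slots
# verifyManifest.slotRoots.del(verifyManifest.slotRoots.len - 1) # wrong slot count
# rng.shuffle(Rng.instance, verifyManifest.verifyRoot.data.buffer) # corrupted root
check Poseidon2Builder.new(
  localStore, verifyManifest, taskPool, cellSize = cellSize
).isErr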
tests/codex/utils/testPoseidon.nim (new file, 40 lines)
@@ -0,0 +1,40 @@
{.push raises: [].}

import std/[times, strformat, random]
import pkg/questionable/results

import pkg/codex/merkletree/poseidon2

import pkg/codex/utils/poseidon2digest
import ../../asynctest

test "Test poseidon2 digestTree":
randomize(42)
const
dataSize = 64 * 1024 # 64KB
chunkSize = 2 * 1024 # 2KB
iterations = 10 # Number of iterations

echo &"Benchmarking digestTree with data size: {dataSize} bytes, chunk size: {chunkSize} bytes"

# Generate random data
var data = newSeq[byte](dataSize)
for i in 0 ..< dataSize:
data[i] = byte(rand(255))

# Actual benchmark
let startTime = cpuTime()

for i in 1 .. iterations:
let treeResult = Poseidon2Tree.digestTree(data, chunkSize).tryGet()

# Optionally print info about each iteration

let endTime = cpuTime()
let totalTime = endTime - startTime
let avgTime = totalTime / iterations.float

echo &"Results:"
echo &" Total time for {iterations} iterations: {totalTime:.6f} seconds"
echo &" Average time per iteration: {avgTime:.6f} seconds"
echo &" Iterations per second: {iterations.float / totalTime:.2f}"
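One caveat for this benchmark: cpuTime() measures CPU time of the calling thread, so if digestTree offloads hashing to taskpool workers the figures can understate wall-clock cost. A hedged alternative using monotonic wall time, with std/monotimes assumed from the standard library rather than taken from the file above:

import std/[monotimes, times]

# Wall-clock timing sketch; the loop body stands in for the digestTree calls above.
let start = getMonoTime()
for i in 1 .. iterations:
  discard Poseidon2Tree.digestTree(data, chunkSize).tryGet()
let elapsed = getMonoTime() - start
echo "wall time: ", elapsed.inMilliseconds, " ms"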
vendor/nim-taskpools (vendored submodule)
@@ -1 +1 @@
Subproject commit 4acdc6ef005a93dba09f902ed75197548cf7b451
Subproject commit 97f76faef6ba64bc77d9808c27ec5e9917e7cfde