wip: asyncify the tree building

This commit is contained in:
benbierens 2024-10-28 15:35:40 +01:00
parent 2fb7031ec6
commit cad74cc750
No known key found for this signature in database
GPG Key ID: 877D2C2E09A22F3A
7 changed files with 116 additions and 68 deletions

View File

@ -322,7 +322,7 @@ proc encodeData(
return failure("Unable to store block!") return failure("Unable to store block!")
idx.inc(params.steps) idx.inc(params.steps)
without tree =? CodexTree.init(cids[]), err: without tree =? (await CodexTree.init(cids[])), err:
return failure(err) return failure(err)
without treeCid =? tree.rootCid, err: without treeCid =? tree.rootCid, err:
@ -441,7 +441,7 @@ proc decode*(
finally: finally:
decoder.release() decoder.release()
without tree =? CodexTree.init(cids[0..<encoded.originalBlocksCount]), err: without tree =? (await CodexTree.init(cids[0..<encoded.originalBlocksCount])), err:
return failure(err) return failure(err)
without treeCid =? tree.rootCid, err: without treeCid =? tree.rootCid, err:

View File

@ -145,16 +145,18 @@ func compress*(
mhash.coder(@x & @y & @[ key.byte ], digest) mhash.coder(@x & @y & @[ key.byte ], digest)
success digest success digest
func init*( proc init*(
_: type CodexTree, _: type CodexTree,
mcodec: MultiCodec = Sha256HashCodec, mcodec: MultiCodec = Sha256HashCodec,
leaves: openArray[ByteHash]): ?!CodexTree = leaves: seq[ByteHash]): Future[?!CodexTree] {.async.} =
if leaves.len == 0: if leaves.len == 0:
return failure "Empty leaves" return failure "Empty leaves"
without mhash =? mcodec.mhash(), error:
return failure error
let let
mhash = ? mcodec.mhash()
compressor = proc(x, y: seq[byte], key: ByteTreeKey): ?!ByteHash {.noSideEffect.} = compressor = proc(x, y: seq[byte], key: ByteTreeKey): ?!ByteHash {.noSideEffect.} =
compress(x, y, key, mhash) compress(x, y, key, mhash)
Zero: ByteHash = newSeq[byte](mhash.size) Zero: ByteHash = newSeq[byte](mhash.size)
@ -165,33 +167,42 @@ func init*(
var var
self = CodexTree(mcodec: mcodec, compress: compressor, zero: Zero) self = CodexTree(mcodec: mcodec, compress: compressor, zero: Zero)
self.layers = ? merkleTreeWorker(self, leaves, isBottomLayer = true) without layers =? (await merkleTreeWorker(self, leaves, isBottomLayer = true)), error:
return failure error
self.layers = layers
success self success self
func init*( proc init*(
_: type CodexTree, _: type CodexTree,
leaves: openArray[MultiHash]): ?!CodexTree = leaves: seq[MultiHash]): Future[?!CodexTree] {.async.} =
if leaves.len == 0: if leaves.len == 0:
return failure "Empty leaves" return failure "Empty leaves"
let let
mcodec = leaves[0].mcodec mcodec = leaves[0].mcodec
leaves = leaves.mapIt( it.digestBytes ) leafBytes = leaves.mapIt( it.digestBytes )
CodexTree.init(mcodec, leaves) await CodexTree.init(mcodec, leafBytes)
func init*( proc init*(
_: type CodexTree, _: type CodexTree,
leaves: openArray[Cid]): ?!CodexTree = leaves: seq[Cid]): Future[?!CodexTree] {.async.} =
if leaves.len == 0: if leaves.len == 0:
return failure "Empty leaves" return failure "Empty leaves"
let without mhash =? leaves[0].mhash.mapFailure, error:
mcodec = (? leaves[0].mhash.mapFailure).mcodec return failure error
leaves = leaves.mapIt( (? it.mhash.mapFailure).digestBytes )
CodexTree.init(mcodec, leaves) var hashes = newSeq[seq[byte]]()
for leaf in leaves:
without hash =? leaf.mhash.mapFailure, error:
return failure error
hashes.add(hash.digestBytes)
await CodexTree.init(mhash.mcodec, hashes)
proc fromNodes*( proc fromNodes*(
_: type CodexTree, _: type CodexTree,

View File

@ -11,6 +11,7 @@
import std/bitops import std/bitops
import pkg/chronos
import pkg/questionable/results import pkg/questionable/results
import ../errors import ../errors
@ -120,10 +121,11 @@ func reconstructRoot*[H, K](proof: MerkleProof[H, K], leaf: H): ?!H =
func verify*[H, K](proof: MerkleProof[H, K], leaf: H, root: H): ?!bool = func verify*[H, K](proof: MerkleProof[H, K], leaf: H, root: H): ?!bool =
success bool(root == ? proof.reconstructRoot(leaf)) success bool(root == ? proof.reconstructRoot(leaf))
func merkleTreeWorker*[H, K]( # TODO: replace with implementation in poseidon library plz
proc merkleTreeWorker*[H, K](
self: MerkleTree[H, K], self: MerkleTree[H, K],
xs: openArray[H], xs: seq[H],
isBottomLayer: static bool): ?!seq[seq[H]] = isBottomLayer: static bool): Future[?!seq[seq[H]]] {.async.} =
let a = low(xs) let a = low(xs)
let b = high(xs) let b = high(xs)
@ -145,9 +147,17 @@ func merkleTreeWorker*[H, K](
for i in 0..<halfn: for i in 0..<halfn:
const key = when isBottomLayer: K.KeyBottomLayer else: K.KeyNone const key = when isBottomLayer: K.KeyBottomLayer else: K.KeyNone
ys[i] = ? self.compress( xs[a + 2 * i], xs[a + 2 * i + 1], key = key ) without y =? self.compress( xs[a + 2 * i], xs[a + 2 * i + 1], key = key ), err:
return failure err
ys[i] = y
# yield?
await sleepAsync(1.millis) # cooperative scheduling (may not be necessary)
if isOdd: if isOdd:
const key = when isBottomLayer: K.KeyOddAndBottomLayer else: K.KeyOdd const key = when isBottomLayer: K.KeyOddAndBottomLayer else: K.KeyOdd
ys[halfn] = ? self.compress( xs[n], self.zero, key = key ) without y =? self.compress( xs[n], self.zero, key = key ), err:
return failure err
ys[halfn] = y
success @[ @xs ] & ? self.merkleTreeWorker(ys, isBottomLayer = false) without v =? (await self.merkleTreeWorker(ys, isBottomLayer = false)), err:
return failure err
success @[ @xs ] & v

View File

@ -11,6 +11,7 @@
import std/sequtils import std/sequtils
import pkg/chronos
import pkg/poseidon2 import pkg/poseidon2
import pkg/constantine/math/io/io_fields import pkg/constantine/math/io/io_fields
import pkg/constantine/platforms/abstractions import pkg/constantine/platforms/abstractions
@ -67,9 +68,9 @@ converter toKey*(key: PoseidonKeysEnum): Poseidon2Hash =
of KeyOdd: KeyOddF of KeyOdd: KeyOddF
of KeyOddAndBottomLayer: KeyOddAndBottomLayerF of KeyOddAndBottomLayer: KeyOddAndBottomLayerF
func init*( proc init*(
_: type Poseidon2Tree, _: type Poseidon2Tree,
leaves: openArray[Poseidon2Hash]): ?!Poseidon2Tree = leaves: seq[Poseidon2Hash]): Future[?!Poseidon2Tree] {.async.} =
if leaves.len == 0: if leaves.len == 0:
return failure "Empty leaves" return failure "Empty leaves"
@ -83,12 +84,14 @@ func init*(
var var
self = Poseidon2Tree(compress: compressor, zero: Poseidon2Zero) self = Poseidon2Tree(compress: compressor, zero: Poseidon2Zero)
self.layers = ? merkleTreeWorker(self, leaves, isBottomLayer = true) without l =? (await merkleTreeWorker(self, leaves, isBottomLayer = true)), err:
return failure err
self.layers = l
success self success self
func init*( proc init*(
_: type Poseidon2Tree, _: type Poseidon2Tree,
leaves: openArray[array[31, byte]]): ?!Poseidon2Tree = leaves: seq[array[31, byte]]): Future[?!Poseidon2Tree] {.async.} =
Poseidon2Tree.init( Poseidon2Tree.init(
leaves.mapIt( Poseidon2Hash.fromBytes(it) )) leaves.mapIt( Poseidon2Hash.fromBytes(it) ))

View File

@ -339,7 +339,7 @@ proc store*(
finally: finally:
await stream.close() await stream.close()
without tree =? CodexTree.init(cids), err: without tree =? (await CodexTree.init(cids)), err:
return failure(err) return failure(err)
without treeCid =? tree.rootCid(CIDv1, dataCodec), err: without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
@ -446,19 +446,23 @@ proc setupRequest(
self.taskpool) self.taskpool)
without encoded =? (await erasure.encode(manifest, ecK, ecM)), error: without encoded =? (await erasure.encode(manifest, ecK, ecM)), error:
trace "Unable to erasure code dataset" error "Unable to erasure code dataset"
return failure(error) return failure(error)
without builder =? Poseidon2Builder.new(self.networkStore.localStore, encoded), err: without builder =? Poseidon2Builder.new(self.networkStore.localStore, encoded), err:
trace "Unable to create slot builder" error "Unable to create slot builder"
return failure(err)
if err =? (await builder.init()).errorOption:
error "Failed to initialize slot builder"
return failure(err) return failure(err)
without verifiable =? (await builder.buildManifest()), err: without verifiable =? (await builder.buildManifest()), err:
trace "Unable to build verifiable manifest" error "Unable to build verifiable manifest"
return failure(err) return failure(err)
without manifestBlk =? await self.storeManifest(verifiable), err: without manifestBlk =? await self.storeManifest(verifiable), err:
trace "Unable to store verifiable manifest" error "Unable to store verifiable manifest"
return failure(err) return failure(err)
let let
@ -550,17 +554,21 @@ proc onStore(
trace "Received a request to store a slot" trace "Received a request to store a slot"
without cid =? Cid.init(request.content.cid).mapFailure, err: without cid =? Cid.init(request.content.cid).mapFailure, err:
trace "Unable to parse Cid", cid error "Unable to parse Cid", cid
return failure(err) return failure(err)
without manifest =? (await self.fetchManifest(cid)), err: without manifest =? (await self.fetchManifest(cid)), err:
trace "Unable to fetch manifest for cid", cid, err = err.msg error "Unable to fetch manifest for cid", cid, err = err.msg
return failure(err) return failure(err)
without builder =? Poseidon2Builder.new( without builder =? Poseidon2Builder.new(
self.networkStore, manifest, manifest.verifiableStrategy self.networkStore, manifest, manifest.verifiableStrategy
), err: ), err:
trace "Unable to create slots builder", err = err.msg error "Unable to create slots builder", err = err.msg
return failure(err)
if err =? (await builder.init()).errorOption:
error "Failed to initialize slot builder"
return failure(err) return failure(err)
let let

View File

@ -161,8 +161,7 @@ proc buildBlockTree*[T, H](
if blk.isEmpty: if blk.isEmpty:
success (self.emptyBlock, self.emptyDigestTree) success (self.emptyBlock, self.emptyDigestTree)
else: else:
without tree =? without tree =? (await T.digestTree(blk.data, self.cellSize.int)), err:
T.digestTree(blk.data, self.cellSize.int), err:
error "Failed to create digest for block", err = err.msg error "Failed to create digest for block", err = err.msg
return failure(err) return failure(err)
@ -213,7 +212,7 @@ proc buildSlotTree*[T, H](
error "Failed to select slot blocks", err = err.msg error "Failed to select slot blocks", err = err.msg
return failure(err) return failure(err)
T.init(cellHashes) await T.init(cellHashes)
proc buildSlot*[T, H]( proc buildSlot*[T, H](
self: SlotsBuilder[T, H], self: SlotsBuilder[T, H],
@ -251,8 +250,8 @@ proc buildSlot*[T, H](
tree.root() tree.root()
func buildVerifyTree*[T, H](self: SlotsBuilder[T, H], slotRoots: openArray[H]): ?!T = proc buildVerifyTree*[T, H](self: SlotsBuilder[T, H], slotRoots: seq[H]): Future[?!T] {.async.} =
T.init(@slotRoots) await T.init(@slotRoots)
proc buildSlots*[T, H](self: SlotsBuilder[T, H]): Future[?!void] {.async.} = proc buildSlots*[T, H](self: SlotsBuilder[T, H]): Future[?!void] {.async.} =
## Build all slot trees and store them in the block store. ## Build all slot trees and store them in the block store.
@ -272,7 +271,7 @@ proc buildSlots*[T, H](self: SlotsBuilder[T, H]): Future[?!void] {.async.} =
return failure(err) return failure(err)
slotRoot slotRoot
without tree =? self.buildVerifyTree(self.slotRoots) and root =? tree.root, err: without tree =? (await self.buildVerifyTree(self.slotRoots)) and root =? tree.root, err:
error "Failed to build slot roots tree", err = err.msg error "Failed to build slot roots tree", err = err.msg
return failure(err) return failure(err)
@ -305,6 +304,28 @@ proc buildManifest*[T, H](self: SlotsBuilder[T, H]): Future[?!Manifest] {.async.
self.cellSize, self.cellSize,
self.strategy.strategyType) self.strategy.strategyType)
proc init*[T, H](self: SlotsBuilder[T, H]): Future[?!void] {.async.} =
without emptyTree =? (await T.digestTree(self.emptyBlock, self.cellSize.int)), err:
return failure err
self.emptyDigestTree = emptyTree
if self.manifest.verifiable:
without tree =? (await self.buildVerifyTree(self.slotRoots)), err:
return failure err
without verifyRoot =? tree.root, err:
return failure err
without expectedRoot =? self.manifest.verifyRoot.fromVerifyCid(), err:
return failure err
if verifyRoot != expectedRoot:
return failure "Existing slots root doesn't match reconstructed root."
self.verifiableTree = some tree
trace "Slots builder initialized"
proc new*[T, H]( proc new*[T, H](
_: type SlotsBuilder[T, H], _: type SlotsBuilder[T, H],
store: BlockStore, store: BlockStore,
@ -346,7 +367,7 @@ proc new*[T, H](
numBlocksTotal = numSlotBlocksTotal * manifest.numSlots # number of blocks per slot numBlocksTotal = numSlotBlocksTotal * manifest.numSlots # number of blocks per slot
emptyBlock = newSeq[byte](manifest.blockSize.int) emptyBlock = newSeq[byte](manifest.blockSize.int)
emptyDigestTree = ? T.digestTree(emptyBlock, cellSize.int) # emptyDigestTree = ? (waitFor T.digestTree(emptyBlock, cellSize.int))
strategy = ? strategy.init( strategy = ? strategy.init(
0, 0,
@ -372,24 +393,13 @@ proc new*[T, H](
strategy: strategy, strategy: strategy,
cellSize: cellSize, cellSize: cellSize,
emptyBlock: emptyBlock, emptyBlock: emptyBlock,
numSlotBlocks: numSlotBlocksTotal, numSlotBlocks: numSlotBlocksTotal)
emptyDigestTree: emptyDigestTree)
if manifest.verifiable: if manifest.verifiable:
if manifest.slotRoots.len == 0 or if manifest.slotRoots.len == 0 or
manifest.slotRoots.len != manifest.numSlots: manifest.slotRoots.len != manifest.numSlots:
return failure "Manifest is verifiable but slot roots are missing or invalid." return failure "Manifest is verifiable but slot roots are missing or invalid."
let self.slotRoots = self.manifest.slotRoots.mapIt( (? it.fromSlotCid() ))
slotRoots = manifest.slotRoots.mapIt( (? it.fromSlotCid() ))
tree = ? self.buildVerifyTree(slotRoots)
expectedRoot = ? manifest.verifyRoot.fromVerifyCid()
verifyRoot = ? tree.root
if verifyRoot != expectedRoot:
return failure "Existing slots root doesn't match reconstructed root."
self.slotRoots = slotRoots
self.verifiableTree = some tree
success self success self

View File

@ -7,6 +7,7 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import pkg/chronos
import pkg/poseidon2 import pkg/poseidon2
import pkg/questionable/results import pkg/questionable/results
import pkg/libp2p/multihash import pkg/libp2p/multihash
@ -32,10 +33,11 @@ func spongeDigest*(
success Sponge.digest(bytes, rate) success Sponge.digest(bytes, rate)
func digestTree*( # TODO: replace with poseidon2 library call plz
proc digestTree*(
_: type Poseidon2Tree, _: type Poseidon2Tree,
bytes: openArray[byte], bytes: seq[byte],
chunkSize: int): ?!Poseidon2Tree = chunkSize: int): Future[?!Poseidon2Tree] {.async.} =
## Hashes chunks of data with a sponge of rate 2, and combines the ## Hashes chunks of data with a sponge of rate 2, and combines the
## resulting chunk hashes in a merkle root. ## resulting chunk hashes in a merkle root.
## ##
@ -50,30 +52,34 @@ func digestTree*(
while index < bytes.len: while index < bytes.len:
let start = index let start = index
let finish = min(index + chunkSize, bytes.len) let finish = min(index + chunkSize, bytes.len)
let digest = ? Poseidon2Hash.spongeDigest(bytes.toOpenArray(start, finish - 1), 2) without digest =? Poseidon2Hash.spongeDigest(bytes.toOpenArray(start, finish - 1), 2), err:
return failure err
leaves.add(digest) leaves.add(digest)
index += chunkSize index += chunkSize
return Poseidon2Tree.init(leaves) await sleepAsync(1.millis) # cooperative scheduling (may not be necessary)
return await Poseidon2Tree.init(leaves)
func digest*( proc digest*(
_: type Poseidon2Tree, _: type Poseidon2Tree,
bytes: openArray[byte], bytes: seq[byte],
chunkSize: int): ?!Poseidon2Hash = chunkSize: int): Future[?!Poseidon2Hash] {.async.} =
## Hashes chunks of data with a sponge of rate 2, and combines the ## Hashes chunks of data with a sponge of rate 2, and combines the
## resulting chunk hashes in a merkle root. ## resulting chunk hashes in a merkle root.
## ##
without tree =? (await Poseidon2Tree.digestTree(bytes, chunkSize)), err:
return failure err
(? Poseidon2Tree.digestTree(bytes, chunkSize)).root tree.root
func digestMhash*( proc digestMhash*(
_: type Poseidon2Tree, _: type Poseidon2Tree,
bytes: openArray[byte], bytes: openArray[byte],
chunkSize: int): ?!MultiHash = chunkSize: int): Future[?!MultiHash] {.async.} =
## Hashes chunks of data with a sponge of rate 2 and ## Hashes chunks of data with a sponge of rate 2 and
## returns the multihash of the root ## returns the multihash of the root
## ##
let without hash =? (await Poseidon2Tree.digest(bytes, chunkSize)), err:
hash = ? Poseidon2Tree.digest(bytes, chunkSize) return failure err
? MultiHash.init(Pos2Bn128MrklCodec, hash).mapFailure ? MultiHash.init(Pos2Bn128MrklCodec, hash).mapFailure