From 0541f12e021fcced85a250bbc8449c0c15864a23 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 25 Oct 2023 08:53:23 -0600 Subject: [PATCH] wip --- codex/merkletree/merkletree.nim | 32 +++++++++------------------ codex/streams/asyncstreamwrapper.nim | 23 +++++++++---------- codex/streams/seekablestorestream.nim | 20 ++++++++--------- codex/streams/storestream.nim | 20 ++++++++--------- codex/utils.nim | 3 ++- codex/utils/asynciter.nim | 9 ++++++-- 6 files changed, 47 insertions(+), 60 deletions(-) diff --git a/codex/merkletree/merkletree.nim b/codex/merkletree/merkletree.nim index 22ba5887..5c96de6f 100644 --- a/codex/merkletree/merkletree.nim +++ b/codex/merkletree/merkletree.nim @@ -27,12 +27,13 @@ logScope: type MerkleTree* = object - mcodec: MultiCodec # multicodec of the hash function - height: Natural # current height of the tree (levels - 1) - levels: Natural # number of levels in the tree (height + 1) - leafs: Natural # total number of leafs, if odd the last leaf will be hashed twice - size: Natural # total number of nodes in the tree (corrected for odd leafs) - nodes: seq[seq[byte]] # nodes of the tree (this should be an iterator) + mcodec: MultiCodec # multicodec of the hash function + height: Natural # current height of the tree (levels - 1) + levels: Natural # number of levels in the tree (height + 1) + leafs: Natural # total number of leafs, if odd the last leaf will be hashed twice + size: Natural # total number of nodes in the tree (corrected for odd leafs) + leafsIter: AsyncIter[seq[byte]] # leafs iterator of the tree + nodesIter: AsyncIter[seq[byte]] # nodes iterator of the tree MerkleProof* = object mcodec: MultiCodec @@ -49,7 +50,7 @@ proc root*(self: MerkleTree): ?!MultiHash = MultiHash.init(self.mcodec, self.nodes[^1]).mapFailure -proc buildSync*(self: var MerkleTree): ?!void = +proc build*(self: var MerkleTree): Future[?!void] {.async.} = ## Builds a tree from previously added data blocks ## ## Tree built from data blocks A, B and C is @@ -121,6 +122,7 @@ func getProofs(self: MerkleTree, indexes: openArray[Natural]): ?!seq[MerkleProof func init*( T: type MerkleTree, leafs: Natural, + leafsIter: AsyncIter[seq[byte]], mcodec: MultiCodec = multiCodec("sha2-256")): ?!MerkleTree = ## Init empty tree with capacity `leafs` ## @@ -136,21 +138,7 @@ func init*( size: size, height: height, levels: height - 1, - nodes: newSeq[seq[byte]](size)) - - success self - -func init*( - T: type MerkleTree, - leafs: openArray[seq[byte]], - mcodec: MultiCodec = multiCodec("sha2-256")): ?!MerkleTree = - ## Init tree from vector of leafs - ## - - var - self = ? MerkleTree.init(leafs.len, mcodec) - - self.nodes[0..= self.size method readOnce*( - self: SeekableStoreStream, - pbytes: pointer, - nbytes: int -): Future[int] {.async.} = + self: SeekableStoreStream, + pbytes: pointer, + nbytes: int): Future[int] {.async.} = ## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`. ## Return how many bytes were actually read before EOF was encountered. ## Raise exception if we are already at EOF. - ## + ## trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount if self.atEof: diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index 2f897804..bb2f1172 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -51,12 +51,11 @@ method initStream*(s: SeekableStoreStream) = procCall LPStream(s).initStream() proc new*( - T: type SeekableStoreStream, - store: BlockStore, - manifest: Manifest, - pad = true -): SeekableStoreStream = - ## Create a new SeekableStoreStream instance for a given store and manifest + T: type StoreStream, + store: BlockStore, + manifest: Manifest, + pad = true): StoreStream = + ## Create a new StoreStream instance for a given store and manifest ## result = SeekableStoreStream( store: store, @@ -78,11 +77,10 @@ method atEof*(self: SeekableStoreStream): bool = self.offset >= self.size method readOnce*( - self: SeekableStoreStream, - pbytes: pointer, - nbytes: int -): Future[int] {.async.} = - ## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`. + self: StoreStream, + pbytes: pointer, + nbytes: int): Future[int] {.async.} = + ## Read `nbytes` from current position in the StoreStream into output buffer pointed by `pbytes`. ## Return how many bytes were actually read before EOF was encountered. ## Raise exception if we are already at EOF. ## diff --git a/codex/utils.nim b/codex/utils.nim index 6352155e..e045b902 100644 --- a/codex/utils.nim +++ b/codex/utils.nim @@ -16,8 +16,9 @@ import pkg/chronos import ./utils/asyncheapqueue import ./utils/fileutils import ./utils/asynciter +import ./utils/digest -export asyncheapqueue, fileutils, asynciter +export asyncheapqueue, fileutils, asynciter, digest func divUp*[T: SomeInteger](a, b : T): T = ## Division with result rounded up (rather than truncated as in 'div') diff --git a/codex/utils/asynciter.nim b/codex/utils/asynciter.nim index 9aa7683f..3f31d1a9 100644 --- a/codex/utils/asynciter.nim +++ b/codex/utils/asynciter.nim @@ -7,9 +7,11 @@ type Function*[T, U] = proc(fut: T): U {.upraises: [CatchableError], gcsafe, closure.} IsFinished* = proc(): bool {.upraises: [], gcsafe, closure.} GenNext*[T] = proc(): T {.upraises: [CatchableError], gcsafe, closure.} + Iter*[T] = ref object finished: bool next*: GenNext[T] + AsyncIter*[T] = Iter[Future[T]] proc finish*[T](self: Iter[T]): void = @@ -32,7 +34,11 @@ proc map*[T, U](fut: Future[T], fn: Function[T, U]): Future[U] {.async.} = let t = await fut fn(t) -proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOnErr: bool = true): Iter[T] = +proc new*[T]( + _: type Iter, + genNext: GenNext[T], + isFinished: IsFinished, + finishOnErr: bool = true): Iter[T] = var iter = Iter[T]() proc next(): T {.upraises: [CatchableError].} = @@ -142,4 +148,3 @@ proc prefetch*[T](iter: Iter[T], n: Positive): Iter[T] = tryFetch(j) Iter.new(genNext, isFinished) -