This commit is contained in:
Dmitriy Ryajov 2023-10-25 08:53:23 -06:00
parent 9cd2af6cec
commit 0541f12e02
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
6 changed files with 47 additions and 60 deletions

View File

@ -27,12 +27,13 @@ logScope:
type
MerkleTree* = object
mcodec: MultiCodec # multicodec of the hash function
height: Natural # current height of the tree (levels - 1)
levels: Natural # number of levels in the tree (height + 1)
leafs: Natural # total number of leafs, if odd the last leaf will be hashed twice
size: Natural # total number of nodes in the tree (corrected for odd leafs)
nodes: seq[seq[byte]] # nodes of the tree (this should be an iterator)
mcodec: MultiCodec # multicodec of the hash function
height: Natural # current height of the tree (levels - 1)
levels: Natural # number of levels in the tree (height + 1)
leafs: Natural # total number of leafs, if odd the last leaf will be hashed twice
size: Natural # total number of nodes in the tree (corrected for odd leafs)
leafsIter: AsyncIter[seq[byte]] # leafs iterator of the tree
nodesIter: AsyncIter[seq[byte]] # nodes iterator of the tree
MerkleProof* = object
mcodec: MultiCodec
@ -49,7 +50,7 @@ proc root*(self: MerkleTree): ?!MultiHash =
MultiHash.init(self.mcodec, self.nodes[^1]).mapFailure
proc buildSync*(self: var MerkleTree): ?!void =
proc build*(self: var MerkleTree): Future[?!void] {.async.} =
## Builds a tree from previously added data blocks
##
## Tree built from data blocks A, B and C is
@ -121,6 +122,7 @@ func getProofs(self: MerkleTree, indexes: openArray[Natural]): ?!seq[MerkleProof
func init*(
T: type MerkleTree,
leafs: Natural,
leafsIter: AsyncIter[seq[byte]],
mcodec: MultiCodec = multiCodec("sha2-256")): ?!MerkleTree =
## Init empty tree with capacity `leafs`
##
@ -136,21 +138,7 @@ func init*(
size: size,
height: height,
levels: height - 1,
nodes: newSeq[seq[byte]](size))
success self
func init*(
T: type MerkleTree,
leafs: openArray[seq[byte]],
mcodec: MultiCodec = multiCodec("sha2-256")): ?!MerkleTree =
## Init tree from vector of leafs
##
var
self = ? MerkleTree.init(leafs.len, mcodec)
self.nodes[0..<self.leafs] = leafs.toOpenArray(0, leafs.high)
nodesIter: leafsIter)
success self

View File

@ -32,12 +32,11 @@ method initStream*(self: AsyncStreamWrapper) =
procCall LPStream(self).initStream()
proc new*(
C: type AsyncStreamWrapper,
reader: AsyncStreamReader = nil,
writer: AsyncStreamWriter = nil
): AsyncStreamWrapper =
C: type AsyncStreamWrapper,
reader: AsyncStreamReader = nil,
writer: AsyncStreamWriter = nil): AsyncStreamWrapper =
## Create new instance of an asynchronous stream wrapper
##
##
let
stream = C(reader: reader, writer: writer)
@ -60,10 +59,9 @@ template withExceptions(body: untyped) =
raise newException(LPStreamError, exc.msg)
method readOnce*(
self: AsyncStreamWrapper,
pbytes: pointer,
nbytes: int
): Future[int] {.async.} =
self: AsyncStreamWrapper,
pbytes: pointer,
nbytes: int): Future[int] {.async.} =
trace "Reading bytes from reader", bytes = nbytes
if isNil(self.reader):
@ -77,10 +75,9 @@ method readOnce*(
return await self.reader.readOnce(pbytes, nbytes)
proc completeWrite(
self: AsyncStreamWrapper,
fut: Future[void],
msgLen: int
): Future[void] {.async.} =
self: AsyncStreamWrapper,
fut: Future[void],
msgLen: int): Future[void] {.async.} =
withExceptions:
await fut

View File

@ -47,13 +47,12 @@ method initStream*(s: SeekableStoreStream) =
procCall SeekableStream(s).initStream()
proc new*(
T: type SeekableStoreStream,
store: BlockStore,
manifest: Manifest,
pad = true
): SeekableStoreStream =
T: type SeekableStoreStream,
store: BlockStore,
manifest: Manifest,
pad = true): SeekableStoreStream =
## Create a new SeekableStoreStream instance for a given store and manifest
##
##
result = SeekableStoreStream(
store: store,
manifest: manifest,
@ -73,14 +72,13 @@ method atEof*(self: SeekableStoreStream): bool =
self.offset >= self.size
method readOnce*(
self: SeekableStoreStream,
pbytes: pointer,
nbytes: int
): Future[int] {.async.} =
self: SeekableStoreStream,
pbytes: pointer,
nbytes: int): Future[int] {.async.} =
## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`.
## Return how many bytes were actually read before EOF was encountered.
## Raise exception if we are already at EOF.
##
##
trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount
if self.atEof:

View File

@ -51,12 +51,11 @@ method initStream*(s: SeekableStoreStream) =
procCall LPStream(s).initStream()
proc new*(
T: type SeekableStoreStream,
store: BlockStore,
manifest: Manifest,
pad = true
): SeekableStoreStream =
## Create a new SeekableStoreStream instance for a given store and manifest
T: type StoreStream,
store: BlockStore,
manifest: Manifest,
pad = true): StoreStream =
## Create a new StoreStream instance for a given store and manifest
##
result = SeekableStoreStream(
store: store,
@ -78,11 +77,10 @@ method atEof*(self: SeekableStoreStream): bool =
self.offset >= self.size
method readOnce*(
self: SeekableStoreStream,
pbytes: pointer,
nbytes: int
): Future[int] {.async.} =
## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`.
self: StoreStream,
pbytes: pointer,
nbytes: int): Future[int] {.async.} =
## Read `nbytes` from current position in the StoreStream into output buffer pointed by `pbytes`.
## Return how many bytes were actually read before EOF was encountered.
## Raise exception if we are already at EOF.
##

View File

@ -16,8 +16,9 @@ import pkg/chronos
import ./utils/asyncheapqueue
import ./utils/fileutils
import ./utils/asynciter
import ./utils/digest
export asyncheapqueue, fileutils, asynciter
export asyncheapqueue, fileutils, asynciter, digest
func divUp*[T: SomeInteger](a, b : T): T =
## Division with result rounded up (rather than truncated as in 'div')

View File

@ -7,9 +7,11 @@ type
Function*[T, U] = proc(fut: T): U {.upraises: [CatchableError], gcsafe, closure.}
IsFinished* = proc(): bool {.upraises: [], gcsafe, closure.}
GenNext*[T] = proc(): T {.upraises: [CatchableError], gcsafe, closure.}
Iter*[T] = ref object
finished: bool
next*: GenNext[T]
AsyncIter*[T] = Iter[Future[T]]
proc finish*[T](self: Iter[T]): void =
@ -32,7 +34,11 @@ proc map*[T, U](fut: Future[T], fn: Function[T, U]): Future[U] {.async.} =
let t = await fut
fn(t)
proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOnErr: bool = true): Iter[T] =
proc new*[T](
_: type Iter,
genNext: GenNext[T],
isFinished: IsFinished,
finishOnErr: bool = true): Iter[T] =
var iter = Iter[T]()
proc next(): T {.upraises: [CatchableError].} =
@ -142,4 +148,3 @@ proc prefetch*[T](iter: Iter[T], n: Positive): Iter[T] =
tryFetch(j)
Iter.new(genNext, isFinished)