Rework erasure.nim to include recent cleanup

This commit is contained in:
Tomasz Bekas 2023-10-17 23:31:13 +02:00 committed by Dmitriy Ryajov
parent baeb707097
commit 9c8d08681f
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
13 changed files with 405 additions and 285 deletions

View File

@ -275,24 +275,9 @@ proc requestBlocks*(
without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, leavesCount.some, merkleRoot), err:
return failure(err)
var
iter = AsyncIter[Block]()
index = 0
proc next(): Future[Block] =
if index < leavesCount:
let fut = b.requestBlock(treeReq, index, timeout)
inc index
if index >= leavesCount:
iter.finished = true
return fut
else:
let fut = newFuture[Block]("engine.requestBlocks")
fut.fail(newException(CodexError, "No more elements for tree with cid " & $treeCid))
return fut
iter.next = next
return success(iter)
return Iter.fromSlice(0..<leavesCount).map(
(index: int) => b.requestBlock(treeReq, index, timeout)
).success
proc blockPresenceHandler*(
b: BlockExcEngine,

View File

@ -243,6 +243,7 @@ proc new*(
repoDs = repoData,
metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create meta data store!"),
treeReader = treeReader,
quotaMaxBytes = config.storageQuota.uint,
blockTtl = config.blockTtl)

View File

@ -12,6 +12,7 @@ import pkg/upraises
push: {.upraises: [].}
import std/sequtils
import std/sugar
import pkg/chronos
import pkg/chronicles
@ -23,6 +24,7 @@ import ../merkletree
import ../stores
import ../blocktype as bt
import ../utils
import ../utils/asynciter
import pkg/stew/byteutils
@ -68,142 +70,242 @@ type
decoderProvider*: DecoderProvider
store*: BlockStore
proc encode*(
self: Erasure,
manifest: Manifest,
blocks: int,
parity: int
): Future[?!Manifest] {.async.} =
## Encode a manifest into one that is erasure protected.
##
## `manifest` - the original manifest to be encoded
## `blocks` - the number of blocks to be encoded - K
## `parity` - the number of parity blocks to generate - M
EncodingParams = object
ecK: int
ecM: int
rounded: int
steps: int
blocksCount: int
func indexToPos(steps, idx, step: int): int {.inline.} =
## Convert an index to a position in the encoded
## dataset
## `idx` - the index to convert
## `step` - the current step
## `pos` - the position in the encoded dataset
##
logScope:
original_cid = manifest.cid.get()
original_len = manifest.blocksCount
blocks = blocks
parity = parity
(idx - step) div steps
trace "Erasure coding manifest", blocks, parity
without tree =? await self.store.getTree(manifest.treeCid), err:
return err.failure
let leaves = tree.leaves
let
rounded = roundUp(manifest.blocksCount, blocks)
steps = divUp(manifest.blocksCount, blocks)
blocksCount = rounded + (steps * parity)
var cids = newSeq[Cid](blocksCount)
# copy original manifest blocks
for i in 0..<rounded:
if i < manifest.blocksCount:
without cid =? Cid.init(manifest.version, manifest.codec, leaves[i]).mapFailure, err:
return err.failure
cids[i] = cid
else:
without cid =? emptyCid(manifest.version, manifest.hcodec, manifest.codec), err:
return err.failure
cids[i] = cid
logScope:
steps = steps
rounded_blocks = rounded
new_manifest = blocksCount
proc getPendingBlocks(
self: Erasure,
manifest: Manifest,
indicies: seq[int]): AsyncIter[(?!bt.Block, int)] =
## Get pending blocks iterator
##
var
encoder = self.encoderProvider(manifest.blockSize.int, blocks, parity)
var toadd = 0
var tocount = 0
var maxidx = 0
# request blocks from the store
pendingBlocks = indicies.map( (i: int) =>
self.store.getBlock(manifest.treeCid, i, manifest.treeRoot).map((r: ?!bt.Block) => (r, i)) # Get the data blocks (first K)
)
proc isFinished(): bool = pendingBlocks.len == 0
proc genNext(): Future[(?!bt.Block, int)] {.async.} =
let completedFut = await one(pendingBlocks)
pendingBlocks.del(pendingBlocks.find(completedFut))
return await completedFut
Iter.new(genNext, isFinished)
proc prepareEncodingData(
self: Erasure,
manifest: Manifest,
params: EncodingParams,
step: int,
data: ref seq[seq[byte]],
cids: ref seq[Cid],
emptyBlock: seq[byte]): Future[?!int] {.async.} =
## Prepare data for encoding
##
let
indicies = toSeq(countup(step, params.rounded - 1, params.steps))
pendingBlocksIter = self.getPendingBlocks(manifest, indicies.filterIt(it < manifest.blocksCount))
var resolved = 0
for fut in pendingBlocksIter:
let (blkOrErr, idx) = await fut
without blk =? blkOrErr, err:
warn "Failed retreiving a block", idx, treeCid = manifest.treeCid
continue
let pos = indexToPos(params.steps, idx, step)
shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data)
cids[idx] = blk.cid
resolved.inc()
for idx in indicies.filterIt(it >= manifest.blocksCount):
let pos = indexToPos(params.steps, idx, step)
trace "Padding with empty block", idx
shallowCopy(data[pos], emptyBlock)
without emptyBlockCid =? emptyCid(manifest.version, manifest.hcodec, manifest.codec), err:
return failure(err)
cids[idx] = emptyBlockCid
success(resolved)
proc prepareDecodingData(
self: Erasure,
encoded: Manifest,
step: int,
data: ref seq[seq[byte]],
parityData: ref seq[seq[byte]],
cids: ref seq[Cid],
emptyBlock: seq[byte]): Future[?!(int, int)] {.async.} =
## Prepare data for decoding
## `encoded` - the encoded manifest
## `step` - the current step
## `data` - the data to be prepared
## `parityData` - the parityData to be prepared
## `cids` - cids of prepared data
## `emptyBlock` - the empty block to be used for padding
##
let
indicies = toSeq(countup(step, encoded.blocksCount - 1, encoded.steps))
pendingBlocksIter = self.getPendingBlocks(encoded, indicies)
var
dataPieces = 0
parityPieces = 0
resolved = 0
for fut in pendingBlocksIter:
# Continue to receive blocks until we have just enough for decoding
# or no more blocks can arrive
if resolved >= encoded.ecK:
break
let (blkOrErr, idx) = await fut
without blk =? blkOrErr, err:
trace "Failed retreiving a block", idx, treeCid = encoded.treeCid
continue
let
pos = indexToPos(encoded.steps, idx, step)
logScope:
cid = blk.cid
idx = idx
pos = pos
step = step
empty = blk.isEmpty
cids[idx] = blk.cid
if idx >= encoded.rounded:
trace "Retrieved parity block"
shallowCopy(parityData[pos - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data)
parityPieces.inc
else:
trace "Retrieved data block"
shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data)
dataPieces.inc
resolved.inc
return success (dataPieces, parityPieces)
proc init(_: type EncodingParams, manifest: Manifest, ecK: int, ecM: int): ?!EncodingParams =
if ecK > manifest.blocksCount:
return failure("Unable to encode manifest, not enough blocks, ecK = " & $ecK & ", blocksCount = " & $manifest.blocksCount)
let
rounded = roundUp(manifest.blocksCount, ecK)
steps = divUp(manifest.blocksCount, ecK)
blocksCount = rounded + (steps * ecM)
EncodingParams(
ecK: ecK,
ecM: ecM,
rounded: rounded,
steps: steps,
blocksCount: blocksCount
).success
proc encodeData(
self: Erasure,
manifest: Manifest,
params: EncodingParams
): Future[?!Manifest] {.async.} =
## Encode blocks pointed to by the protected manifest
##
## `manifest` - the manifest to encode
##
logScope:
steps = params.steps
rounded_blocks = params.rounded
blocks_count = params.blocksCount
ecK = params.ecK
ecM = params.ecM
var
cids = seq[Cid].new()
encoder = self.encoderProvider(manifest.blockSize.int, params.ecK, params.ecM)
emptyBlock = newSeq[byte](manifest.blockSize.int)
cids[].setLen(params.blocksCount)
try:
for i in 0..<steps:
for step in 0..<params.steps:
# TODO: Don't allocate a new seq every time, allocate once and zero out
var
data = newSeq[seq[byte]](blocks) # number of blocks to encode
parityData = newSeqWith[seq[byte]](parity, newSeq[byte](manifest.blockSize.int))
# calculate block indexes to retrieve
blockIdx = toSeq(countup(i, rounded - 1, steps))
# request all blocks from the store
dataBlocks = await allFinished(
blockIdx.mapIt( self.store.getBlock(cids[it]) ))
data = seq[seq[byte]].new() # number of blocks to encode
parityData = newSeqWith[seq[byte]](params.ecM, newSeq[byte](manifest.blockSize.int))
data[].setLen(params.ecK)
# TODO: this is a tight blocking loop so we sleep here to allow
# other events to be processed, this should be addressed
# by threading
await sleepAsync(10.millis)
for j in 0..<blocks:
let idx = blockIdx[j]
if idx < manifest.blocksCount:
without blk =? (await dataBlocks[j]), error:
trace "Unable to retrieve block", error = error.msg
return failure error
without resolved =?
(await self.prepareEncodingData(manifest, params, step, data, cids, emptyBlock)), err:
trace "Unable to prepare data", error = err.msg
return failure(err)
trace "Encoding block", cid = blk.cid, pos = idx
shallowCopy(data[j], blk.data)
else:
trace "Padding with empty block", pos = idx
data[j] = newSeq[byte](manifest.blockSize.int)
trace "Erasure coding data", data = data[].len, parity = parityData.len
trace "Erasure coding data", data = data.len, parity = parityData.len
let res = encoder.encode(data, parityData);
if res.isErr:
if (
let res = encoder.encode(data[], parityData);
res.isErr):
trace "Unable to encode manifest!", error = $res.error
return failure($res.error)
for j in 0..<parity:
let idx = rounded + blockIdx[j]
var idx = params.rounded + step
for j in 0..<params.ecM:
without blk =? bt.Block.new(parityData[j]), error:
trace "Unable to create parity block", err = error.msg
return failure(error)
trace "Adding parity block", cid = blk.cid, pos = idx
trace "Adding parity block", cid = blk.cid, idx
cids[idx] = blk.cid
maxidx = max(maxidx, idx)
toadd = toadd + blk.data.len
tocount.inc
if isErr (await self.store.putBlock(blk)):
trace "Unable to store block!", cid = blk.cid
return failure("Unable to store block!")
idx.inc(params.steps)
without var builder =? MerkleTreeBuilder.init(manifest.hcodec), err:
without tree =? MerkleTree.init(cids[]), err:
return failure(err)
for cid in cids:
without mhash =? cid.mhash.mapFailure, err:
return err.failure
if err =? builder.addLeaf(mhash).errorOption:
return failure(err)
without tree =? builder.build(), err:
return failure(err)
without treeBlk =? bt.Block.new(tree.encode()), err:
return failure(err)
if err =? (await self.store.putBlock(treeBlk)).errorOption:
return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg)
let encoded = Manifest.new(
let encodedManifest = Manifest.new(
manifest = manifest,
treeCid = treeBlk.cid,
treeRoot = tree.root,
datasetSize = (manifest.blockSize.int * blocksCount).NBytes,
ecK = blocks,
ecM = parity
datasetSize = (manifest.blockSize.int * params.blocksCount).NBytes,
ecK = params.ecK,
ecM = params.ecM
)
return encoded.success
return encodedManifest.success
except CancelledError as exc:
trace "Erasure coding encoding cancelled"
raise exc # cancellation needs to be propagated
@ -213,6 +315,26 @@ proc encode*(
finally:
encoder.release()
proc encode*(
self: Erasure,
manifest: Manifest,
blocks: int,
parity: int): Future[?!Manifest] {.async.} =
## Encode a manifest into one that is erasure protected.
##
## `manifest` - the original manifest to be encoded
## `blocks` - the number of blocks to be encoded - K
## `parity` - the number of parity blocks to generate - M
##
without params =? EncodingParams.init(manifest, blocks, parity), err:
return failure(err)
without encodedManifest =? await self.encodeData(manifest, params), err:
return failure(err)
return success encodedManifest
proc decode*(
self: Erasure,
encoded: Manifest
@ -231,86 +353,56 @@ proc decode*(
var
cids = seq[Cid].new()
recoveredIndices = newSeq[int]()
decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM)
emptyBlock = newSeq[byte](encoded.blockSize.int)
hasParity = false
cids[].setLen(encoded.blocksCount)
try:
for i in 0..<encoded.steps:
# TODO: Don't allocate a new seq every time, allocate once and zero out
let
# calculate block indexes to retrieve
blockIdx = toSeq(countup(i, encoded.blocksCount - 1, encoded.steps))
# request all blocks from the store
pendingBlocks = blockIdx.mapIt(
self.store.getBlock(encoded.treeCid, it, encoded.treeRoot) # Get the data blocks (first K)
)
for step in 0..<encoded.steps:
# TODO: this is a tight blocking loop so we sleep here to allow
# other events to be processed, this should be addressed
# by threading
await sleepAsync(10.millis)
var
data = newSeq[seq[byte]](encoded.ecK) # number of blocks to encode
parityData = newSeq[seq[byte]](encoded.ecM)
data = seq[seq[byte]].new()
parityData = seq[seq[byte]].new()
recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int))
idxPendingBlocks = pendingBlocks # copy futures to make using with `one` easier
emptyBlock = newSeq[byte](encoded.blockSize.int)
resolved = 0
while true:
# Continue to receive blocks until we have just enough for decoding
# or no more blocks can arrive
if (resolved >= encoded.ecK) or (idxPendingBlocks.len == 0):
break
data[].setLen(encoded.ecK) # set len to K
parityData[].setLen(encoded.ecM) # set len to M
let
done = await one(idxPendingBlocks)
idx = pendingBlocks.find(done)
idxPendingBlocks.del(idxPendingBlocks.find(done))
without blk =? (await done), error:
trace "Failed retrieving block", error = error.msg
continue
if idx >= encoded.ecK:
trace "Retrieved parity block", cid = blk.cid, idx
shallowCopy(parityData[idx - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data)
else:
trace "Retrieved data block", cid = blk.cid, idx
shallowCopy(data[idx], if blk.isEmpty: emptyBlock else: blk.data)
resolved.inc
let
dataPieces = data.filterIt( it.len > 0 ).len
parityPieces = parityData.filterIt( it.len > 0 ).len
without (dataPieces, parityPieces) =?
(await self.prepareDecodingData(encoded, step, data, parityData, cids, emptyBlock)), err:
trace "Unable to prepare data", error = err.msg
return failure(err)
if dataPieces >= encoded.ecK:
trace "Retrieved all the required data blocks", data = dataPieces, parity = parityPieces
trace "Retrieved all the required data blocks"
continue
trace "Erasure decoding data", data = dataPieces, parity = parityPieces
trace "Erasure decoding data"
if (
let err = decoder.decode(data, parityData, recovered);
let err = decoder.decode(data[], parityData[], recovered);
err.isErr):
trace "Unable to decode manifest!", err = $err.error
trace "Unable to decode data!", err = $err.error
return failure($err.error)
for i in 0..<encoded.ecK:
if data[i].len <= 0:
let idx = i * encoded.steps + step
if data[i].len <= 0 and not cids[idx].isEmpty:
without blk =? bt.Block.new(recovered[i]), error:
trace "Unable to create block!", exc = error.msg
return failure(error)
trace "Recovered block", cid = blk.cid
trace "Recovered block", cid = blk.cid, index = i
if isErr (await self.store.putBlock(blk)):
trace "Unable to store block!", cid = blk.cid
return failure("Unable to store block!")
cids[idx] = blk.cid
recoveredIndices.add(idx)
except CancelledError as exc:
trace "Erasure coding decoding cancelled"
raise exc # cancellation needs to be propagated
@ -320,6 +412,18 @@ proc decode*(
finally:
decoder.release()
without tree =? MerkleTree.init(cids[0..<encoded.originalBlocksCount]), err:
return failure(err)
if tree.root != encoded.originalTreeRoot:
return failure("Original tree root differs from tree root computed out of recovered data")
without treeBlk =? bt.Block.new(tree.encode()), err:
return failure(err)
if err =? (await self.store.putBlock(treeBlk)).errorOption:
return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg)
let decoded = Manifest.new(encoded)
return decoded.success

View File

@ -16,7 +16,7 @@ import std/algorithm
import pkg/chronicles
import pkg/questionable/results
import pkg/nimcrypto/sha2
import pkg/libp2p/[multicodec, multihash, vbuffer]
import pkg/libp2p/[cid, multicodec, multihash, vbuffer]
import pkg/stew/byteutils
import ../errors
@ -256,7 +256,7 @@ proc `==`*(a, b: MerkleTree): bool =
(a.leavesCount == b.leavesCount) and
(a.nodesBuffer == b.nodesBuffer)
func init*(
proc init*(
T: type MerkleTree,
mcodec: MultiCodec,
digestSize: Natural,
@ -277,6 +277,33 @@ func init*(
else:
failure("Expected nodesBuffer len to be " & $(totalNodes * digestSize) & " but was " & $nodesBuffer.len)
proc init*(
T: type MerkleTree,
cids: openArray[Cid]
): ?!MerkleTree =
let leaves = collect:
for cid in cids:
without mhash =? cid.mhash.mapFailure, errx:
return failure(errx)
mhash
MerkleTree.init(leaves)
proc init*(
T: type MerkleTree,
leaves: openArray[MultiHash]
): ?!MerkleTree =
without leaf =? leaves.?[0]:
return failure("At least one leaf is required")
var builder = ? MerkleTreeBuilder.init(mcodec = leaf.mcodec)
for l in leaves:
if err =? builder.addLeaf(l).errorOption:
return failure(err)
builder.build()
###########################################################
# MerkleProof
###########################################################

View File

@ -55,12 +55,6 @@ method getBlockAndProof*(self: BlockStore, treeCid: Cid, index: Natural): Future
raiseAssert("getBlockAndProof not implemented!")
method getTree*(self: BlockStore, treeCid: Cid): Future[?!MerkleTree] {.base.} =
## Get a merkle tree by Cid
##
raiseAssert("Not implemented!")
method getBlock*(self: BlockStore, treeCid: Cid, index: Natural, merkleRoot: MultiHash): Future[?!Block] {.base.} =
## Get a block by Cid of a merkle tree and an index of a leaf in a tree, validate inclusion using merkle root
##

View File

@ -66,9 +66,6 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =
trace "Error requesting block from cache", cid, error = exc.msg
return failure exc
method getTree*(self: CacheStore, treeCid: Cid): Future[?!MerkleTree] =
self.treeReader.getTree(treeCid)
method getBlock*(self: CacheStore, treeCid: Cid, index: Natural, merkleRoot: MultiHash): Future[?!Block] =
self.treeReader.getBlock(treeCid, index)

View File

@ -144,9 +144,6 @@ method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} =
trace "Checking network store for block existence", cid
return await self.localStore.hasBlock(cid)
method getTree*(self: NetworkStore, treeCid: Cid): Future[?!MerkleTree] =
self.localStore.getTree(treeCid)
method close*(self: NetworkStore): Future[void] {.async.} =
## Close the underlying local blockstore
##

View File

@ -161,9 +161,6 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
trace "Got block for cid", cid
return Block.new(cid, data, verify = true)
method getTree*(self: RepoStore, treeCid: Cid): Future[?!MerkleTree] =
self.treeReader.getTree(treeCid)
method getBlock*(self: RepoStore, treeCid: Cid, index: Natural, merkleRoot: MultiHash): Future[?!Block] =
self.treeReader.getBlock(treeCid, index)

View File

@ -1,5 +1,4 @@
import pkg/upraises
import pkg/chronos
import pkg/chronos/futures
import pkg/chronicles
@ -16,12 +15,11 @@ const DefaultTreeCacheCapacity* = 10 # Max number of trees stored in memory
type
GetBlock = proc (cid: Cid): Future[?!Block] {.upraises: [], gcsafe, closure.}
DelBlock = proc (cid: Cid): Future[?!void] {.upraises: [], gcsafe, closure.}
TreeReader* = ref object of RootObj
getBlockFromStore*: GetBlock
treeCache*: LruCache[Cid, MerkleTree]
method getTree*(self: TreeReader, cid: Cid): Future[?!MerkleTree] {.async.} =
proc getTree*(self: TreeReader, cid: Cid): Future[?!MerkleTree] {.async.} =
if tree =? self.treeCache.getOption(cid):
return success(tree)
else:
@ -35,7 +33,7 @@ method getTree*(self: TreeReader, cid: Cid): Future[?!MerkleTree] {.async.} =
trace "Got merkle tree for cid", cid
return success(tree)
method getBlockCidAndProof*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!(Cid, MerkleProof)] {.async.} =
proc getBlockCidAndProof*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!(Cid, MerkleProof)] {.async.} =
without tree =? await self.getTree(treeCid), err:
return failure(err)
@ -50,7 +48,7 @@ method getBlockCidAndProof*(self: TreeReader, treeCid: Cid, index: Natural): Fut
return success((leafCid, proof))
method getBlockCid*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!Cid] {.async.} =
proc getBlockCid*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!Cid] {.async.} =
without tree =? await self.getTree(treeCid), err:
return failure(err)
@ -62,7 +60,7 @@ method getBlockCid*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!Ci
return success(leafCid)
method getBlock*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
proc getBlock*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
without leafCid =? await self.getBlockCid(treeCid, index), err:
return failure(err)
@ -71,7 +69,7 @@ method getBlock*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!Block
return success(blk)
method getBlockAndProof*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.async.} =
proc getBlockAndProof*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.async.} =
without (leafCid, proof) =? await self.getBlockCidAndProof(treeCid, index), err:
return failure(err)
@ -80,29 +78,15 @@ method getBlockAndProof*(self: TreeReader, treeCid: Cid, index: Natural): Future
return success((blk, proof))
method getBlocks*(self: TreeReader, treeCid: Cid, leavesCount: Natural): Future[?!AsyncIter[?!Block]] {.async.} =
proc getBlocks*(self: TreeReader, treeCid: Cid, leavesCount: Natural): Future[?!AsyncIter[?!Block]] {.async.} =
without tree =? await self.getTree(treeCid), err:
return failure(err)
var iter = AsyncIter[?!Block]()
proc checkLen(index: Natural): void =
if index >= leavesCount:
iter.finish
checkLen(0)
var index = 0
proc next(): Future[?!Block] {.async.} =
if not iter.finished:
let iter = Iter.fromSlice(0..<leavesCount)
.map(proc (index: int): Future[?!Block] {.async.} =
without leaf =? tree.getLeaf(index), err:
inc index
checkLen(index)
return failure(err)
inc index
checkLen(index)
without leafCid =? Cid.init(treeCid.cidver, treeCid.mcodec, leaf).mapFailure, err:
return failure(err)
@ -110,10 +94,7 @@ method getBlocks*(self: TreeReader, treeCid: Cid, leavesCount: Natural): Future[
return failure(err)
return success(blk)
else:
return failure("No more elements for tree with cid " & $treeCid)
iter.next = next
)
return success(iter)
proc new*(T: type TreeReader, treeCacheCap = DefaultTreeCacheCapacity): TreeReader =

View File

@ -35,13 +35,6 @@ proc orElse*[A](a, b: Option[A]): Option[A] =
else:
b
template `..<`*(a, b: untyped): untyped =
## A shortcut for `a .. pred(b)`.
## ```
## for i in 5 ..< 9:
## echo i # => 5; 6; 7; 8
## ```
a .. (when b is BackwardsIndex: succ(b) else: pred(b))
when not declared(parseDuration): # Odd code formatting to minimize diff v. mainLine
const Whitespace = {' ', '\t', '\v', '\r', '\l', '\f'}

View File

@ -1,13 +1,16 @@
import std/sugar
import pkg/questionable
import pkg/chronos
import pkg/upraises
type
MapItem*[T, U] = proc(fut: T): U {.upraises: [CatchableError], gcsafe, closure.}
NextItem*[T] = proc(): T {.upraises: [CatchableError], gcsafe, closure.}
Function*[T, U] = proc(fut: T): U {.upraises: [CatchableError], gcsafe, closure.}
IsFinished* = proc(): bool {.upraises: [], gcsafe, closure.}
GenNext*[T] = proc(): T {.upraises: [CatchableError], gcsafe, closure.}
Iter*[T] = ref object
finished*: bool
next*: NextItem[T]
finished: bool
next*: GenNext[T]
AsyncIter*[T] = Iter[Future[T]]
proc finish*[T](self: Iter[T]): void =
@ -20,66 +23,119 @@ iterator items*[T](self: Iter[T]): T =
while not self.finished:
yield self.next()
proc map*[T, U](wrappedIter: Iter[T], mapItem: MapItem[T, U]): Iter[U] =
var iter = Iter[U]()
proc checkFinish(): void =
if wrappedIter.finished:
iter.finish
checkFinish()
proc next(): U {.upraises: [CatchableError].} =
if not iter.finished:
let fut = wrappedIter.next()
checkFinish()
return mapItem(fut)
else:
raise newException(CatchableError, "Iterator finished, but next element was requested")
iter.next = next
return iter
proc prefetch*[T](wrappedIter: Iter[T], n: Positive): Iter[T] =
var ringBuf = newSeq[T](n)
var wrappedLen = int.high
proc map*[T, U](fut: Future[T], fn: Function[T, U]): Future[U] {.async.} =
let t = await fut
fn(t)
proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOnErr: bool = true): Iter[T] =
var iter = Iter[T]()
proc tryFetch(i: int): void =
if not wrappedIter.finished:
let res = wrappedIter.next()
ringBuf[i mod n] = res
if wrappedIter.finished:
wrappedLen = min(i + 1, wrappedLen)
else:
if i == 0:
wrappedLen = 0
proc checkLen(i: int): void =
if i >= wrappedLen:
iter.finish
# initialize buf with n prefetched values
for i in 0..<n:
tryFetch(i)
checkLen(0)
var i = 0
proc next(): T {.upraises: [CatchableError].} =
if not iter.finished:
let fut = ringBuf[i mod n]
# prefetch a value
tryFetch(i + n)
inc i
checkLen(i)
return fut
var item: T
try:
item = genNext()
except CatchableError as err:
if finishOnErr or isFinished():
iter.finish
raise err
if isFinished():
iter.finish
return item
else:
raise newException(CatchableError, "Iterator finished, but next element was requested")
raise newException(CatchableError, "Iterator is finished but next item was requested")
if isFinished():
iter.finish
iter.next = next
return iter
proc fromItems*[T](_: type Iter, items: openArray[T]): Iter[T] =
## Create new iterator from items
##
Iter.fromSlice(0..<items.len)
.map((i) => items[i])
proc fromSlice*[U, V: Ordinal](_: type Iter, slice: HSlice[U, V]): Iter[U] =
## Creates new iterator from slice
##
Iter.fromRange(slice.a.int, slice.b.int, 1)
proc fromRange*[U, V, S: Ordinal](_: type Iter, a: U, b: V, step: S = 1): Iter[U] =
## Creates new iterator in range a..b with specified step (default 1)
##
var i = a
proc genNext(): U =
let u = i
inc(i, step)
u
proc isFinished(): bool =
(step > 0 and i > b) or
(step < 0 and i < b)
Iter.new(genNext, isFinished)
proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] =
Iter.new(
genNext = () => fn(iter.next()),
isFinished = () => iter.finished
)
proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] =
var nextItem: T
proc tryFetch(): void =
while not iter.finished:
let item = iter.next()
if predicate(item):
nextItem = some(item)
break
proc genNext(): T =
let t = nextItem
tryFetch()
return t
proc isFinished(): bool =
iter.finished
tryFetch()
Iter.new(genNext, isFinished)
proc prefetch*[T](iter: Iter[T], n: Positive): Iter[T] =
var ringBuf = newSeq[T](n)
var iterLen = int.high
var i = 0
proc tryFetch(j: int): void =
if not iter.finished:
let item = iter.next()
ringBuf[j mod n] = item
if iter.finished:
iterLen = min(j + 1, iterLen)
else:
if j == 0:
iterLen = 0
proc genNext(): T =
let item = ringBuf[i mod n]
tryFetch(i + n)
inc i
return item
proc isFinished(): bool =
i >= iterLen
# initialize ringBuf with n prefetched values
for j in 0..<n:
tryFetch(j)
Iter.new(genNext, isFinished)

View File

@ -44,20 +44,9 @@ proc makeManifestAndTree*(blocks: seq[Block]): ?!(Manifest, MerkleTree) =
return failure("Blocks list was empty")
let
mcodec = (? blocks[0].cid.mhash.mapFailure).mcodec
datasetSize = blocks.mapIt(it.data.len).foldl(a + b)
blockSize = blocks.mapIt(it.data.len).foldl(max(a, b))
var builder = ? MerkleTreeBuilder.init(mcodec)
for b in blocks:
let mhash = ? b.cid.mhash.mapFailure
if mhash.mcodec != mcodec:
return failure("Blocks are not using the same codec")
? builder.addLeaf(mhash)
let
tree = ? builder.build()
tree = ? MerkleTree.init(blocks.mapIt(it.cid))
treeBlk = ? Block.new(tree.encode())
manifest = Manifest.new(
treeCid = treeBlk.cid,
@ -65,7 +54,7 @@ proc makeManifestAndTree*(blocks: seq[Block]): ?!(Manifest, MerkleTree) =
blockSize = NBytes(blockSize),
datasetSize = NBytes(datasetSize),
version = CIDv1,
hcodec = mcodec
hcodec = tree.mcodec
)
return success((manifest, tree))

View File

@ -40,7 +40,6 @@ method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): F
self.getBeOffset = offset
var iter = AsyncIter[?BlockExpiration]()
iter.finished = false
self.iteratorIndex = offset
var numberLeft = maxNumber