diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 64aa624c..eabdfc11 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -121,11 +121,7 @@ proc stop*(b: BlockExcEngine) {.async.} = trace "NetworkStore stopped" -proc sendWantHave( - b: BlockExcEngine, - address: BlockAddress, - selectedPeer: BlockExcPeerCtx, - peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = +proc sendWantHave(b: BlockExcEngine, address: BlockAddress, selectedPeer: BlockExcPeerCtx, peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = trace "Sending wantHave request to peers", address for p in peers: if p != selectedPeer: @@ -136,10 +132,11 @@ proc sendWantHave( @[address], wantType = WantType.WantHave) # we only want to know if the peer has the block -proc sendWantBlock( - b: BlockExcEngine, - address: BlockAddress, - blockPeer: BlockExcPeerCtx): Future[void] {.async.} = +proc sendWantBlock(b: BlockExcEngine, address: BlockAddress, blockPeer: BlockExcPeerCtx): Future[void] {.async.} = + let cid = if address.leaf: + address.treeCid + else: + address.cid trace "Sending wantBlock request to", peer = blockPeer.id, address await b.network.request.sendWantList( blockPeer.id, @@ -177,42 +174,125 @@ proc monitorBlockHandle(b: BlockExcEngine, handle: Future[Block], address: Block proc requestBlock*( b: BlockExcEngine, - address: BlockAddress, + cid: Cid, + timeout = DefaultBlockTimeout): Future[Block] {.async.} = + trace "Begin block request", cid, peers = b.peers.len + + if b.pendingBlocks.isInFlight(cid): + trace "Request handle already pending", cid + return await b.pendingBlocks.getWantHandle(cid, timeout) + + let + blk = b.pendingBlocks.getWantHandle(cid, timeout) + address = BlockAddress(leaf: false, cid: cid) + + trace "Selecting peers who have", address + var + peers = b.peers.selectCheapest(address) + + without blockPeer =? b.findCheapestPeerForBlock(peers): + trace "No peers to request blocks from. Queue discovery...", cid + b.discovery.queueFindBlocksReq(@[cid]) + return await blk + + asyncSpawn b.monitorBlockHandle(blk, address, blockPeer.id) + b.pendingBlocks.setInFlight(cid, true) + await b.sendWantBlock(address, blockPeer) + + codex_block_exchange_want_block_lists_sent.inc() + + if (peers.len - 1) == 0: + trace "No peers to send want list to", cid + b.discovery.queueFindBlocksReq(@[cid]) + return await blk + + await b.sendWantHave(address, blockPeer, toSeq(b.peers)) + + codex_block_exchange_want_have_lists_sent.inc() + + return await blk + +proc requestBlock( + b: BlockExcEngine, + treeReq: TreeReq, + index: Natural, timeout = DefaultBlockTimeout ): Future[Block] {.async.} = - let blockFuture = b.pendingBlocks.getWantHandle(address, timeout) + let address = BlockAddress(leaf: true, treeCid: treeReq.treeCid, index: index) - if b.pendingBlocks.isInFlight(address): + let handleOrCid = treeReq.getWantHandleOrCid(index, timeout) + if handleOrCid.resolved: + without blk =? await b.localStore.getBlock(handleOrCid.cid), err: + return await b.requestBlock(handleOrCid.cid, timeout) + return blk + + let blockFuture = handleOrCid.handle + + if treeReq.isInFlight(index): return await blockFuture let peers = b.peers.selectCheapest(address) if peers.len == 0: - b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) + b.discovery.queueFindBlocksReq(@[treeReq.treeCid]) let maybePeer = if peers.len > 0: - peers[hash(address) mod peers.len].some + peers[index mod peers.len].some elif b.peers.len > 0: - toSeq(b.peers)[hash(address) mod b.peers.len].some + toSeq(b.peers)[index mod b.peers.len].some else: BlockExcPeerCtx.none if peer =? maybePeer: asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id) - b.pendingBlocks.setInFlight(address) + treeReq.trySetInFlight(index) await b.sendWantBlock(address, peer) - codex_block_exchange_want_block_lists_sent.inc() + codexBlockExchangeWantBlockListsSent.inc() await b.sendWantHave(address, peer, toSeq(b.peers)) - codex_block_exchange_want_have_lists_sent.inc() + codexBlockExchangeWantHaveListsSent.inc() return await blockFuture proc requestBlock*( b: BlockExcEngine, - cid: Cid, + treeCid: Cid, + index: Natural, + merkleRoot: MultiHash, timeout = DefaultBlockTimeout ): Future[Block] = - b.requestBlock(BlockAddress.init(cid)) + without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, Natural.none, merkleRoot), err: + raise err + + return b.requestBlock(treeReq, index, timeout) + +proc requestBlocks*( + b: BlockExcEngine, + treeCid: Cid, + leavesCount: Natural, + merkleRoot: MultiHash, + timeout = DefaultBlockTimeout +): ?!AsyncIter[Block] = + without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, leavesCount.some, merkleRoot), err: + return failure(err) + + var + iter = AsyncIter[Block]() + index = 0 + + proc next(): Future[Block] = + if index < leavesCount: + let fut = b.requestBlock(treeReq, index, timeout) + inc index + if index >= leavesCount: + iter.finished = true + return fut + else: + let fut = newFuture[Block]("engine.requestBlocks") + fut.fail(newException(CodexError, "No more elements for tree with cid " & $treeCid)) + return fut + + iter.next = next + return success(iter) proc blockPresenceHandler*( b: BlockExcEngine, @@ -287,12 +367,7 @@ proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asy b.pendingBlocks.resolve(blocksDelivery) await b.scheduleTasks(blocksDelivery) - var cids = initHashSet[Cid]() - for bd in blocksDelivery: - cids.incl(bd.blk.cid) - if bd.address.leaf: - cids.incl(bd.address.treeCid) - b.discovery.queueProvideBlocksReq(cids.toSeq) + b.discovery.queueProvideBlocksReq(blocksDelivery.mapIt( it.blk.cid )) proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} = await b.resolveBlocks(blocks.mapIt(BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid)))) @@ -310,69 +385,18 @@ proc payForBlocks(engine: BlockExcEngine, trace "Sending payment for blocks", price await sendPayment(peer.id, payment) -proc validateBlockDelivery( - b: BlockExcEngine, - bd: BlockDelivery -): ?!void = - if bd.address notin b.pendingBlocks: - return failure("Received block is not currently a pending block") - - if bd.address.leaf: - without proof =? bd.proof: - return failure("Missing proof") - - if proof.index != bd.address.index: - return failure("Proof index " & $proof.index & " doesn't match leaf index " & $bd.address.index) - - without leaf =? bd.blk.cid.mhash.mapFailure, err: - return failure("Unable to get mhash from cid for block, nested err: " & err.msg) - - without treeRoot =? bd.address.treeCid.mhash.mapFailure, err: - return failure("Unable to get mhash from treeCid for block, nested err: " & err.msg) - - without verifyOutcome =? proof.verifyLeaf(leaf, treeRoot), err: - return failure("Unable to verify proof for block, nested err: " & err.msg) - - if not verifyOutcome: - return failure("Provided inclusion proof is invalid") - else: # not leaf - if bd.address.cid != bd.blk.cid: - return failure("Delivery cid " & $bd.address.cid & " doesn't match block cid " & $bd.blk.cid) - - return success() - proc blocksDeliveryHandler*( b: BlockExcEngine, peer: PeerId, blocksDelivery: seq[BlockDelivery]) {.async.} = trace "Got blocks from peer", peer, len = blocksDelivery.len - var validatedBlocksDelivery: seq[BlockDelivery] for bd in blocksDelivery: - logScope: - peer = peer - address = bd.address + if isErr (await b.localStore.putBlock(bd.blk)): + trace "Unable to store block", cid = bd.blk.cid - if err =? b.validateBlockDelivery(bd).errorOption: - warn "Block validation failed", msg = err.msg - continue - - if err =? (await b.localStore.putBlock(bd.blk)).errorOption: - error "Unable to store block", err = err.msg - continue - - if bd.address.leaf: - without proof =? bd.proof: - error "Proof expected for a leaf block delivery" - continue - if err =? (await b.localStore.putBlockCidAndProof(bd.address.treeCid, bd.address.index, bd.blk.cid, proof)).errorOption: - error "Unable to store proof and cid for a block" - continue - - validatedBlocksDelivery.add(bd) - - await b.resolveBlocks(validatedBlocksDelivery) - codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64) + await b.resolveBlocks(blocksDelivery) + codexBlockExchangeBlocksReceived.inc(blocksDelivery.len.int64) let peerCtx = b.peers.get(peer) @@ -401,11 +425,11 @@ proc wantListHandler*( logScope: peer = peerCtx.id - address = e.address + # cid = e.cid wantType = $e.wantType if idx < 0: # updating entry - trace "Processing new want list entry" + trace "Processing new want list entry", address = e.address let have = await e.address in b.localStore @@ -417,21 +441,21 @@ proc wantListHandler*( codex_block_exchange_want_have_lists_received.inc() if not have and e.sendDontHave: - trace "Adding dont have entry to presence response" + trace "Adding dont have entry to presence response", address = e.address presence.add( BlockPresence( address: e.address, `type`: BlockPresenceType.DontHave, price: price)) elif have and e.wantType == WantType.WantHave: - trace "Adding have entry to presence response" + trace "Adding have entry to presence response", address = e.address presence.add( BlockPresence( address: e.address, `type`: BlockPresenceType.Have, price: price)) elif e.wantType == WantType.WantBlock: - trace "Added entry to peer's want blocks list" + trace "Added entry to peer's want blocks list", address = e.address peerCtx.peerWants.add(e) codex_block_exchange_want_block_lists_received.inc() else: @@ -482,6 +506,18 @@ proc paymentHandler*( else: context.paymentChannel = engine.wallet.acceptChannel(payment).option +proc onTreeHandler(b: BlockExcEngine, tree: MerkleTree): Future[?!void] {.async.} = + trace "Handling tree" + + without treeBlk =? Block.new(tree.encode()), err: + return failure(err) + + if err =? (await b.localStore.putBlock(treeBlk)).errorOption: + return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg) + + return success() + + proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} = ## Perform initial setup, such as want ## list exchange @@ -543,7 +579,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = BlockDelivery(address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some) ) else: - (await b.localStore.getBlock(e.address)).map( + (await b.localStore.getBlock(e.address.cid)).map( (blk: Block) => BlockDelivery(address: e.address, blk: blk, proof: MerkleProof.none) ) @@ -563,7 +599,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = blocksDelivery ) - codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64) + codexBlockExchangeBlocksSent.inc(blocksDelivery.len.int64) trace "About to remove entries from peerWants", blocks = blocksDelivery.len, items = task.peerWants.len # Remove successfully sent blocks @@ -644,6 +680,11 @@ proc new*( proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} = engine.paymentHandler(peer, payment) + proc onTree(tree: MerkleTree): Future[void] {.gcsafe, async.} = + if err =? (await engine.onTreeHandler(tree)).errorOption: + echo "Error handling a tree" & err.msg # TODO + # error "Error handling a tree", msg = err.msg + network.handlers = BlockExcHandlers( onWantList: blockWantListHandler, onBlocksDelivery: blocksDeliveryHandler, @@ -651,4 +692,6 @@ proc new*( onAccount: accountHandler, onPayment: paymentHandler) + pendingBlocks.onTree = onTree + return engine diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 328f3ead..9ece50c1 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -14,15 +14,20 @@ import pkg/upraises push: {.upraises: [].} +import ../../blocktype import pkg/chronicles +import pkg/questionable +import pkg/questionable/options +import pkg/questionable/results import pkg/chronos import pkg/libp2p import pkg/metrics import pkg/questionable/results import ../protobuf/blockexc -import ../../blocktype + import ../../merkletree +import ../../utils logScope: topics = "codex pendingblocks" @@ -39,12 +44,121 @@ type inFlight*: bool startTime*: int64 + LeafReq* = object + case delivered*: bool + of false: + handle*: Future[Block] + inFlight*: bool + of true: + leaf: MultiHash + blkCid*: Cid + + TreeReq* = ref object + leaves*: Table[Natural, LeafReq] + deliveredCount*: Natural + leavesCount*: ?Natural + treeRoot*: MultiHash + treeCid*: Cid + + TreeHandler* = proc(tree: MerkleTree): Future[void] {.gcsafe.} + PendingBlocksManager* = ref object of RootObj - blocks*: Table[BlockAddress, BlockReq] # pending Block requests + blocks*: Table[Cid, BlockReq] # pending Block requests + trees*: Table[Cid, TreeReq] + onTree*: TreeHandler proc updatePendingBlockGauge(p: PendingBlocksManager) = codex_block_exchange_pending_block_requests.set(p.blocks.len.int64) +type + BlockHandleOrCid = object + case resolved*: bool + of true: + cid*: Cid + else: + handle*: Future[Block] + +proc buildTree(treeReq: TreeReq): ?!MerkleTree = + trace "Building a merkle tree from leaves", treeCid = treeReq.treeCid, leavesCount = treeReq.leavesCount + + without leavesCount =? treeReq.leavesCount: + return failure("Leaves count is none, cannot build a tree") + + var builder = ? MerkleTreeBuilder.init(treeReq.treeRoot.mcodec) + for i in 0.. a.mapFailure) proc emptyDigest*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!MultiHash = emptyCid(version, hcodec, dcodec) diff --git a/codex/codex.nim b/codex/codex.nim index bc0d53ff..c1b9de62 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -231,6 +231,8 @@ proc new*( wallet = WalletRef.new(EthPrivateKey.random()) network = BlockExcNetwork.new(switch) + treeReader = TreeReader.new() + repoData = case config.repoKind of repoFS: Datastore(FSDatastore.new($config.dataDir, depth = 5) .expect("Should create repo file data store!")) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 35109a99..91819145 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -12,7 +12,6 @@ import pkg/upraises push: {.upraises: [].} import std/sequtils -import std/sugar import pkg/chronos import pkg/chronicles @@ -24,7 +23,6 @@ import ../merkletree import ../stores import ../blocktype as bt import ../utils -import ../utils/asynciter import pkg/stew/byteutils @@ -70,245 +68,142 @@ type decoderProvider*: DecoderProvider store*: BlockStore - EncodingParams = object - ecK: int - ecM: int - rounded: int - steps: int - blocksCount: int - -func indexToPos(steps, idx, step: int): int {.inline.} = - ## Convert an index to a position in the encoded - ## dataset - ## `idx` - the index to convert - ## `step` - the current step - ## `pos` - the position in the encoded dataset +proc encode*( + self: Erasure, + manifest: Manifest, + blocks: int, + parity: int +): Future[?!Manifest] {.async.} = + ## Encode a manifest into one that is erasure protected. ## - - (idx - step) div steps - -proc getPendingBlocks( - self: Erasure, - manifest: Manifest, - indicies: seq[int]): AsyncIter[(?!bt.Block, int)] = - ## Get pending blocks iterator - ## - - var - # request blocks from the store - pendingBlocks = indicies.map( (i: int) => - self.store.getBlock(BlockAddress.init(manifest.treeCid, i)).map((r: ?!bt.Block) => (r, i)) # Get the data blocks (first K) - ) - - proc isFinished(): bool = pendingBlocks.len == 0 - - proc genNext(): Future[(?!bt.Block, int)] {.async.} = - let completedFut = await one(pendingBlocks) - if (let i = pendingBlocks.find(completedFut); i >= 0): - pendingBlocks.del(i) - return await completedFut - else: - let (_, index) = await completedFut - raise newException(CatchableError, "Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & $index) - - Iter.new(genNext, isFinished) - -proc prepareEncodingData( - self: Erasure, - manifest: Manifest, - params: EncodingParams, - step: int, - data: ref seq[seq[byte]], - cids: ref seq[Cid], - emptyBlock: seq[byte]): Future[?!int] {.async.} = - ## Prepare data for encoding - ## - - let - indicies = toSeq(countup(step, params.rounded - 1, params.steps)) - pendingBlocksIter = self.getPendingBlocks(manifest, indicies.filterIt(it < manifest.blocksCount)) - - var resolved = 0 - for fut in pendingBlocksIter: - let (blkOrErr, idx) = await fut - without blk =? blkOrErr, err: - warn "Failed retreiving a block", treeCid = manifest.treeCid, idx, msg = err.msg - continue - - let pos = indexToPos(params.steps, idx, step) - shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data) - cids[idx] = blk.cid - - resolved.inc() - - for idx in indicies.filterIt(it >= manifest.blocksCount): - let pos = indexToPos(params.steps, idx, step) - trace "Padding with empty block", idx - shallowCopy(data[pos], emptyBlock) - without emptyBlockCid =? emptyCid(manifest.version, manifest.hcodec, manifest.codec), err: - return failure(err) - cids[idx] = emptyBlockCid - - success(resolved) - -proc prepareDecodingData( - self: Erasure, - encoded: Manifest, - step: int, - data: ref seq[seq[byte]], - parityData: ref seq[seq[byte]], - cids: ref seq[Cid], - emptyBlock: seq[byte]): Future[?!(int, int)] {.async.} = - ## Prepare data for decoding - ## `encoded` - the encoded manifest - ## `step` - the current step - ## `data` - the data to be prepared - ## `parityData` - the parityData to be prepared - ## `cids` - cids of prepared data - ## `emptyBlock` - the empty block to be used for padding - ## - - let - indicies = toSeq(countup(step, encoded.blocksCount - 1, encoded.steps)) - pendingBlocksIter = self.getPendingBlocks(encoded, indicies) - - var - dataPieces = 0 - parityPieces = 0 - resolved = 0 - for fut in pendingBlocksIter: - # Continue to receive blocks until we have just enough for decoding - # or no more blocks can arrive - if resolved >= encoded.ecK: - break - - let (blkOrErr, idx) = await fut - without blk =? blkOrErr, err: - trace "Failed retreiving a block", idx, treeCid = encoded.treeCid, msg = err.msg - continue - - let - pos = indexToPos(encoded.steps, idx, step) - - logScope: - cid = blk.cid - idx = idx - pos = pos - step = step - empty = blk.isEmpty - - cids[idx] = blk.cid - if idx >= encoded.rounded: - trace "Retrieved parity block" - shallowCopy(parityData[pos - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data) - parityPieces.inc - else: - trace "Retrieved data block" - shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data) - dataPieces.inc - - resolved.inc - - return success (dataPieces, parityPieces) - -proc init(_: type EncodingParams, manifest: Manifest, ecK: int, ecM: int): ?!EncodingParams = - if ecK > manifest.blocksCount: - return failure("Unable to encode manifest, not enough blocks, ecK = " & $ecK & ", blocksCount = " & $manifest.blocksCount) - - let - rounded = roundUp(manifest.blocksCount, ecK) - steps = divUp(manifest.blocksCount, ecK) - blocksCount = rounded + (steps * ecM) - - EncodingParams( - ecK: ecK, - ecM: ecM, - rounded: rounded, - steps: steps, - blocksCount: blocksCount - ).success - -proc encodeData( - self: Erasure, - manifest: Manifest, - params: EncodingParams - ): Future[?!Manifest] {.async.} = - ## Encode blocks pointed to by the protected manifest - ## - ## `manifest` - the manifest to encode + ## `manifest` - the original manifest to be encoded + ## `blocks` - the number of blocks to be encoded - K + ## `parity` - the number of parity blocks to generate - M ## logScope: - steps = params.steps - rounded_blocks = params.rounded - blocks_count = params.blocksCount - ecK = params.ecK - ecM = params.ecM + original_cid = manifest.cid.get() + original_len = manifest.blocksCount + blocks = blocks + parity = parity + + trace "Erasure coding manifest", blocks, parity + + without tree =? await self.store.getTree(manifest.treeCid), err: + return err.failure + + let leaves = tree.leaves + + let + rounded = roundUp(manifest.blocksCount, blocks) + steps = divUp(manifest.blocksCount, blocks) + blocksCount = rounded + (steps * parity) + + var cids = newSeq[Cid](blocksCount) + + + # copy original manifest blocks + for i in 0..= encoded.ecK) or (idxPendingBlocks.len == 0): + break - without (dataPieces, parityPieces) =? - (await self.prepareDecodingData(encoded, step, data, parityData, cids, emptyBlock)), err: - trace "Unable to prepare data", error = err.msg - return failure(err) + let + done = await one(idxPendingBlocks) + idx = pendingBlocks.find(done) + + idxPendingBlocks.del(idxPendingBlocks.find(done)) + + without blk =? (await done), error: + trace "Failed retrieving block", error = error.msg + continue + + if idx >= encoded.ecK: + trace "Retrieved parity block", cid = blk.cid, idx + shallowCopy(parityData[idx - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data) + else: + trace "Retrieved data block", cid = blk.cid, idx + shallowCopy(data[idx], if blk.isEmpty: emptyBlock else: blk.data) + + resolved.inc + + let + dataPieces = data.filterIt( it.len > 0 ).len + parityPieces = parityData.filterIt( it.len > 0 ).len if dataPieces >= encoded.ecK: - trace "Retrieved all the required data blocks" + trace "Retrieved all the required data blocks", data = dataPieces, parity = parityPieces continue - trace "Erasure decoding data" + trace "Erasure decoding data", data = dataPieces, parity = parityPieces if ( - let err = decoder.decode(data[], parityData[], recovered); + let err = decoder.decode(data, parityData, recovered); err.isErr): - trace "Unable to decode data!", err = $err.error + trace "Unable to decode manifest!", err = $err.error return failure($err.error) for i in 0.. i < tree.leavesCount) - - if err =? (await self.store.putSomeProofs(tree, idxIter)).errorOption: - return failure(err) - let decoded = Manifest.new(encoded) return decoded.success diff --git a/codex/manifest/coders.nim b/codex/manifest/coders.nim index db504617..c691800e 100644 --- a/codex/manifest/coders.nim +++ b/codex/manifest/coders.nim @@ -45,27 +45,31 @@ proc encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] = # optional uint32 originalDatasetSize = 4; # size of the original dataset # } # Message Header { - # optional bytes treeCid = 1; # cid (root) of the tree - # optional uint32 blockSize = 2; # size of a single block - # optional uint64 datasetSize = 3; # size of the dataset - # optional ErasureInfo erasure = 4; # erasure coding info + # optional bytes treeCid = 1; # the cid of the tree + # optional bytes treeRoot = 2; # the root hash of the tree + # optional uint32 blockSize = 3; # size of a single block + # optional uint64 originalBytes = 4;# exact file size + # optional ErasureInfo erasure = 5; # erasure coding info # } # ``` # # var treeRootVBuf = initVBuffer() var header = initProtoBuffer() header.write(1, manifest.treeCid.data.buffer) - header.write(2, manifest.blockSize.uint32) - header.write(3, manifest.datasetSize.uint32) + # treeRootVBuf.write(manifest.treeRoot) + header.write(2, manifest.treeRoot.data.buffer) + header.write(3, manifest.blockSize.uint32) + header.write(4, manifest.datasetSize.uint32) if manifest.protected: var erasureInfo = initProtoBuffer() erasureInfo.write(1, manifest.ecK.uint32) erasureInfo.write(2, manifest.ecM.uint32) - erasureInfo.write(3, manifest.originalTreeCid.data.buffer) - erasureInfo.write(4, manifest.originalDatasetSize.uint32) + erasureInfo.write(3, manifest.originalCid.data.buffer) + erasureInfo.write(4, manifest.originalTreeRoot.data.buffer) + erasureInfo.write(5, manifest.originalDatasetSize.uint32) erasureInfo.finish() - header.write(4, erasureInfo) + header.write(5, erasureInfo) pbNode.write(1, header) # set the treeCid as the data field pbNode.finish() @@ -81,7 +85,9 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = pbHeader: ProtoBuffer pbErasureInfo: ProtoBuffer treeCidBuf: seq[byte] + treeRootBuf: seq[byte] originalTreeCid: seq[byte] + originalTreeRootBuf: seq[byte] datasetSize: uint32 blockSize: uint32 originalDatasetSize: uint32 @@ -95,13 +101,16 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = if pbHeader.getField(1, treeCidBuf).isErr: return failure("Unable to decode `treeCid` from manifest!") - if pbHeader.getField(2, blockSize).isErr: + if pbHeader.getField(2, treeRootBuf).isErr: + return failure("Unable to decode `treeRoot` from manifest!") + + if pbHeader.getField(3, blockSize).isErr: return failure("Unable to decode `blockSize` from manifest!") - if pbHeader.getField(3, datasetSize).isErr: + if pbHeader.getField(4, datasetSize).isErr: return failure("Unable to decode `datasetSize` from manifest!") - if pbHeader.getField(4, pbErasureInfo).isErr: + if pbHeader.getField(5, pbErasureInfo).isErr: return failure("Unable to decode `erasureInfo` from manifest!") let protected = pbErasureInfo.buffer.len > 0 @@ -114,18 +123,34 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = if pbErasureInfo.getField(3, originalTreeCid).isErr: return failure("Unable to decode `originalTreeCid` from manifest!") + + if pbErasureInfo.getField(4, originalTreeRootBuf).isErr: + return failure("Unable to decode `originalTreeRoot` from manifest!") - if pbErasureInfo.getField(4, originalDatasetSize).isErr: + if pbErasureInfo.getField(5, originalDatasetSize).isErr: return failure("Unable to decode `originalDatasetSize` from manifest!") + var + treeRoot: MultiHash + originalTreeRoot: MultiHash let treeCid = ? Cid.init(treeCidBuf).mapFailure + treeRootRes = ? MultiHash.decode(treeRootBuf, treeRoot).mapFailure + + if treeRootRes != treeRootBuf.len: + return failure("Error decoding `treeRoot` as MultiHash") + + if protected: + let originalTreeRootRes = ? MultiHash.decode(originalTreeRootBuf, originalTreeRoot).mapFailure + if originalTreeRootRes != originalTreeRootBuf.len: + return failure("Error decoding `originalTreeRoot` as MultiHash") let self = if protected: Manifest.new( treeCid = treeCid, + treeRoot = treeRoot, datasetSize = datasetSize.NBytes, blockSize = blockSize.NBytes, version = treeCid.cidver, @@ -134,11 +159,13 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = ecK = ecK.int, ecM = ecM.int, originalTreeCid = ? Cid.init(originalTreeCid).mapFailure, + originalTreeRoot = originalTreeRoot, originalDatasetSize = originalDatasetSize.NBytes ) else: Manifest.new( treeCid = treeCid, + treeRoot = treeRoot, datasetSize = datasetSize.NBytes, blockSize = blockSize.NBytes, version = treeCid.cidver, diff --git a/codex/manifest/manifest.nim b/codex/manifest/manifest.nim index 53c37ce3..bfd174ff 100644 --- a/codex/manifest/manifest.nim +++ b/codex/manifest/manifest.nim @@ -30,17 +30,19 @@ export types type Manifest* = ref object of RootObj - treeCid {.serialize.}: Cid # Root of the merkle tree - datasetSize {.serialize.}: NBytes # Total size of all blocks - blockSize {.serialize.}: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed) - version: CidVersion # Cid version - hcodec: MultiCodec # Multihash codec - codec: MultiCodec # Data set codec - case protected {.serialize.}: bool # Protected datasets have erasure coded info + treeCid: Cid # Cid of the merkle tree + treeRoot: MultiHash # Root hash of the merkle tree + datasetSize: NBytes # Total size of all blocks + blockSize: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed) + version: CidVersion # Cid version + hcodec: MultiCodec # Multihash codec + codec: MultiCodec # Data set codec + case protected: bool # Protected datasets have erasure coded info of true: - ecK: int # Number of blocks to encode - ecM: int # Number of resulting parity blocks - originalTreeCid: Cid # The original root of the dataset being erasure coded + ecK: int # Number of blocks to encode + ecM: int # Number of resulting parity blocks + originalTreeCid: Cid # The original Cid of the dataset being erasure coded + originalTreeRoot: MultiHash originalDatasetSize: NBytes else: discard @@ -73,18 +75,24 @@ proc ecK*(self: Manifest): int = proc ecM*(self: Manifest): int = self.ecM -proc originalTreeCid*(self: Manifest): Cid = +proc originalCid*(self: Manifest): Cid = self.originalTreeCid proc originalBlocksCount*(self: Manifest): int = divUp(self.originalDatasetSize.int, self.blockSize.int) +proc originalTreeRoot*(self: Manifest): MultiHash = + self.originalTreeRoot + proc originalDatasetSize*(self: Manifest): NBytes = self.originalDatasetSize proc treeCid*(self: Manifest): Cid = self.treeCid +proc treeRoot*(self: Manifest): MultiHash = + self.treeRoot + proc blocksCount*(self: Manifest): int = divUp(self.datasetSize.int, self.blockSize.int) @@ -132,6 +140,7 @@ proc cid*(self: Manifest): ?!Cid {.deprecated: "use treeCid instead".} = proc `==`*(a, b: Manifest): bool = (a.treeCid == b.treeCid) and + (a.treeRoot == b.treeRoot) and (a.datasetSize == b.datasetSize) and (a.blockSize == b.blockSize) and (a.version == b.version) and @@ -142,12 +151,14 @@ proc `==`*(a, b: Manifest): bool = (a.ecK == b.ecK) and (a.ecM == b.ecM) and (a.originalTreeCid == b.originalTreeCid) and + (a.originalTreeRoot == b.originalTreeRoot) and (a.originalDatasetSize == b.originalDatasetSize) else: true) proc `$`*(self: Manifest): string = "treeCid: " & $self.treeCid & + ", treeRoot: " & $self.treeRoot & ", datasetSize: " & $self.datasetSize & ", blockSize: " & $self.blockSize & ", version: " & $self.version & @@ -158,6 +169,7 @@ proc `$`*(self: Manifest): string = ", ecK: " & $self.ecK & ", ecM: " & $self.ecM & ", originalTreeCid: " & $self.originalTreeCid & + ", originalTreeRoot: " & $self.originalTreeRoot & ", originalDatasetSize: " & $self.originalDatasetSize else: "") @@ -169,6 +181,7 @@ proc `$`*(self: Manifest): string = proc new*( T: type Manifest, treeCid: Cid, + treeRoot: MultiHash, blockSize: NBytes, datasetSize: NBytes, version: CidVersion = CIDv1, @@ -179,6 +192,7 @@ proc new*( T( treeCid: treeCid, + treeRoot: treeRoot, blockSize: blockSize, datasetSize: datasetSize, version: version, @@ -190,6 +204,7 @@ proc new*( T: type Manifest, manifest: Manifest, treeCid: Cid, + treeRoot: MultiHash, datasetSize: NBytes, ecK, ecM: int ): Manifest = @@ -198,6 +213,7 @@ proc new*( ## Manifest( treeCid: treeCid, + treeRoot: treeRoot, datasetSize: datasetSize, version: manifest.version, codec: manifest.codec, @@ -206,6 +222,7 @@ proc new*( protected: true, ecK: ecK, ecM: ecM, originalTreeCid: manifest.treeCid, + originalTreeRoot: manifest.treeRoot, originalDatasetSize: manifest.datasetSize) proc new*( @@ -216,7 +233,8 @@ proc new*( ## erasure protected one ## Manifest( - treeCid: manifest.originalTreeCid, + treeCid: manifest.originalCid, + treeRoot: manifest.originalTreeRoot, datasetSize: manifest.originalDatasetSize, version: manifest.version, codec: manifest.codec, @@ -236,6 +254,7 @@ proc new*( proc new*( T: type Manifest, treeCid: Cid, + treeRoot: MultiHash, datasetSize: NBytes, blockSize: NBytes, version: CidVersion, @@ -244,10 +263,12 @@ proc new*( ecK: int, ecM: int, originalTreeCid: Cid, + originalTreeRoot: MultiHash, originalDatasetSize: NBytes ): Manifest = Manifest( treeCid: treeCid, + treeRoot: treeRoot, datasetSize: datasetSize, blockSize: blockSize, version: version, @@ -257,5 +278,6 @@ proc new*( ecK: ecK, ecM: ecM, originalTreeCid: originalTreeCid, + originalTreeRoot: originalTreeRoot, originalDatasetSize: originalDatasetSize ) diff --git a/codex/merkletree/merkletree.nim b/codex/merkletree/merkletree.nim index 7612f0d6..3082bee9 100644 --- a/codex/merkletree/merkletree.nim +++ b/codex/merkletree/merkletree.nim @@ -14,10 +14,9 @@ import std/sugar import std/algorithm import pkg/chronicles -import pkg/questionable import pkg/questionable/results import pkg/nimcrypto/sha2 -import pkg/libp2p/[cid, multicodec, multihash, vbuffer] +import pkg/libp2p/[multicodec, multihash, vbuffer] import pkg/stew/byteutils import ../errors @@ -187,16 +186,8 @@ proc root*(self: MerkleTree): MultiHash = let rootIndex = self.len - 1 self.nodeBufferToMultiHash(rootIndex) -proc rootCid*(self: MerkleTree, version = CIDv1, dataCodec = multiCodec("raw")): ?!Cid = - Cid.init(version, dataCodec, self.root).mapFailure - -iterator leaves*(self: MerkleTree): MultiHash = - for i in 0.. self.nodeBufferToMultiHash(i)) proc leavesCount*(self: MerkleTree): Natural = self.leavesCount @@ -207,10 +198,6 @@ proc getLeaf*(self: MerkleTree, index: Natural): ?!MultiHash = success(self.nodeBufferToMultiHash(index)) -proc getLeafCid*(self: MerkleTree, index: Natural, version = CIDv1, dataCodec = multiCodec("raw")): ?!Cid = - let leaf = ? self.getLeaf(index) - Cid.init(version, dataCodec, leaf).mapFailure - proc height*(self: MerkleTree): Natural = computeTreeHeight(self.leavesCount) @@ -269,7 +256,7 @@ proc `==`*(a, b: MerkleTree): bool = (a.leavesCount == b.leavesCount) and (a.nodesBuffer == b.nodesBuffer) -proc init*( +func init*( T: type MerkleTree, mcodec: MultiCodec, digestSize: Natural, @@ -290,37 +277,6 @@ proc init*( else: failure("Expected nodesBuffer len to be " & $(totalNodes * digestSize) & " but was " & $nodesBuffer.len) -proc init*( - T: type MerkleTree, - leaves: openArray[MultiHash] -): ?!MerkleTree = - without leaf =? leaves.?[0]: - return failure("At least one leaf is required") - - var builder = ? MerkleTreeBuilder.init(mcodec = leaf.mcodec) - - for l in leaves: - let res = builder.addLeaf(l) - if res.isErr: - return failure(res.error) - - builder.build() - -proc init*( - T: type MerkleTree, - cids: openArray[Cid] -): ?!MerkleTree = - var leaves = newSeq[MultiHash]() - - for cid in cids: - let res = cid.mhash.mapFailure - if res.isErr: - return failure(res.error) - else: - leaves.add(res.value) - - MerkleTree.init(leaves) - ########################################################### # MerkleProof ########################################################### diff --git a/codex/node.nim b/codex/node.nim index 2ae9d385..70c26e7c 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -116,15 +116,20 @@ proc fetchBatched*( trace "Fetching blocks in batches of", size = batchSize - let iter = Iter.fromSlice(0.. node.blockStore.getBlock(BlockAddress.init(manifest.treeCid, i))) + without iter =? await node.blockStore.getBlocks(manifest.treeCid, manifest.blocksCount, manifest.treeRoot), err: + return failure(err) for batchNum in 0..= leavesCount: + iter.finish + + checkLen(0) + + var index = 0 + proc next(): Future[?!Block] {.async.} = + if not iter.finished: + without leaf =? tree.getLeaf(index), err: + inc index + checkLen(index) + return failure(err) + + inc index + checkLen(index) + + without leafCid =? Cid.init(treeCid.cidver, treeCid.mcodec, leaf).mapFailure, err: + return failure(err) + + without blk =? await self.getBlockFromStore(leafCid), err: + return failure(err) + + return success(blk) + else: + return failure("No more elements for tree with cid " & $treeCid) + + iter.next = next + return success(iter) + +proc new*(T: type TreeReader, treeCacheCap = DefaultTreeCacheCapacity): TreeReader = + TreeReader(treeCache: newLruCache[Cid, MerkleTree](treeCacheCap)) \ No newline at end of file diff --git a/codex/streams.nim b/codex/streams.nim index 0c3f573a..79e3b4e4 100644 --- a/codex/streams.nim +++ b/codex/streams.nim @@ -1,5 +1,6 @@ import ./streams/seekablestream import ./streams/storestream +import ./streams/seekablestorestream import ./streams/asyncstreamwrapper -export seekablestream, storestream, asyncstreamwrapper +export seekablestream, storestream, seekablestorestream, asyncstreamwrapper diff --git a/codex/streams/seekablestorestream.nim b/codex/streams/seekablestorestream.nim new file mode 100644 index 00000000..43992ed3 --- /dev/null +++ b/codex/streams/seekablestorestream.nim @@ -0,0 +1,123 @@ +## Nim-Dagger +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/options + +import pkg/upraises + +push: {.upraises: [].} + +import pkg/chronos +import pkg/chronicles +import pkg/stew/ptrops + +import ../stores +import ../manifest +import ../blocktype +import ../utils + +import ./seekablestream + +export stores, blocktype, manifest, chronos + +logScope: + topics = "codex storestream" + +const + SeekableStoreStreamTrackerName* = "SeekableStoreStream" + +type + # Make SeekableStream from a sequence of blocks stored in Manifest + # (only original file data - see StoreStream.size) + SeekableStoreStream* = ref object of SeekableStream + store*: BlockStore # Store where to lookup block contents + manifest*: Manifest # List of block CIDs + pad*: bool # Pad last block to manifest.blockSize? + +method initStream*(s: SeekableStoreStream) = + if s.objName.len == 0: + s.objName = SeekableStoreStreamTrackerName + + procCall SeekableStream(s).initStream() + +proc new*( + T: type SeekableStoreStream, + store: BlockStore, + manifest: Manifest, + pad = true +): SeekableStoreStream = + ## Create a new SeekableStoreStream instance for a given store and manifest + ## + result = SeekableStoreStream( + store: store, + manifest: manifest, + pad: pad, + offset: 0) + + result.initStream() + +method `size`*(self: SeekableStoreStream): int = + bytes(self.manifest, self.pad).int + +proc `size=`*(self: SeekableStoreStream, size: int) + {.error: "Setting the size is forbidden".} = + discard + +method atEof*(self: SeekableStoreStream): bool = + self.offset >= self.size + +method readOnce*( + self: SeekableStoreStream, + pbytes: pointer, + nbytes: int +): Future[int] {.async.} = + ## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`. + ## Return how many bytes were actually read before EOF was encountered. + ## Raise exception if we are already at EOF. + ## + + trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount + if self.atEof: + raise newLPStreamEOFError() + + # The loop iterates over blocks in the SeekableStoreStream, + # reading them and copying their data into outbuf + var read = 0 # Bytes read so far, and thus write offset in the outbuf + while read < nbytes and not self.atEof: + # Compute from the current stream position `self.offset` the block num/offset to read + # Compute how many bytes to read from this block + let + blockNum = self.offset div self.manifest.blockSize.int + blockOffset = self.offset mod self.manifest.blockSize.int + readBytes = min([self.size - self.offset, + nbytes - read, + self.manifest.blockSize.int - blockOffset]) + + # Read contents of block `blockNum` + without blk =? await self.store.getBlock(self.manifest.treeCid, blockNum, self.manifest.treeRoot), error: + raise newLPStreamReadError(error) + + trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset + + # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf + if blk.isEmpty: + zeroMem(pbytes.offset(read), readBytes) + else: + copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes) + + # Update current positions in the stream and outbuf + self.offset += readBytes + read += readBytes + + return read + +method closeImpl*(self: SeekableStoreStream) {.async.} = + trace "Closing SeekableStoreStream" + self.offset = self.size # set Eof + await procCall LPStream(self).closeImpl() diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index 63406a5f..5f7bb715 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -33,18 +33,20 @@ const StoreStreamTrackerName* = "StoreStream" type - # Make SeekableStream from a sequence of blocks stored in Manifest - # (only original file data - see StoreStream.size) - StoreStream* = ref object of SeekableStream + StoreStream* = ref object of LPStream store*: BlockStore # Store where to lookup block contents manifest*: Manifest # List of block CIDs pad*: bool # Pad last block to manifest.blockSize? + iter*: AsyncIter[?!Block] + lastBlock: Block + lastIndex: int + offset: int method initStream*(s: StoreStream) = if s.objName.len == 0: s.objName = StoreStreamTrackerName - procCall SeekableStream(s).initStream() + procCall LPStream(s).initStream() proc new*( T: type StoreStream, @@ -58,6 +60,7 @@ proc new*( store: store, manifest: manifest, pad: pad, + lastIndex: -1, offset: 0) result.initStream() @@ -85,32 +88,34 @@ method readOnce*( trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount if self.atEof: raise newLPStreamEOFError() + + # Initialize a block iterator + if self.lastIndex < 0: + without iter =? await self.store.getBlocks(self.manifest.treeCid, self.manifest.blocksCount, self.manifest.treeRoot), err: + raise newLPStreamReadError(err) + self.iter = iter - # The loop iterates over blocks in the StoreStream, - # reading them and copying their data into outbuf var read = 0 # Bytes read so far, and thus write offset in the outbuf while read < nbytes and not self.atEof: - # Compute from the current stream position `self.offset` the block num/offset to read + if self.offset >= (self.lastIndex + 1) * self.manifest.blockSize.int: + if not self.iter.finished: + without lastBlock =? await self.iter.next(), err: + raise newLPStreamReadError(err) + self.lastBlock = lastBlock + inc self.lastIndex + else: + raise newLPStreamReadError(newException(CodexError, "Block iterator finished prematurely")) # Compute how many bytes to read from this block let - blockNum = self.offset div self.manifest.blockSize.int blockOffset = self.offset mod self.manifest.blockSize.int readBytes = min([self.size - self.offset, nbytes - read, self.manifest.blockSize.int - blockOffset]) - address = BlockAddress(leaf: true, treeCid: self.manifest.treeCid, index: blockNum) - - # Read contents of block `blockNum` - without blk =? await self.store.getBlock(address), error: - raise newLPStreamReadError(error) - - trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset - # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf - if blk.isEmpty: + if self.lastBlock.isEmpty: zeroMem(pbytes.offset(read), readBytes) else: - copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes) + copyMem(pbytes.offset(read), self.lastBlock.data[blockOffset].addr, readBytes) # Update current positions in the stream and outbuf self.offset += readBytes diff --git a/codex/utils.nim b/codex/utils.nim index 1180455e..c2597d9b 100644 --- a/codex/utils.nim +++ b/codex/utils.nim @@ -35,6 +35,13 @@ proc orElse*[A](a, b: Option[A]): Option[A] = else: b +template `..<`*(a, b: untyped): untyped = + ## A shortcut for `a .. pred(b)`. + ## ``` + ## for i in 5 ..< 9: + ## echo i # => 5; 6; 7; 8 + ## ``` + a .. (when b is BackwardsIndex: succ(b) else: pred(b)) when not declared(parseDuration): # Odd code formatting to minimize diff v. mainLine const Whitespace = {' ', '\t', '\v', '\r', '\l', '\f'} diff --git a/codex/utils/asynciter.nim b/codex/utils/asynciter.nim index e3ac7fb0..7156fc6e 100644 --- a/codex/utils/asynciter.nim +++ b/codex/utils/asynciter.nim @@ -1,16 +1,13 @@ -import std/sugar - import pkg/questionable import pkg/chronos import pkg/upraises type - Function*[T, U] = proc(fut: T): U {.upraises: [CatchableError], gcsafe, closure.} - IsFinished* = proc(): bool {.upraises: [], gcsafe, closure.} - GenNext*[T] = proc(): T {.upraises: [CatchableError], gcsafe, closure.} + MapItem*[T, U] = proc(fut: T): U {.upraises: [CatchableError], gcsafe, closure.} + NextItem*[T] = proc(): T {.upraises: [CatchableError], gcsafe, closure.} Iter*[T] = ref object - finished: bool - next*: GenNext[T] + finished*: bool + next*: NextItem[T] AsyncIter*[T] = Iter[Future[T]] proc finish*[T](self: Iter[T]): void = @@ -23,126 +20,66 @@ iterator items*[T](self: Iter[T]): T = while not self.finished: yield self.next() -iterator pairs*[T](self: Iter[T]): tuple[key: int, val: T] {.inline.} = - var i = 0 - while not self.finished: - yield (i, self.next()) - inc(i) +proc map*[T, U](wrappedIter: Iter[T], mapItem: MapItem[T, U]): Iter[U] = + var iter = Iter[U]() -proc map*[T, U](fut: Future[T], fn: Function[T, U]): Future[U] {.async.} = - let t = await fut - fn(t) + proc checkFinish(): void = + if wrappedIter.finished: + iter.finish + + checkFinish() + + proc next(): U {.upraises: [CatchableError].} = + if not iter.finished: + let fut = wrappedIter.next() + checkFinish() + return mapItem(fut) + else: + raise newException(CatchableError, "Iterator finished, but next element was requested") + + iter.next = next + return iter + +proc prefetch*[T](wrappedIter: Iter[T], n: Positive): Iter[T] = + + var ringBuf = newSeq[T](n) + var wrappedLen = int.high -proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOnErr: bool = true): Iter[T] = var iter = Iter[T]() + proc tryFetch(i: int): void = + if not wrappedIter.finished: + let res = wrappedIter.next() + ringBuf[i mod n] = res + if wrappedIter.finished: + wrappedLen = min(i + 1, wrappedLen) + else: + if i == 0: + wrappedLen = 0 + + proc checkLen(i: int): void = + if i >= wrappedLen: + iter.finish + + # initialize buf with n prefetched values + for i in 0.. items[i]) - -proc fromSlice*[U, V: Ordinal](_: type Iter, slice: HSlice[U, V]): Iter[U] = - ## Creates new iterator from slice - ## - - Iter.fromRange(slice.a.int, slice.b.int, 1) - -proc fromRange*[U, V, S: Ordinal](_: type Iter, a: U, b: V, step: S = 1): Iter[U] = - ## Creates new iterator in range a..b with specified step (default 1) - ## - - var i = a - - proc genNext(): U = - let u = i - inc(i, step) - u - - proc isFinished(): bool = - (step > 0 and i > b) or - (step < 0 and i < b) - - Iter.new(genNext, isFinished) - -proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] = - Iter.new( - genNext = () => fn(iter.next()), - isFinished = () => iter.finished - ) - -proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] = - var nextT: Option[T] - - proc tryFetch(): void = - nextT = T.none - while not iter.finished: - let t = iter.next() - if predicate(t): - nextT = some(t) - break - - proc genNext(): T = - let t = nextT.unsafeGet - tryFetch() - return t - - proc isFinished(): bool = - nextT.isNone - - tryFetch() - Iter.new(genNext, isFinished) - -proc prefetch*[T](iter: Iter[T], n: Positive): Iter[T] = - var ringBuf = newSeq[T](n) - var iterLen = int.high - var i = 0 - - proc tryFetch(j: int): void = - if not iter.finished: - let item = iter.next() - ringBuf[j mod n] = item - if iter.finished: - iterLen = min(j + 1, iterLen) - else: - if j == 0: - iterLen = 0 - - proc genNext(): T = - let item = ringBuf[i mod n] - tryFetch(i + n) - inc i - return item - - proc isFinished(): bool = - i >= iterLen - - # initialize ringBuf with n prefetched values - for j in 0.. 0): let blk = Block.new(chunk).tryGet() - cids.add(blk.cid) + # builder.addDataBlock(blk.data).tryGet() + let mhash = blk.cid.mhash.mapFailure.tryGet() + builder.addLeaf(mhash).tryGet() (await store.putBlock(blk)).tryGet() - let - tree = MerkleTree.init(cids).tryGet() - treeCid = tree.rootCid.tryGet() - manifest = Manifest.new( - treeCid = treeCid, - blockSize = NBytes(chunker.chunkSize), - datasetSize = NBytes(chunker.offset), - ) + let + tree = builder.build().tryGet() + treeBlk = Block.new(tree.encode()).tryGet() - for i in 0..