From 2ea84f9e6cd2f4081ec391572b88375445c6217d Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Fri, 3 Nov 2023 21:17:20 +0100 Subject: [PATCH] Storing proofs instead of trees --- codex/blockexchange/engine/engine.nim | 170 +++++-------- codex/blockexchange/engine/pendingblocks.nim | 247 +++---------------- codex/blockexchange/protobuf/blockexc.nim | 11 +- codex/blocktype.nim | 6 + codex/erasure/erasure.nim | 32 +-- codex/manifest/coders.nim | 53 +--- codex/manifest/manifest.nim | 30 +-- codex/merkletree/merkletree.nim | 41 +-- codex/node.nim | 32 +-- codex/stores.nim | 4 +- codex/stores/blockstore.nim | 31 +-- codex/stores/cachestore.nim | 69 +++--- codex/stores/networkstore.nim | 55 +---- codex/stores/repostore.nim | 98 +++++--- codex/stores/treereader.nim | 101 -------- codex/streams.nim | 3 +- codex/streams/seekablestorestream.nim | 123 --------- codex/streams/storestream.nim | 41 ++- codex/utils/asynciter.nim | 17 +- tests/codex/helpers.nim | 35 ++- tests/codex/merkletree/testmerkletree.nim | 8 +- tests/codex/testerasure.nim | 18 +- tests/codex/testmanifest.nim | 1 - 23 files changed, 368 insertions(+), 858 deletions(-) delete mode 100644 codex/stores/treereader.nim delete mode 100644 codex/streams/seekablestorestream.nim diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index a7be9c45..3b13a67f 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -174,78 +174,29 @@ proc monitorBlockHandle(b: BlockExcEngine, handle: Future[Block], address: Block proc requestBlock*( b: BlockExcEngine, - cid: Cid, - timeout = DefaultBlockTimeout): Future[Block] {.async.} = - trace "Begin block request", cid, peers = b.peers.len - - if b.pendingBlocks.isInFlight(cid): - trace "Request handle already pending", cid - return await b.pendingBlocks.getWantHandle(cid, timeout) - - let - blk = b.pendingBlocks.getWantHandle(cid, timeout) - address = BlockAddress(leaf: false, cid: cid) - - trace "Selecting peers who have", address - var - peers = b.peers.selectCheapest(address) - - without blockPeer =? b.findCheapestPeerForBlock(peers): - trace "No peers to request blocks from. Queue discovery...", cid - b.discovery.queueFindBlocksReq(@[cid]) - return await blk - - asyncSpawn b.monitorBlockHandle(blk, address, blockPeer.id) - b.pendingBlocks.setInFlight(cid, true) - await b.sendWantBlock(address, blockPeer) - - codex_block_exchange_want_block_lists_sent.inc() - - if (peers.len - 1) == 0: - trace "No peers to send want list to", cid - b.discovery.queueFindBlocksReq(@[cid]) - return await blk - - await b.sendWantHave(address, blockPeer, toSeq(b.peers)) - - codex_block_exchange_want_have_lists_sent.inc() - - return await blk - -proc requestBlock( - b: BlockExcEngine, - treeReq: TreeReq, - index: Natural, + address: BlockAddress, timeout = DefaultBlockTimeout ): Future[Block] {.async.} = - let address = BlockAddress(leaf: true, treeCid: treeReq.treeCid, index: index) + let blockFuture = b.pendingBlocks.getWantHandle(address, timeout) - let handleOrCid = treeReq.getWantHandleOrCid(index, timeout) - if handleOrCid.resolved: - without blk =? await b.localStore.getBlock(handleOrCid.cid), err: - return await b.requestBlock(handleOrCid.cid, timeout) - return blk - - let blockFuture = handleOrCid.handle - - if treeReq.isInFlight(index): + if b.pendingBlocks.isInFlight(address): return await blockFuture let peers = b.peers.selectCheapest(address) if peers.len == 0: - b.discovery.queueFindBlocksReq(@[treeReq.treeCid]) + b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) let maybePeer = if peers.len > 0: - peers[index mod peers.len].some + peers[hash(address) mod peers.len].some elif b.peers.len > 0: - toSeq(b.peers)[index mod b.peers.len].some + toSeq(b.peers)[hash(address) mod b.peers.len].some else: BlockExcPeerCtx.none if peer =? maybePeer: asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id) - treeReq.trySetInFlight(index) + b.pendingBlocks.setInFlight(address) await b.sendWantBlock(address, peer) codexBlockExchangeWantBlockListsSent.inc() await b.sendWantHave(address, peer, toSeq(b.peers)) @@ -255,29 +206,10 @@ proc requestBlock( proc requestBlock*( b: BlockExcEngine, - treeCid: Cid, - index: Natural, - merkleRoot: MultiHash, + cid: Cid, timeout = DefaultBlockTimeout ): Future[Block] = - without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, Natural.none, merkleRoot), err: - raise err - - return b.requestBlock(treeReq, index, timeout) - -proc requestBlocks*( - b: BlockExcEngine, - treeCid: Cid, - leavesCount: Natural, - merkleRoot: MultiHash, - timeout = DefaultBlockTimeout -): ?!AsyncIter[Block] = - without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, leavesCount.some, merkleRoot), err: - return failure(err) - - return Iter.fromSlice(0.. b.requestBlock(treeReq, index, timeout) - ).success + b.requestBlock(BlockAddress.init(cid)) proc blockPresenceHandler*( b: BlockExcEngine, @@ -352,7 +284,12 @@ proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asy b.pendingBlocks.resolve(blocksDelivery) await b.scheduleTasks(blocksDelivery) - b.discovery.queueProvideBlocksReq(blocksDelivery.mapIt( it.blk.cid )) + var cids = initHashSet[Cid]() + for bd in blocksDelivery: + cids.incl(bd.blk.cid) + if bd.address.leaf: + cids.incl(bd.address.treeCid) + b.discovery.queueProvideBlocksReq(cids.toSeq) proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} = await b.resolveBlocks(blocks.mapIt(BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid)))) @@ -370,18 +307,66 @@ proc payForBlocks(engine: BlockExcEngine, trace "Sending payment for blocks", price await sendPayment(peer.id, payment) +proc validateBlockDelivery( + b: BlockExcEngine, + bd: BlockDelivery +): ?!void = + if bd.address notin b.pendingBlocks: + return failure("Received block is not currently a pending block") + + if bd.address.leaf: + without proof =? bd.proof: + return failure("Missing proof") + + if proof.index != bd.address.index: + return failure("Proof index " & $proof.index & " doesn't match leaf index " & $bd.address.index) + + without leaf =? bd.blk.cid.mhash.mapFailure, err: + return failure("Unable to get mhash from cid for block, nested err: " & err.msg) + + without treeRoot =? bd.address.treeCid.mhash.mapFailure, err: + return failure("Unable to get mhash from treeCid for block, nested err: " & err.msg) + + without verifyOutcome =? proof.verifyLeaf(leaf, treeRoot), err: + return failure("Unable to verify proof for block, nested err: " & err.msg) + + if not verifyOutcome: + return failure("Provided inclusion proof is invalid") + else: # not leaf + if bd.address.cid != bd.blk.cid: + return failure("Delivery cid " & $bd.address.cid & " doesn't match block cid " & $bd.blk.cid) + + return success() + proc blocksDeliveryHandler*( b: BlockExcEngine, peer: PeerId, blocksDelivery: seq[BlockDelivery]) {.async.} = trace "Got blocks from peer", peer, len = blocksDelivery.len + var validatedBlocksDelivery: seq[BlockDelivery] for bd in blocksDelivery: - if isErr (await b.localStore.putBlock(bd.blk)): - trace "Unable to store block", cid = bd.blk.cid - await b.resolveBlocks(blocksDelivery) - codexBlockExchangeBlocksReceived.inc(blocksDelivery.len.int64) + if err =? b.validateBlockDelivery(bd).errorOption: + warn "Block validation failed", address = bd.address, msg = err.msg + continue + + if err =? (await b.localStore.putBlock(bd.blk)).errorOption: + error "Unable to store block", address = bd.address, err = err.msg + continue + + if bd.address.leaf: + without proof =? bd.proof: + error "Proof expected for a leaf block delivery", address = bd.address + continue + if err =? (await b.localStore.putBlockCidAndProof(bd.address.treeCid, bd.address.index, bd.blk.cid, proof)).errorOption: + error "Unable to store proof and cid for a block", address = bd.address + continue + + validatedBlocksDelivery.add(bd) + + await b.resolveBlocks(validatedBlocksDelivery) + codexBlockExchangeBlocksReceived.inc(validatedBlocksDelivery.len.int64) let peerCtx = b.peers.get(peer) @@ -491,18 +476,6 @@ proc paymentHandler*( else: context.paymentChannel = engine.wallet.acceptChannel(payment).option -proc onTreeHandler(b: BlockExcEngine, tree: MerkleTree): Future[?!void] {.async.} = - trace "Handling tree" - - without treeBlk =? Block.new(tree.encode()), err: - return failure(err) - - if err =? (await b.localStore.putBlock(treeBlk)).errorOption: - return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg) - - return success() - - proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} = ## Perform initial setup, such as want ## list exchange @@ -564,7 +537,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = BlockDelivery(address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some) ) else: - (await b.localStore.getBlock(e.address.cid)).map( + (await b.localStore.getBlock(e.address)).map( (blk: Block) => BlockDelivery(address: e.address, blk: blk, proof: MerkleProof.none) ) @@ -665,11 +638,6 @@ proc new*( proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} = engine.paymentHandler(peer, payment) - proc onTree(tree: MerkleTree): Future[void] {.gcsafe, async.} = - if err =? (await engine.onTreeHandler(tree)).errorOption: - echo "Error handling a tree" & err.msg # TODO - # error "Error handling a tree", msg = err.msg - network.handlers = BlockExcHandlers( onWantList: blockWantListHandler, onBlocksDelivery: blocksDeliveryHandler, @@ -677,6 +645,4 @@ proc new*( onAccount: accountHandler, onPayment: paymentHandler) - pendingBlocks.onTree = onTree - return engine diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 9ece50c1..ae7ddfc8 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -14,20 +14,15 @@ import pkg/upraises push: {.upraises: [].} -import ../../blocktype import pkg/chronicles -import pkg/questionable -import pkg/questionable/options -import pkg/questionable/results import pkg/chronos import pkg/libp2p import pkg/metrics import pkg/questionable/results import ../protobuf/blockexc - +import ../../blocktype import ../../merkletree -import ../../utils logScope: topics = "codex pendingblocks" @@ -44,121 +39,12 @@ type inFlight*: bool startTime*: int64 - LeafReq* = object - case delivered*: bool - of false: - handle*: Future[Block] - inFlight*: bool - of true: - leaf: MultiHash - blkCid*: Cid - - TreeReq* = ref object - leaves*: Table[Natural, LeafReq] - deliveredCount*: Natural - leavesCount*: ?Natural - treeRoot*: MultiHash - treeCid*: Cid - - TreeHandler* = proc(tree: MerkleTree): Future[void] {.gcsafe.} - PendingBlocksManager* = ref object of RootObj - blocks*: Table[Cid, BlockReq] # pending Block requests - trees*: Table[Cid, TreeReq] - onTree*: TreeHandler + blocks*: Table[BlockAddress, BlockReq] # pending Block requests proc updatePendingBlockGauge(p: PendingBlocksManager) = codex_block_exchange_pending_block_requests.set(p.blocks.len.int64) -type - BlockHandleOrCid = object - case resolved*: bool - of true: - cid*: Cid - else: - handle*: Future[Block] - -proc buildTree(treeReq: TreeReq): ?!MerkleTree = - trace "Building a merkle tree from leaves", treeCid = treeReq.treeCid, leavesCount = treeReq.leavesCount - - without leavesCount =? treeReq.leavesCount: - return failure("Leaves count is none, cannot build a tree") - - var builder = ? MerkleTreeBuilder.init(treeReq.treeRoot.mcodec) - for i in 0.. - self.store.getBlock(manifest.treeCid, i, manifest.treeRoot).map((r: ?!bt.Block) => (r, i)) # Get the data blocks (first K) + self.store.getBlock(BlockAddress.init(manifest.treeCid, i)).map((r: ?!bt.Block) => (r, i)) # Get the data blocks (first K) ) proc isFinished(): bool = pendingBlocks.len == 0 @@ -290,16 +290,15 @@ proc encodeData( without tree =? MerkleTree.init(cids[]), err: return failure(err) - without treeBlk =? bt.Block.new(tree.encode()), err: + without treeCid =? tree.rootCid, err: return failure(err) - if err =? (await self.store.putBlock(treeBlk)).errorOption: - return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg) + if err =? (await self.store.putAllProofs(tree)).errorOption: + return failure(err) let encodedManifest = Manifest.new( manifest = manifest, - treeCid = treeBlk.cid, - treeRoot = tree.root, + treeCid = treeCid, datasetSize = (manifest.blockSize.int * params.blocksCount).NBytes, ecK = params.ecK, ecM = params.ecM @@ -353,9 +352,9 @@ proc decode*( var cids = seq[Cid].new() + recoveredIndices = newSeq[int]() decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM) emptyBlock = newSeq[byte](encoded.blockSize.int) - hasParity = false cids[].setLen(encoded.blocksCount) try: @@ -403,6 +402,7 @@ proc decode*( return failure("Unable to store block!") cids[idx] = blk.cid + recoveredIndices.add(idx) except CancelledError as exc: trace "Erasure coding decoding cancelled" raise exc # cancellation needs to be propagated @@ -415,14 +415,18 @@ proc decode*( without tree =? MerkleTree.init(cids[0.. i < tree.leavesCount) + + if err =? (await self.store.putSomeProofs(tree, idxIter)).errorOption: + return failure(err) let decoded = Manifest.new(encoded) diff --git a/codex/manifest/coders.nim b/codex/manifest/coders.nim index c691800e..db504617 100644 --- a/codex/manifest/coders.nim +++ b/codex/manifest/coders.nim @@ -45,31 +45,27 @@ proc encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] = # optional uint32 originalDatasetSize = 4; # size of the original dataset # } # Message Header { - # optional bytes treeCid = 1; # the cid of the tree - # optional bytes treeRoot = 2; # the root hash of the tree - # optional uint32 blockSize = 3; # size of a single block - # optional uint64 originalBytes = 4;# exact file size - # optional ErasureInfo erasure = 5; # erasure coding info + # optional bytes treeCid = 1; # cid (root) of the tree + # optional uint32 blockSize = 2; # size of a single block + # optional uint64 datasetSize = 3; # size of the dataset + # optional ErasureInfo erasure = 4; # erasure coding info # } # ``` # # var treeRootVBuf = initVBuffer() var header = initProtoBuffer() header.write(1, manifest.treeCid.data.buffer) - # treeRootVBuf.write(manifest.treeRoot) - header.write(2, manifest.treeRoot.data.buffer) - header.write(3, manifest.blockSize.uint32) - header.write(4, manifest.datasetSize.uint32) + header.write(2, manifest.blockSize.uint32) + header.write(3, manifest.datasetSize.uint32) if manifest.protected: var erasureInfo = initProtoBuffer() erasureInfo.write(1, manifest.ecK.uint32) erasureInfo.write(2, manifest.ecM.uint32) - erasureInfo.write(3, manifest.originalCid.data.buffer) - erasureInfo.write(4, manifest.originalTreeRoot.data.buffer) - erasureInfo.write(5, manifest.originalDatasetSize.uint32) + erasureInfo.write(3, manifest.originalTreeCid.data.buffer) + erasureInfo.write(4, manifest.originalDatasetSize.uint32) erasureInfo.finish() - header.write(5, erasureInfo) + header.write(4, erasureInfo) pbNode.write(1, header) # set the treeCid as the data field pbNode.finish() @@ -85,9 +81,7 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = pbHeader: ProtoBuffer pbErasureInfo: ProtoBuffer treeCidBuf: seq[byte] - treeRootBuf: seq[byte] originalTreeCid: seq[byte] - originalTreeRootBuf: seq[byte] datasetSize: uint32 blockSize: uint32 originalDatasetSize: uint32 @@ -101,16 +95,13 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = if pbHeader.getField(1, treeCidBuf).isErr: return failure("Unable to decode `treeCid` from manifest!") - if pbHeader.getField(2, treeRootBuf).isErr: - return failure("Unable to decode `treeRoot` from manifest!") - - if pbHeader.getField(3, blockSize).isErr: + if pbHeader.getField(2, blockSize).isErr: return failure("Unable to decode `blockSize` from manifest!") - if pbHeader.getField(4, datasetSize).isErr: + if pbHeader.getField(3, datasetSize).isErr: return failure("Unable to decode `datasetSize` from manifest!") - if pbHeader.getField(5, pbErasureInfo).isErr: + if pbHeader.getField(4, pbErasureInfo).isErr: return failure("Unable to decode `erasureInfo` from manifest!") let protected = pbErasureInfo.buffer.len > 0 @@ -123,34 +114,18 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = if pbErasureInfo.getField(3, originalTreeCid).isErr: return failure("Unable to decode `originalTreeCid` from manifest!") - - if pbErasureInfo.getField(4, originalTreeRootBuf).isErr: - return failure("Unable to decode `originalTreeRoot` from manifest!") - if pbErasureInfo.getField(5, originalDatasetSize).isErr: + if pbErasureInfo.getField(4, originalDatasetSize).isErr: return failure("Unable to decode `originalDatasetSize` from manifest!") - var - treeRoot: MultiHash - originalTreeRoot: MultiHash let treeCid = ? Cid.init(treeCidBuf).mapFailure - treeRootRes = ? MultiHash.decode(treeRootBuf, treeRoot).mapFailure - - if treeRootRes != treeRootBuf.len: - return failure("Error decoding `treeRoot` as MultiHash") - - if protected: - let originalTreeRootRes = ? MultiHash.decode(originalTreeRootBuf, originalTreeRoot).mapFailure - if originalTreeRootRes != originalTreeRootBuf.len: - return failure("Error decoding `originalTreeRoot` as MultiHash") let self = if protected: Manifest.new( treeCid = treeCid, - treeRoot = treeRoot, datasetSize = datasetSize.NBytes, blockSize = blockSize.NBytes, version = treeCid.cidver, @@ -159,13 +134,11 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = ecK = ecK.int, ecM = ecM.int, originalTreeCid = ? Cid.init(originalTreeCid).mapFailure, - originalTreeRoot = originalTreeRoot, originalDatasetSize = originalDatasetSize.NBytes ) else: Manifest.new( treeCid = treeCid, - treeRoot = treeRoot, datasetSize = datasetSize.NBytes, blockSize = blockSize.NBytes, version = treeCid.cidver, diff --git a/codex/manifest/manifest.nim b/codex/manifest/manifest.nim index bfd174ff..64d77711 100644 --- a/codex/manifest/manifest.nim +++ b/codex/manifest/manifest.nim @@ -30,8 +30,7 @@ export types type Manifest* = ref object of RootObj - treeCid: Cid # Cid of the merkle tree - treeRoot: MultiHash # Root hash of the merkle tree + treeCid: Cid # Root of the merkle tree datasetSize: NBytes # Total size of all blocks blockSize: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed) version: CidVersion # Cid version @@ -41,8 +40,7 @@ type of true: ecK: int # Number of blocks to encode ecM: int # Number of resulting parity blocks - originalTreeCid: Cid # The original Cid of the dataset being erasure coded - originalTreeRoot: MultiHash + originalTreeCid: Cid # The original root of the dataset being erasure coded originalDatasetSize: NBytes else: discard @@ -75,24 +73,18 @@ proc ecK*(self: Manifest): int = proc ecM*(self: Manifest): int = self.ecM -proc originalCid*(self: Manifest): Cid = +proc originalTreeCid*(self: Manifest): Cid = self.originalTreeCid proc originalBlocksCount*(self: Manifest): int = divUp(self.originalDatasetSize.int, self.blockSize.int) -proc originalTreeRoot*(self: Manifest): MultiHash = - self.originalTreeRoot - proc originalDatasetSize*(self: Manifest): NBytes = self.originalDatasetSize proc treeCid*(self: Manifest): Cid = self.treeCid -proc treeRoot*(self: Manifest): MultiHash = - self.treeRoot - proc blocksCount*(self: Manifest): int = divUp(self.datasetSize.int, self.blockSize.int) @@ -140,7 +132,6 @@ proc cid*(self: Manifest): ?!Cid {.deprecated: "use treeCid instead".} = proc `==`*(a, b: Manifest): bool = (a.treeCid == b.treeCid) and - (a.treeRoot == b.treeRoot) and (a.datasetSize == b.datasetSize) and (a.blockSize == b.blockSize) and (a.version == b.version) and @@ -151,14 +142,12 @@ proc `==`*(a, b: Manifest): bool = (a.ecK == b.ecK) and (a.ecM == b.ecM) and (a.originalTreeCid == b.originalTreeCid) and - (a.originalTreeRoot == b.originalTreeRoot) and (a.originalDatasetSize == b.originalDatasetSize) else: true) proc `$`*(self: Manifest): string = "treeCid: " & $self.treeCid & - ", treeRoot: " & $self.treeRoot & ", datasetSize: " & $self.datasetSize & ", blockSize: " & $self.blockSize & ", version: " & $self.version & @@ -169,7 +158,6 @@ proc `$`*(self: Manifest): string = ", ecK: " & $self.ecK & ", ecM: " & $self.ecM & ", originalTreeCid: " & $self.originalTreeCid & - ", originalTreeRoot: " & $self.originalTreeRoot & ", originalDatasetSize: " & $self.originalDatasetSize else: "") @@ -181,7 +169,6 @@ proc `$`*(self: Manifest): string = proc new*( T: type Manifest, treeCid: Cid, - treeRoot: MultiHash, blockSize: NBytes, datasetSize: NBytes, version: CidVersion = CIDv1, @@ -192,7 +179,6 @@ proc new*( T( treeCid: treeCid, - treeRoot: treeRoot, blockSize: blockSize, datasetSize: datasetSize, version: version, @@ -204,7 +190,6 @@ proc new*( T: type Manifest, manifest: Manifest, treeCid: Cid, - treeRoot: MultiHash, datasetSize: NBytes, ecK, ecM: int ): Manifest = @@ -213,7 +198,6 @@ proc new*( ## Manifest( treeCid: treeCid, - treeRoot: treeRoot, datasetSize: datasetSize, version: manifest.version, codec: manifest.codec, @@ -222,7 +206,6 @@ proc new*( protected: true, ecK: ecK, ecM: ecM, originalTreeCid: manifest.treeCid, - originalTreeRoot: manifest.treeRoot, originalDatasetSize: manifest.datasetSize) proc new*( @@ -233,8 +216,7 @@ proc new*( ## erasure protected one ## Manifest( - treeCid: manifest.originalCid, - treeRoot: manifest.originalTreeRoot, + treeCid: manifest.originalTreeCid, datasetSize: manifest.originalDatasetSize, version: manifest.version, codec: manifest.codec, @@ -254,7 +236,6 @@ proc new*( proc new*( T: type Manifest, treeCid: Cid, - treeRoot: MultiHash, datasetSize: NBytes, blockSize: NBytes, version: CidVersion, @@ -263,12 +244,10 @@ proc new*( ecK: int, ecM: int, originalTreeCid: Cid, - originalTreeRoot: MultiHash, originalDatasetSize: NBytes ): Manifest = Manifest( treeCid: treeCid, - treeRoot: treeRoot, datasetSize: datasetSize, blockSize: blockSize, version: version, @@ -278,6 +257,5 @@ proc new*( ecK: ecK, ecM: ecM, originalTreeCid: originalTreeCid, - originalTreeRoot: originalTreeRoot, originalDatasetSize: originalDatasetSize ) diff --git a/codex/merkletree/merkletree.nim b/codex/merkletree/merkletree.nim index 96c08cdd..3bfb9071 100644 --- a/codex/merkletree/merkletree.nim +++ b/codex/merkletree/merkletree.nim @@ -14,6 +14,7 @@ import std/sugar import std/algorithm import pkg/chronicles +import pkg/questionable import pkg/questionable/results import pkg/nimcrypto/sha2 import pkg/libp2p/[cid, multicodec, multihash, vbuffer] @@ -186,8 +187,16 @@ proc root*(self: MerkleTree): MultiHash = let rootIndex = self.len - 1 self.nodeBufferToMultiHash(rootIndex) -proc leaves*(self: MerkleTree): seq[MultiHash] = - toSeq(0.. self.nodeBufferToMultiHash(i)) +proc rootCid*(self: MerkleTree, version = CIDv1, dataCodec = multiCodec("raw")): ?!Cid = + Cid.init(version, dataCodec, self.root).mapFailure + +iterator leaves*(self: MerkleTree): MultiHash = + for i in 0.. node.blockStore.getBlock(BlockAddress.init(manifest.treeCid, i))) for batchNum in 0..= self.size - -method readOnce*( - self: SeekableStoreStream, - pbytes: pointer, - nbytes: int -): Future[int] {.async.} = - ## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`. - ## Return how many bytes were actually read before EOF was encountered. - ## Raise exception if we are already at EOF. - ## - - trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount - if self.atEof: - raise newLPStreamEOFError() - - # The loop iterates over blocks in the SeekableStoreStream, - # reading them and copying their data into outbuf - var read = 0 # Bytes read so far, and thus write offset in the outbuf - while read < nbytes and not self.atEof: - # Compute from the current stream position `self.offset` the block num/offset to read - # Compute how many bytes to read from this block - let - blockNum = self.offset div self.manifest.blockSize.int - blockOffset = self.offset mod self.manifest.blockSize.int - readBytes = min([self.size - self.offset, - nbytes - read, - self.manifest.blockSize.int - blockOffset]) - - # Read contents of block `blockNum` - without blk =? await self.store.getBlock(self.manifest.treeCid, blockNum, self.manifest.treeRoot), error: - raise newLPStreamReadError(error) - - trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset - - # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf - if blk.isEmpty: - zeroMem(pbytes.offset(read), readBytes) - else: - copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes) - - # Update current positions in the stream and outbuf - self.offset += readBytes - read += readBytes - - return read - -method closeImpl*(self: SeekableStoreStream) {.async.} = - trace "Closing SeekableStoreStream" - self.offset = self.size # set Eof - await procCall LPStream(self).closeImpl() diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index 5f7bb715..63406a5f 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -33,20 +33,18 @@ const StoreStreamTrackerName* = "StoreStream" type - StoreStream* = ref object of LPStream + # Make SeekableStream from a sequence of blocks stored in Manifest + # (only original file data - see StoreStream.size) + StoreStream* = ref object of SeekableStream store*: BlockStore # Store where to lookup block contents manifest*: Manifest # List of block CIDs pad*: bool # Pad last block to manifest.blockSize? - iter*: AsyncIter[?!Block] - lastBlock: Block - lastIndex: int - offset: int method initStream*(s: StoreStream) = if s.objName.len == 0: s.objName = StoreStreamTrackerName - procCall LPStream(s).initStream() + procCall SeekableStream(s).initStream() proc new*( T: type StoreStream, @@ -60,7 +58,6 @@ proc new*( store: store, manifest: manifest, pad: pad, - lastIndex: -1, offset: 0) result.initStream() @@ -88,34 +85,32 @@ method readOnce*( trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount if self.atEof: raise newLPStreamEOFError() - - # Initialize a block iterator - if self.lastIndex < 0: - without iter =? await self.store.getBlocks(self.manifest.treeCid, self.manifest.blocksCount, self.manifest.treeRoot), err: - raise newLPStreamReadError(err) - self.iter = iter + # The loop iterates over blocks in the StoreStream, + # reading them and copying their data into outbuf var read = 0 # Bytes read so far, and thus write offset in the outbuf while read < nbytes and not self.atEof: - if self.offset >= (self.lastIndex + 1) * self.manifest.blockSize.int: - if not self.iter.finished: - without lastBlock =? await self.iter.next(), err: - raise newLPStreamReadError(err) - self.lastBlock = lastBlock - inc self.lastIndex - else: - raise newLPStreamReadError(newException(CodexError, "Block iterator finished prematurely")) + # Compute from the current stream position `self.offset` the block num/offset to read # Compute how many bytes to read from this block let + blockNum = self.offset div self.manifest.blockSize.int blockOffset = self.offset mod self.manifest.blockSize.int readBytes = min([self.size - self.offset, nbytes - read, self.manifest.blockSize.int - blockOffset]) + address = BlockAddress(leaf: true, treeCid: self.manifest.treeCid, index: blockNum) + + # Read contents of block `blockNum` + without blk =? await self.store.getBlock(address), error: + raise newLPStreamReadError(error) + + trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset + # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf - if self.lastBlock.isEmpty: + if blk.isEmpty: zeroMem(pbytes.offset(read), readBytes) else: - copyMem(pbytes.offset(read), self.lastBlock.data[blockOffset].addr, readBytes) + copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes) # Update current positions in the stream and outbuf self.offset += readBytes diff --git a/codex/utils/asynciter.nim b/codex/utils/asynciter.nim index ffff80d1..f61587b0 100644 --- a/codex/utils/asynciter.nim +++ b/codex/utils/asynciter.nim @@ -52,12 +52,12 @@ proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOn iter.next = next return iter -proc fromItems*[T](_: type Iter, items: openArray[T]): Iter[T] = +proc fromItems*[T](_: type Iter, items: seq[T]): Iter[T] = ## Create new iterator from items ## Iter.fromSlice(0.. items[i]) + .map((i: int) => items[i]) proc fromSlice*[U, V: Ordinal](_: type Iter, slice: HSlice[U, V]): Iter[U] = ## Creates new iterator from slice @@ -89,22 +89,23 @@ proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] = ) proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] = - var nextItem: T + var nextT: Option[T] proc tryFetch(): void = + nextT = T.none while not iter.finished: - let item = iter.next() - if predicate(item): - nextItem = some(item) + let t = iter.next() + if predicate(t): + nextT = some(t) break proc genNext(): T = - let t = nextItem + let t = nextT.unsafeGet tryFetch() return t proc isFinished(): bool = - iter.finished + nextT.isNone tryFetch() Iter.new(genNext, isFinished) diff --git a/tests/codex/helpers.nim b/tests/codex/helpers.nim index 0f2090f2..0c8cb595 100644 --- a/tests/codex/helpers.nim +++ b/tests/codex/helpers.nim @@ -47,10 +47,9 @@ proc makeManifestAndTree*(blocks: seq[Block]): ?!(Manifest, MerkleTree) = datasetSize = blocks.mapIt(it.data.len).foldl(a + b) blockSize = blocks.mapIt(it.data.len).foldl(max(a, b)) tree = ? MerkleTree.init(blocks.mapIt(it.cid)) - treeBlk = ? Block.new(tree.encode()) + treeCid = ? tree.rootCid manifest = Manifest.new( - treeCid = treeBlk.cid, - treeRoot = tree.root, + treeCid = treeCid, blockSize = NBytes(blockSize), datasetSize = NBytes(datasetSize), version = CIDv1, @@ -78,30 +77,28 @@ proc makeWantList*( full: full) proc storeDataGetManifest*(store: BlockStore, chunker: Chunker): Future[Manifest] {.async.} = - var builder = MerkleTreeBuilder.init().tryGet() + var cids = newSeq[Cid]() while ( let chunk = await chunker.getBytes(); chunk.len > 0): let blk = Block.new(chunk).tryGet() - # builder.addDataBlock(blk.data).tryGet() - let mhash = blk.cid.mhash.mapFailure.tryGet() - builder.addLeaf(mhash).tryGet() + cids.add(blk.cid) (await store.putBlock(blk)).tryGet() - let - tree = builder.build().tryGet() - treeBlk = Block.new(tree.encode()).tryGet() + let + tree = MerkleTree.init(cids).tryGet() + treeCid = tree.rootCid.tryGet() + manifest = Manifest.new( + treeCid = treeCid, + blockSize = NBytes(chunker.chunkSize), + datasetSize = NBytes(chunker.offset), + ) - let manifest = Manifest.new( - treeCid = treeBlk.cid, - treeRoot = tree.root, - blockSize = NBytes(chunker.chunkSize), - datasetSize = NBytes(chunker.offset), - ) - - (await store.putBlock(treeBlk)).tryGet() + for i in 0..