From 4d30eac2b66aad2a4817e0800c1c35644d5637e3 Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Wed, 30 Aug 2023 18:04:33 +0200 Subject: [PATCH] Storing and retrieving data locally using merkle trees --- codex/blocktype.nim | 27 ++++---- codex/manifest/coders.nim | 76 ++++++++++++++--------- codex/manifest/manifest.nim | 105 ++++++++++++++++---------------- codex/merkletree/merkletree.nim | 32 ++++++++-- codex/node.nim | 61 ++++++++++++++----- codex/stores/blockstore.nim | 13 ++++ codex/stores/networkstore.nim | 7 +++ codex/stores/repostore.nim | 63 +++++++++++++++++-- codex/streams/storestream.nim | 4 +- 9 files changed, 264 insertions(+), 124 deletions(-) diff --git a/codex/blocktype.nim b/codex/blocktype.nim index 2484908f..950b230c 100644 --- a/codex/blocktype.nim +++ b/codex/blocktype.nim @@ -14,7 +14,7 @@ import pkg/upraises push: {.upraises: [].} -import pkg/libp2p/[cid, multicodec] +import pkg/libp2p/[cid, multicodec, multihash] import pkg/stew/byteutils import pkg/questionable import pkg/questionable/results @@ -147,24 +147,25 @@ func new*( cid: cid, data: @data).success -func new*( +proc new*( T: type Block, cid: Cid, data: openArray[byte], verify: bool = true ): ?!Block = ## creates a new block for both storage and network IO - ## + ## - let - mhash = ? cid.mhash.mapFailure - b = ? Block.new( - data = @data, - version = cid.cidver, - codec = cid.mcodec, - mcodec = mhash.mcodec) + if verify: + let + mhash = ? cid.mhash.mapFailure + computedMhash = ? MultiHash.digest($mhash.mcodec, data).mapFailure + computedCid = ? Cid.init(cid.cidver, cid.mcodec, computedMhash).mapFailure + if computedCid != cid: + return "Cid doesn't match the data".failure - if verify and cid != b.cid: - return "Cid and content don't match!".failure + return Block( + cid: cid, + data: @data + ).success - success b diff --git a/codex/manifest/coders.nim b/codex/manifest/coders.nim index 0a3c894d..bab157ac 100644 --- a/codex/manifest/coders.nim +++ b/codex/manifest/coders.nim @@ -51,21 +51,23 @@ proc encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] = # optional uint32 original = 4; # number of original blocks # } # Message Header { - # optional bytes rootHash = 1; # the root (tree) hash - # optional uint32 blockSize = 2; # size of a single block - # optional uint32 blocksLen = 3; # total amount of blocks - # optional ErasureInfo erasure = 4; # erasure coding info - # optional uint64 originalBytes = 5;# exact file size + # optional bytes treeCid = 1; # the root (tree) hash + # optional bytes treeRoot = 2; # the root (tree) hash + # optional uint32 blockSize = 3; # size of a single block + # optional uint32 blocksLen = 4; # total amount of blocks + # optional ErasureInfo erasure = 5; # erasure coding info + # optional uint64 originalBytes = 6;# exact file size # } # ``` # - - let cid = ? manifest.cid + var treeRootVBuf = initVBuffer() var header = initProtoBuffer() - header.write(1, cid.data.buffer) - header.write(2, manifest.blockSize.uint32) - header.write(3, manifest.len.uint32) - header.write(5, manifest.originalBytes.uint64) + header.write(1, manifest.treeCid.data.buffer) + treeRootVBuf.write(manifest.treeRoot) + header.write(2, treeRootVBuf.buffer) + header.write(3, manifest.blockSize.uint32) + header.write(4, manifest.len.uint32) + header.write(6, manifest.originalBytes.uint64) if manifest.protected: var erasureInfo = initProtoBuffer() erasureInfo.write(1, manifest.ecK.uint32) @@ -74,9 +76,9 @@ proc encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] = erasureInfo.write(4, manifest.originalLen.uint32) erasureInfo.finish() - header.write(4, erasureInfo) + header.write(5, erasureInfo) - pbNode.write(1, header) # set the rootHash Cid as the data field + pbNode.write(1, header) # set the treeCid as the data field pbNode.finish() return pbNode.buffer.success @@ -89,7 +91,8 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = pbNode = initProtoBuffer(data) pbHeader: ProtoBuffer pbErasureInfo: ProtoBuffer - rootHash: seq[byte] + treeCidBuf: seq[byte] + treeRootBuf: seq[byte] originalCid: seq[byte] originalBytes: uint64 blockSize: uint32 @@ -103,19 +106,22 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = return failure("Unable to decode `Header` from dag-pb manifest!") # Decode `Header` contents - if pbHeader.getField(1, rootHash).isErr: - return failure("Unable to decode `rootHash` from manifest!") + if pbHeader.getField(1, treeCidBuf).isErr: + return failure("Unable to decode `treeCid` from manifest!") - if pbHeader.getField(2, blockSize).isErr: + if pbHeader.getField(2, treeRootBuf).isErr: + return failure("Unable to decode `treeRoot` from manifest!") + + if pbHeader.getField(3, blockSize).isErr: return failure("Unable to decode `blockSize` from manifest!") - if pbHeader.getField(3, blocksLen).isErr: + if pbHeader.getField(4, blocksLen).isErr: return failure("Unable to decode `blocksLen` from manifest!") - if pbHeader.getField(5, originalBytes).isErr: + if pbHeader.getField(6, originalBytes).isErr: return failure("Unable to decode `originalBytes` from manifest!") - if pbHeader.getField(4, pbErasureInfo).isErr: + if pbHeader.getField(5, pbErasureInfo).isErr: return failure("Unable to decode `erasureInfo` from manifest!") if pbErasureInfo.buffer.len > 0: @@ -131,8 +137,16 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = if pbErasureInfo.getField(4, originalLen).isErr: return failure("Unable to decode `originalLen` from manifest!") - let rootHashCid = ? Cid.init(rootHash).mapFailure - var linksBuf: seq[seq[byte]] + var + linksBuf: seq[seq[byte]] + treeRoot: MultiHash + let + treeCid = ? Cid.init(treeCidBuf).mapFailure + res = ? MultiHash.decode(treeRootBuf, treeRoot).mapFailure + + if res != treeRootBuf.len: + return failure("Error decoding `treeRoot` as MultiHash") + if pbNode.getRepeatedField(2, linksBuf).isOk: for pbLinkBuf in linksBuf: var @@ -148,13 +162,14 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = let self = if pbErasureInfo.buffer.len > 0: Manifest.new( - rootHash = rootHashCid, + treeCid = treeCid, + treeRoot = treeRoot, originalBytes = originalBytes.NBytes, blockSize = blockSize.NBytes, blocks = blocks, - version = rootHashCid.cidver, - hcodec = (? rootHashCid.mhash.mapFailure).mcodec, - codec = rootHashCid.mcodec, + version = treeCid.cidver, + hcodec = (? treeCid.mhash.mapFailure).mcodec, + codec = treeCid.mcodec, ecK = ecK.int, ecM = ecM.int, originalCid = ? Cid.init(originalCid).mapFailure, @@ -162,13 +177,14 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = ) else: Manifest.new( - rootHash = rootHashCid, + treeCid = treeCid, + treeRoot = treeRoot, originalBytes = originalBytes.NBytes, blockSize = blockSize.NBytes, blocks = blocks, - version = rootHashCid.cidver, - hcodec = (? rootHashCid.mhash.mapFailure).mcodec, - codec = rootHashCid.mcodec + version = treeCid.cidver, + hcodec = (? treeCid.mhash.mapFailure).mcodec, + codec = treeCid.mcodec ) ? self.verify() diff --git a/codex/manifest/manifest.nim b/codex/manifest/manifest.nim index 1bf307ae..7b822645 100644 --- a/codex/manifest/manifest.nim +++ b/codex/manifest/manifest.nim @@ -29,8 +29,9 @@ export types type Manifest* = ref object of RootObj - rootHash: ?Cid # Root (tree) hash of the contained data set - originalBytes*: NBytes # Exact size of the original (uploaded) file + treeCid: Cid # Cid of the merkle tree + treeRoot: MultiHash # Root hash of the merkle tree + originalBytes: NBytes # Exact size of the original (uploaded) file blockSize: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed) blocks: seq[Cid] # Block Cid version: CidVersion # Cid version @@ -79,6 +80,15 @@ proc originalCid*(self: Manifest): Cid = proc originalLen*(self: Manifest): int = self.originalLen +proc originalBytes*(self: Manifest): NBytes = + self.originalBytes + +proc treeCid*(self: Manifest): Cid = + self.treeCid + +proc treeRoot*(self: Manifest): MultiHash = + self.treeRoot + ############################################################ # Operations on block list ############################################################ @@ -90,14 +100,12 @@ func `[]`*(self: Manifest, i: Natural): Cid = self.blocks[i] func `[]=`*(self: var Manifest, i: Natural, item: Cid) = - self.rootHash = Cid.none self.blocks[i] = item func `[]`*(self: Manifest, i: BackwardsIndex): Cid = self.blocks[self.len - i.int] func `[]=`*(self: Manifest, i: BackwardsIndex, item: Cid) = - self.rootHash = Cid.none self.blocks[self.len - i.int] = item func isManifest*(cid: Cid): ?!bool = @@ -107,9 +115,10 @@ func isManifest*(cid: Cid): ?!bool = func isManifest*(mc: MultiCodec): ?!bool = ($mc in ManifestContainers).success +# TODO remove it proc add*(self: Manifest, cid: Cid) = assert not self.protected # we expect that protected manifests are created with properly-sized self.blocks - self.rootHash = Cid.none + # self.treeCid = Cid.none trace "Adding cid to manifest", cid self.blocks.add(cid) self.originalBytes = self.blocks.len.NBytes * self.blockSize @@ -148,10 +157,12 @@ func steps*(self: Manifest): int = func verify*(self: Manifest): ?!void = ## Check manifest correctness ## - let originalLen = (if self.protected: self.originalLen else: self.len) + + # TODO uncomment this and fix it + # let originalLen = (if self.protected: self.originalLen else: self.len) - if divUp(self.originalBytes, self.blockSize) != originalLen: - return failure newException(CodexError, "Broken manifest: wrong originalBytes") + # if divUp(self.originalBytes, self.blockSize) != originalLen: + # return failure newException(CodexError, "Broken manifest: wrong originalBytes") if self.protected and (self.len != self.steps * (self.ecK + self.ecM)): return failure newException(CodexError, "Broken manifest: wrong originalLen") @@ -163,54 +174,38 @@ func verify*(self: Manifest): ?!void = # Cid computation ############################################################ -template hashBytes(mh: MultiHash): seq[byte] = - ## get the hash bytes of a multihash object - ## +proc cid*(self: Manifest): ?!Cid {.deprecated: "use treeCid instead".} = - mh.data.buffer[mh.dpos..(mh.dpos + mh.size - 1)] - -proc makeRoot*(self: Manifest): ?!void = - ## Create a tree hash root of the contained - ## block hashes - ## - - var - stack: seq[MultiHash] - - for cid in self: - stack.add(? cid.mhash.mapFailure) - - while stack.len > 1: - let - (b1, b2) = (stack.pop(), stack.pop()) - mh = ? MultiHash.digest( - $self.hcodec, - (b1.hashBytes() & b2.hashBytes())) - .mapFailure - stack.add(mh) - - if stack.len == 1: - let digest = ? EmptyDigests[self.version][self.hcodec].catch - let cid = ? Cid.init(self.version, self.codec, digest).mapFailure - - self.rootHash = cid.some - - success() - -proc cid*(self: Manifest): ?!Cid = - ## Generate a root hash using the treehash algorithm - ## - - if self.rootHash.isNone: - ? self.makeRoot() - - (!self.rootHash).success + self.treeCid.success ############################################################ # Constructors ############################################################ +proc new*( + T: type Manifest, + treeCid: Cid, + treeRoot: MultiHash, + blockSize: NBytes, + originalBytes: NBytes, + version: CidVersion, + hcodec: MultiCodec, + codec = multiCodec("raw"), + protected = false, +): Manifest = + + T( + treeCid: treeCid, + treeRoot: treeRoot, + blocks: @[], + blockSize: blockSize, + originalBytes: originalBytes, + version: version, + codec: codec, + hcodec: hcodec, + protected: protected) + proc new*( T: type Manifest, blocks: openArray[Cid] = [], @@ -286,7 +281,8 @@ proc new*( proc new*( T: type Manifest, - rootHash: Cid, + treeCid: Cid, + treeRoot: MultiHash, originalBytes: NBytes, blockSize: NBytes, blocks: seq[Cid], @@ -299,7 +295,8 @@ proc new*( originalLen: int ): Manifest = Manifest( - rootHash: rootHash.some, + treeCid: treeCid, + treeRoot: treeRoot, originalBytes: originalBytes, blockSize: blockSize, blocks: blocks, @@ -315,7 +312,8 @@ proc new*( proc new*( T: type Manifest, - rootHash: Cid, + treeCid: Cid, + treeRoot: MultiHash, originalBytes: NBytes, blockSize: NBytes, blocks: seq[Cid], @@ -324,7 +322,8 @@ proc new*( codec: MultiCodec ): Manifest = Manifest( - rootHash: rootHash.some, + treeCid: treeCid, + treeRoot: treeRoot, originalBytes: originalBytes, blockSize: blockSize, blocks: blocks, diff --git a/codex/merkletree/merkletree.nim b/codex/merkletree/merkletree.nim index 3ee76377..97d1369b 100644 --- a/codex/merkletree/merkletree.nim +++ b/codex/merkletree/merkletree.nim @@ -15,7 +15,7 @@ import std/sugar import pkg/questionable/results import pkg/nimcrypto/sha2 import pkg/libp2p/[multicodec, multihash, vbuffer] -import pkg/stew/base58 +import pkg/stew/byteutils import ../errors @@ -69,7 +69,7 @@ proc digestFn(mcodec: MultiCodec, output: pointer, data: openArray[byte]): ?!voi proc init*( T: type MerkleTreeBuilder, - mcodec: MultiCodec + mcodec: MultiCodec = multiCodec("sha2-256") ): ?!MerkleTreeBuilder = let mhash = ? MultiHash.digest($mcodec, "".toBytes).mapFailure success(MerkleTreeBuilder(mcodec: mcodec, digestSize: mhash.size, buffer: newSeq[byte]())) @@ -81,6 +81,16 @@ proc addDataBlock*(self: var MerkleTreeBuilder, dataBlock: openArray[byte]): ?!v self.buffer.setLen(oldLen + self.digestSize) digestFn(self.mcodec, addr self.buffer[oldLen], dataBlock) +proc addLeaf*(self: var MerkleTreeBuilder, leaf: MultiHash): ?!void = + if leaf.mcodec != self.mcodec or leaf.size != self.digestSize: + return failure("Expected mcodec to be " & $self.mcodec & " and digest size to be " & + $self.digestSize & " but was " & $leaf.mcodec & " and " & $leaf.size) + + let oldLen = self.buffer.len + self.buffer.setLen(oldLen + self.digestSize) + copyMem(addr self.buffer[oldLen], unsafeAddr leaf.data.buffer[leaf.dpos], self.digestSize) + success() + proc build*(self: MerkleTreeBuilder): ?!MerkleTree = ## Builds a tree from previously added data blocks ## @@ -171,6 +181,12 @@ proc leaves*(self: MerkleTree): seq[MultiHash] = proc leavesCount*(self: MerkleTree): Natural = self.leavesCount +proc getLeaf*(self: MerkleTree, index: Natural): ?!MultiHash = + if index >= self.leavesCount: + return failure("Index " & $index & " out of range [0.." & $(self.leavesCount - 1) & "]" ) + + success(self.nodeBufferToMultiHash(index)) + proc height*(self: MerkleTree): Natural = computeTreeHeight(self.leavesCount) @@ -220,8 +236,10 @@ proc getProof*(self: MerkleTree, index: Natural): ?!MerkleProof = proc `$`*(self: MerkleTree): string = "mcodec:" & $self.mcodec & - "\nleavesCount: " & $self.leavesCount & - "\nnodes: " & $self.nodes + "\nleavesCount: " & $self.leavesCount + # TODO fix this + # & + # "\nnodes: " & $self.nodes proc `==`*(a, b: MerkleTree): bool = (a.mcodec == b.mcodec) and @@ -259,8 +277,10 @@ proc index*(self: MerkleProof): Natural = proc `$`*(self: MerkleProof): string = "mcodec:" & $self.mcodec & - "\nindex: " & $self.index & - "\nnodes: " & $self.nodes + "\nindex: " & $self.index + # TODO fix this + # & + # "\nnodes: " & $self.nodes func `==`*(a, b: MerkleProof): bool = (a.index == b.index) and diff --git a/codex/node.nim b/codex/node.nim index 0fe78dc8..1970e87a 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -17,7 +17,7 @@ import pkg/questionable/results import pkg/chronicles import pkg/chronos -import pkg/libp2p/switch +import pkg/libp2p/[switch, multicodec, multihash] import pkg/libp2p/stream/bufferstream # TODO: remove once exported by libp2p @@ -27,6 +27,7 @@ import pkg/libp2p/signed_envelope import ./chunker import ./blocktype as bt import ./manifest +import ./merkletree import ./stores/blockstore import ./blockexchange import ./streams @@ -179,11 +180,14 @@ proc store*( ## trace "Storing data" - without var blockManifest =? Manifest.new(blockSize = blockSize): - return failure("Unable to create Block Set") + let + mcodec = multiCodec("sha2-256") + chunker = LPStreamChunker.new(stream, chunkSize = blockSize) - # Manifest and chunker should use the same blockSize - let chunker = LPStreamChunker.new(stream, chunkSize = blockSize) + without tb =? MerkleTreeBuilder.init(mcodec), err: + return failure(err) + + var treeBuilder = tb # TODO fixit try: while ( @@ -191,10 +195,21 @@ proc store*( chunk.len > 0): trace "Got data from stream", len = chunk.len - without blk =? bt.Block.new(chunk): - return failure("Unable to init block from chunk!") - blockManifest.add(blk.cid) + without mhash =? MultiHash.digest($mcodec, chunk).mapFailure, err: + return failure(err) + + without cid =? Cid.init(CIDv1, multiCodec("raw"), mhash).mapFailure, err: + return failure(err) + + without blk =? bt.Block.new(cid, chunk, verify = false): + return failure("Unable to init block from chunk!") + + if err =? treeBuilder.addLeaf(mhash).errorOption: + return failure(err) + # without x =? treeBuilder.addLeaf(mhash), err: + # return failure(err) + if err =? (await self.blockStore.putBlock(blk)).errorOption: trace "Unable to store block", cid = blk.cid, err = err.msg return failure(&"Unable to store block {blk.cid}") @@ -206,11 +221,29 @@ proc store*( finally: await stream.close() + + without tree =? treeBuilder.build(), err: + return failure(err) + + without treeBlk =? bt.Block.new(tree.encode()), err: + return failure(err) + + if err =? (await self.blockStore.putBlock(treeBlk)).errorOption: + return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg) + + let blockManifest = Manifest.new( + treeCid = treeBlk.cid, + treeRoot = tree.root, + blockSize = blockSize, + originalBytes = NBytes(chunker.offset), + version = CIDv1, + hcodec = mcodec + ) # Generate manifest - blockManifest.originalBytes = NBytes(chunker.offset) # store the exact file size - without data =? blockManifest.encode(): + # blockManifest.originalBytes = NBytes(chunker.offset) # store the exact file size + without data =? blockManifest.encode(), err: return failure( - newException(CodexError, "Could not generate dataset manifest!")) + newException(CodexError, "Error encoding manifest: " & err.msg)) # Store as a dag-pb block without manifest =? bt.Block.new(data = data, codec = DagPBCodec): @@ -221,12 +254,8 @@ proc store*( trace "Unable to store manifest", cid = manifest.cid return failure("Unable to store manifest " & $manifest.cid) - without cid =? blockManifest.cid, error: - trace "Unable to generate manifest Cid!", exc = error.msg - return failure(error.msg) - trace "Stored data", manifestCid = manifest.cid, - contentCid = cid, + treeCid = treeBlk.cid, blocks = blockManifest.len # Announce manifest diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index bba95eac..bfcae3f1 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -17,6 +17,7 @@ import pkg/questionable import pkg/questionable/results import ../blocktype +import ../merkletree export blocktype @@ -44,6 +45,18 @@ method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} = raiseAssert("Not implemented!") +method getBlock*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!Block] {.base.} = + ## Get a block by Cid of a merkle tree and an index of a leaf in a tree + ## + + raiseAssert("Not implemented!") + +method getBlockAndProof*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.base.} = + ## Get a block and associated inclusion proof by Cid of a merkle tree and an index of a leaf in a tree + ## + + raiseAssert("Not implemented!") + method putBlock*( self: BlockStore, blk: Block, diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 9e815caa..638715ff 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -20,6 +20,7 @@ import ../utils/asyncheapqueue import ./blockstore import ../blockexchange +import ../merkletree export blockstore, blockexchange, asyncheapqueue @@ -46,6 +47,12 @@ method getBlock*(self: NetworkStore, cid: Cid): Future[?!bt.Block] {.async.} = return success blk +method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!bt.Block] {.async.} = + return await self.localStore.getBlock(treeCid, index) + +method getBlockAndProof*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!(bt.Block, MerkleProof)] {.async.} = + return await self.localStore.getBlockAndProof(treeCid, index) + method putBlock*( self: NetworkStore, blk: bt.Block, diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim index 75400296..c72cbf6a 100644 --- a/codex/stores/repostore.nim +++ b/codex/stores/repostore.nim @@ -12,8 +12,10 @@ import pkg/upraises push: {.upraises: [].} import pkg/chronos +import pkg/chronos/futures import pkg/chronicles -import pkg/libp2p/cid +import pkg/libp2p/[cid, multicodec] +import pkg/lrucache import pkg/metrics import pkg/questionable import pkg/questionable/results @@ -25,6 +27,7 @@ import ./keyutils import ../blocktype import ../clock import ../systemclock +import ../merkletree export blocktype, cid @@ -38,6 +41,7 @@ declareGauge(codexRepostoreBytesReserved, "codex repostore bytes reserved") const DefaultBlockTtl* = 24.hours DefaultQuotaBytes* = 1'u shl 33'u # ~8GB + DefaultTreeCacheCapacity* = 10 # Max number of trees stored in memory type QuotaUsedError* = object of CodexError @@ -54,6 +58,7 @@ type quotaReservedBytes*: uint # bytes reserved by the repo blockTtl*: Duration started*: bool + treeCache*: LruCache[Cid, MerkleTree] BlockExpiration* = object cid*: Cid @@ -84,7 +89,7 @@ func available*(self: RepoStore, bytes: uint): bool = method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} = ## Get a block from the blockstore ## - + without key =? makePrefixKey(self.postFixLen, cid), err: trace "Error getting key from provider", err = err.msg return failure(err) @@ -97,7 +102,53 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} = return failure(newException(BlockNotFoundError, err.msg)) trace "Got block for cid", cid - return Block.new(cid, data) + return Block.new(cid, data, verify = true) + +proc getMerkleTree(self: RepoStore, cid: Cid): Future[?!MerkleTree] {.async.} = + try: + return success(self.treeCache[cid]) + except KeyError: + without treeBlk =? await self.getBlock(cid), err: + return failure(err) + + without tree =? MerkleTree.decode(treeBlk.data), err: + return failure("Error decoding a merkle tree with cid " & $cid & ". Nested error is: " & err.msg) + self.treeCache[cid] = tree + return success(tree) + +method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} = + without tree =? await self.getMerkleTree(treeCid), err: + return failure(err) + + without leaf =? tree.getLeaf(index), err: + return failure(err) + + without leafCid =? Cid.init(treeCid.cidver, treeCid.mcodec, leaf).mapFailure, err: + return failure(err) + + without blk =? await self.getBlock(leafCid), err: + return failure(err) + + return success(blk) + +method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.async.} = + without tree =? await self.getMerkleTree(treeCid), err: + return failure(err) + + without proof =? tree.getProof(index), err: + return failure(err) + + without leaf =? tree.getLeaf(index), err: + return failure(err) + + without leafCid =? Cid.init(CIDv1, leaf.mcodec, leaf).mapFailure, err: + return failure(err) + + without blk =? await self.getBlock(leafCid), err: + return failure(err) + + return success((blk, proof)) + proc getBlockExpirationTimestamp(self: RepoStore, ttl: ?Duration): SecondsSince1970 = let duration = ttl |? self.blockTtl @@ -450,7 +501,8 @@ func new*( clock: Clock = SystemClock.new(), postFixLen = 2, quotaMaxBytes = DefaultQuotaBytes, - blockTtl = DefaultBlockTtl + blockTtl = DefaultBlockTtl, + treeCacheCapacity = DefaultTreeCacheCapacity ): RepoStore = ## Create new instance of a RepoStore ## @@ -460,4 +512,5 @@ func new*( clock: clock, postFixLen: postFixLen, quotaMaxBytes: quotaMaxBytes, - blockTtl: blockTtl) + blockTtl: blockTtl, + treeCache: newLruCache[Cid, MerkleTree](treeCacheCapacity)) diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index 809d961c..25999f97 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -99,7 +99,9 @@ method readOnce*( self.manifest.blockSize.int - blockOffset]) # Read contents of block `blockNum` - without blk =? await self.store.getBlock(self.manifest[blockNum]), error: + without blk =? await self.store.getBlock(self.manifest.treeCid, blockNum), error: + # TODO Log tree cid and perhaps also block index + trace "Error when getting a block ", msg = error.msg raise newLPStreamReadError(error) trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset