Storing and retrieving data locally using merkle trees
This commit is contained in:
parent
f9ee4ea7af
commit
4d30eac2b6
|
@ -14,7 +14,7 @@ import pkg/upraises
|
|||
|
||||
push: {.upraises: [].}
|
||||
|
||||
import pkg/libp2p/[cid, multicodec]
|
||||
import pkg/libp2p/[cid, multicodec, multihash]
|
||||
import pkg/stew/byteutils
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
@ -147,24 +147,25 @@ func new*(
|
|||
cid: cid,
|
||||
data: @data).success
|
||||
|
||||
func new*(
|
||||
proc new*(
|
||||
T: type Block,
|
||||
cid: Cid,
|
||||
data: openArray[byte],
|
||||
verify: bool = true
|
||||
): ?!Block =
|
||||
## creates a new block for both storage and network IO
|
||||
##
|
||||
##
|
||||
|
||||
let
|
||||
mhash = ? cid.mhash.mapFailure
|
||||
b = ? Block.new(
|
||||
data = @data,
|
||||
version = cid.cidver,
|
||||
codec = cid.mcodec,
|
||||
mcodec = mhash.mcodec)
|
||||
if verify:
|
||||
let
|
||||
mhash = ? cid.mhash.mapFailure
|
||||
computedMhash = ? MultiHash.digest($mhash.mcodec, data).mapFailure
|
||||
computedCid = ? Cid.init(cid.cidver, cid.mcodec, computedMhash).mapFailure
|
||||
if computedCid != cid:
|
||||
return "Cid doesn't match the data".failure
|
||||
|
||||
if verify and cid != b.cid:
|
||||
return "Cid and content don't match!".failure
|
||||
return Block(
|
||||
cid: cid,
|
||||
data: @data
|
||||
).success
|
||||
|
||||
success b
|
||||
|
|
|
@ -51,21 +51,23 @@ proc encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] =
|
|||
# optional uint32 original = 4; # number of original blocks
|
||||
# }
|
||||
# Message Header {
|
||||
# optional bytes rootHash = 1; # the root (tree) hash
|
||||
# optional uint32 blockSize = 2; # size of a single block
|
||||
# optional uint32 blocksLen = 3; # total amount of blocks
|
||||
# optional ErasureInfo erasure = 4; # erasure coding info
|
||||
# optional uint64 originalBytes = 5;# exact file size
|
||||
# optional bytes treeCid = 1; # the root (tree) hash
|
||||
# optional bytes treeRoot = 2; # the root (tree) hash
|
||||
# optional uint32 blockSize = 3; # size of a single block
|
||||
# optional uint32 blocksLen = 4; # total amount of blocks
|
||||
# optional ErasureInfo erasure = 5; # erasure coding info
|
||||
# optional uint64 originalBytes = 6;# exact file size
|
||||
# }
|
||||
# ```
|
||||
#
|
||||
|
||||
let cid = ? manifest.cid
|
||||
var treeRootVBuf = initVBuffer()
|
||||
var header = initProtoBuffer()
|
||||
header.write(1, cid.data.buffer)
|
||||
header.write(2, manifest.blockSize.uint32)
|
||||
header.write(3, manifest.len.uint32)
|
||||
header.write(5, manifest.originalBytes.uint64)
|
||||
header.write(1, manifest.treeCid.data.buffer)
|
||||
treeRootVBuf.write(manifest.treeRoot)
|
||||
header.write(2, treeRootVBuf.buffer)
|
||||
header.write(3, manifest.blockSize.uint32)
|
||||
header.write(4, manifest.len.uint32)
|
||||
header.write(6, manifest.originalBytes.uint64)
|
||||
if manifest.protected:
|
||||
var erasureInfo = initProtoBuffer()
|
||||
erasureInfo.write(1, manifest.ecK.uint32)
|
||||
|
@ -74,9 +76,9 @@ proc encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] =
|
|||
erasureInfo.write(4, manifest.originalLen.uint32)
|
||||
erasureInfo.finish()
|
||||
|
||||
header.write(4, erasureInfo)
|
||||
header.write(5, erasureInfo)
|
||||
|
||||
pbNode.write(1, header) # set the rootHash Cid as the data field
|
||||
pbNode.write(1, header) # set the treeCid as the data field
|
||||
pbNode.finish()
|
||||
|
||||
return pbNode.buffer.success
|
||||
|
@ -89,7 +91,8 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
|
|||
pbNode = initProtoBuffer(data)
|
||||
pbHeader: ProtoBuffer
|
||||
pbErasureInfo: ProtoBuffer
|
||||
rootHash: seq[byte]
|
||||
treeCidBuf: seq[byte]
|
||||
treeRootBuf: seq[byte]
|
||||
originalCid: seq[byte]
|
||||
originalBytes: uint64
|
||||
blockSize: uint32
|
||||
|
@ -103,19 +106,22 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
|
|||
return failure("Unable to decode `Header` from dag-pb manifest!")
|
||||
|
||||
# Decode `Header` contents
|
||||
if pbHeader.getField(1, rootHash).isErr:
|
||||
return failure("Unable to decode `rootHash` from manifest!")
|
||||
if pbHeader.getField(1, treeCidBuf).isErr:
|
||||
return failure("Unable to decode `treeCid` from manifest!")
|
||||
|
||||
if pbHeader.getField(2, blockSize).isErr:
|
||||
if pbHeader.getField(2, treeRootBuf).isErr:
|
||||
return failure("Unable to decode `treeRoot` from manifest!")
|
||||
|
||||
if pbHeader.getField(3, blockSize).isErr:
|
||||
return failure("Unable to decode `blockSize` from manifest!")
|
||||
|
||||
if pbHeader.getField(3, blocksLen).isErr:
|
||||
if pbHeader.getField(4, blocksLen).isErr:
|
||||
return failure("Unable to decode `blocksLen` from manifest!")
|
||||
|
||||
if pbHeader.getField(5, originalBytes).isErr:
|
||||
if pbHeader.getField(6, originalBytes).isErr:
|
||||
return failure("Unable to decode `originalBytes` from manifest!")
|
||||
|
||||
if pbHeader.getField(4, pbErasureInfo).isErr:
|
||||
if pbHeader.getField(5, pbErasureInfo).isErr:
|
||||
return failure("Unable to decode `erasureInfo` from manifest!")
|
||||
|
||||
if pbErasureInfo.buffer.len > 0:
|
||||
|
@ -131,8 +137,16 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
|
|||
if pbErasureInfo.getField(4, originalLen).isErr:
|
||||
return failure("Unable to decode `originalLen` from manifest!")
|
||||
|
||||
let rootHashCid = ? Cid.init(rootHash).mapFailure
|
||||
var linksBuf: seq[seq[byte]]
|
||||
var
|
||||
linksBuf: seq[seq[byte]]
|
||||
treeRoot: MultiHash
|
||||
let
|
||||
treeCid = ? Cid.init(treeCidBuf).mapFailure
|
||||
res = ? MultiHash.decode(treeRootBuf, treeRoot).mapFailure
|
||||
|
||||
if res != treeRootBuf.len:
|
||||
return failure("Error decoding `treeRoot` as MultiHash")
|
||||
|
||||
if pbNode.getRepeatedField(2, linksBuf).isOk:
|
||||
for pbLinkBuf in linksBuf:
|
||||
var
|
||||
|
@ -148,13 +162,14 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
|
|||
let
|
||||
self = if pbErasureInfo.buffer.len > 0:
|
||||
Manifest.new(
|
||||
rootHash = rootHashCid,
|
||||
treeCid = treeCid,
|
||||
treeRoot = treeRoot,
|
||||
originalBytes = originalBytes.NBytes,
|
||||
blockSize = blockSize.NBytes,
|
||||
blocks = blocks,
|
||||
version = rootHashCid.cidver,
|
||||
hcodec = (? rootHashCid.mhash.mapFailure).mcodec,
|
||||
codec = rootHashCid.mcodec,
|
||||
version = treeCid.cidver,
|
||||
hcodec = (? treeCid.mhash.mapFailure).mcodec,
|
||||
codec = treeCid.mcodec,
|
||||
ecK = ecK.int,
|
||||
ecM = ecM.int,
|
||||
originalCid = ? Cid.init(originalCid).mapFailure,
|
||||
|
@ -162,13 +177,14 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
|
|||
)
|
||||
else:
|
||||
Manifest.new(
|
||||
rootHash = rootHashCid,
|
||||
treeCid = treeCid,
|
||||
treeRoot = treeRoot,
|
||||
originalBytes = originalBytes.NBytes,
|
||||
blockSize = blockSize.NBytes,
|
||||
blocks = blocks,
|
||||
version = rootHashCid.cidver,
|
||||
hcodec = (? rootHashCid.mhash.mapFailure).mcodec,
|
||||
codec = rootHashCid.mcodec
|
||||
version = treeCid.cidver,
|
||||
hcodec = (? treeCid.mhash.mapFailure).mcodec,
|
||||
codec = treeCid.mcodec
|
||||
)
|
||||
|
||||
? self.verify()
|
||||
|
|
|
@ -29,8 +29,9 @@ export types
|
|||
|
||||
type
|
||||
Manifest* = ref object of RootObj
|
||||
rootHash: ?Cid # Root (tree) hash of the contained data set
|
||||
originalBytes*: NBytes # Exact size of the original (uploaded) file
|
||||
treeCid: Cid # Cid of the merkle tree
|
||||
treeRoot: MultiHash # Root hash of the merkle tree
|
||||
originalBytes: NBytes # Exact size of the original (uploaded) file
|
||||
blockSize: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed)
|
||||
blocks: seq[Cid] # Block Cid
|
||||
version: CidVersion # Cid version
|
||||
|
@ -79,6 +80,15 @@ proc originalCid*(self: Manifest): Cid =
|
|||
proc originalLen*(self: Manifest): int =
|
||||
self.originalLen
|
||||
|
||||
proc originalBytes*(self: Manifest): NBytes =
|
||||
self.originalBytes
|
||||
|
||||
proc treeCid*(self: Manifest): Cid =
|
||||
self.treeCid
|
||||
|
||||
proc treeRoot*(self: Manifest): MultiHash =
|
||||
self.treeRoot
|
||||
|
||||
############################################################
|
||||
# Operations on block list
|
||||
############################################################
|
||||
|
@ -90,14 +100,12 @@ func `[]`*(self: Manifest, i: Natural): Cid =
|
|||
self.blocks[i]
|
||||
|
||||
func `[]=`*(self: var Manifest, i: Natural, item: Cid) =
|
||||
self.rootHash = Cid.none
|
||||
self.blocks[i] = item
|
||||
|
||||
func `[]`*(self: Manifest, i: BackwardsIndex): Cid =
|
||||
self.blocks[self.len - i.int]
|
||||
|
||||
func `[]=`*(self: Manifest, i: BackwardsIndex, item: Cid) =
|
||||
self.rootHash = Cid.none
|
||||
self.blocks[self.len - i.int] = item
|
||||
|
||||
func isManifest*(cid: Cid): ?!bool =
|
||||
|
@ -107,9 +115,10 @@ func isManifest*(cid: Cid): ?!bool =
|
|||
func isManifest*(mc: MultiCodec): ?!bool =
|
||||
($mc in ManifestContainers).success
|
||||
|
||||
# TODO remove it
|
||||
proc add*(self: Manifest, cid: Cid) =
|
||||
assert not self.protected # we expect that protected manifests are created with properly-sized self.blocks
|
||||
self.rootHash = Cid.none
|
||||
# self.treeCid = Cid.none
|
||||
trace "Adding cid to manifest", cid
|
||||
self.blocks.add(cid)
|
||||
self.originalBytes = self.blocks.len.NBytes * self.blockSize
|
||||
|
@ -148,10 +157,12 @@ func steps*(self: Manifest): int =
|
|||
func verify*(self: Manifest): ?!void =
|
||||
## Check manifest correctness
|
||||
##
|
||||
let originalLen = (if self.protected: self.originalLen else: self.len)
|
||||
|
||||
# TODO uncomment this and fix it
|
||||
# let originalLen = (if self.protected: self.originalLen else: self.len)
|
||||
|
||||
if divUp(self.originalBytes, self.blockSize) != originalLen:
|
||||
return failure newException(CodexError, "Broken manifest: wrong originalBytes")
|
||||
# if divUp(self.originalBytes, self.blockSize) != originalLen:
|
||||
# return failure newException(CodexError, "Broken manifest: wrong originalBytes")
|
||||
|
||||
if self.protected and (self.len != self.steps * (self.ecK + self.ecM)):
|
||||
return failure newException(CodexError, "Broken manifest: wrong originalLen")
|
||||
|
@ -163,54 +174,38 @@ func verify*(self: Manifest): ?!void =
|
|||
# Cid computation
|
||||
############################################################
|
||||
|
||||
template hashBytes(mh: MultiHash): seq[byte] =
|
||||
## get the hash bytes of a multihash object
|
||||
##
|
||||
proc cid*(self: Manifest): ?!Cid {.deprecated: "use treeCid instead".} =
|
||||
|
||||
mh.data.buffer[mh.dpos..(mh.dpos + mh.size - 1)]
|
||||
|
||||
proc makeRoot*(self: Manifest): ?!void =
|
||||
## Create a tree hash root of the contained
|
||||
## block hashes
|
||||
##
|
||||
|
||||
var
|
||||
stack: seq[MultiHash]
|
||||
|
||||
for cid in self:
|
||||
stack.add(? cid.mhash.mapFailure)
|
||||
|
||||
while stack.len > 1:
|
||||
let
|
||||
(b1, b2) = (stack.pop(), stack.pop())
|
||||
mh = ? MultiHash.digest(
|
||||
$self.hcodec,
|
||||
(b1.hashBytes() & b2.hashBytes()))
|
||||
.mapFailure
|
||||
stack.add(mh)
|
||||
|
||||
if stack.len == 1:
|
||||
let digest = ? EmptyDigests[self.version][self.hcodec].catch
|
||||
let cid = ? Cid.init(self.version, self.codec, digest).mapFailure
|
||||
|
||||
self.rootHash = cid.some
|
||||
|
||||
success()
|
||||
|
||||
proc cid*(self: Manifest): ?!Cid =
|
||||
## Generate a root hash using the treehash algorithm
|
||||
##
|
||||
|
||||
if self.rootHash.isNone:
|
||||
? self.makeRoot()
|
||||
|
||||
(!self.rootHash).success
|
||||
self.treeCid.success
|
||||
|
||||
|
||||
############################################################
|
||||
# Constructors
|
||||
############################################################
|
||||
|
||||
proc new*(
|
||||
T: type Manifest,
|
||||
treeCid: Cid,
|
||||
treeRoot: MultiHash,
|
||||
blockSize: NBytes,
|
||||
originalBytes: NBytes,
|
||||
version: CidVersion,
|
||||
hcodec: MultiCodec,
|
||||
codec = multiCodec("raw"),
|
||||
protected = false,
|
||||
): Manifest =
|
||||
|
||||
T(
|
||||
treeCid: treeCid,
|
||||
treeRoot: treeRoot,
|
||||
blocks: @[],
|
||||
blockSize: blockSize,
|
||||
originalBytes: originalBytes,
|
||||
version: version,
|
||||
codec: codec,
|
||||
hcodec: hcodec,
|
||||
protected: protected)
|
||||
|
||||
proc new*(
|
||||
T: type Manifest,
|
||||
blocks: openArray[Cid] = [],
|
||||
|
@ -286,7 +281,8 @@ proc new*(
|
|||
|
||||
proc new*(
|
||||
T: type Manifest,
|
||||
rootHash: Cid,
|
||||
treeCid: Cid,
|
||||
treeRoot: MultiHash,
|
||||
originalBytes: NBytes,
|
||||
blockSize: NBytes,
|
||||
blocks: seq[Cid],
|
||||
|
@ -299,7 +295,8 @@ proc new*(
|
|||
originalLen: int
|
||||
): Manifest =
|
||||
Manifest(
|
||||
rootHash: rootHash.some,
|
||||
treeCid: treeCid,
|
||||
treeRoot: treeRoot,
|
||||
originalBytes: originalBytes,
|
||||
blockSize: blockSize,
|
||||
blocks: blocks,
|
||||
|
@ -315,7 +312,8 @@ proc new*(
|
|||
|
||||
proc new*(
|
||||
T: type Manifest,
|
||||
rootHash: Cid,
|
||||
treeCid: Cid,
|
||||
treeRoot: MultiHash,
|
||||
originalBytes: NBytes,
|
||||
blockSize: NBytes,
|
||||
blocks: seq[Cid],
|
||||
|
@ -324,7 +322,8 @@ proc new*(
|
|||
codec: MultiCodec
|
||||
): Manifest =
|
||||
Manifest(
|
||||
rootHash: rootHash.some,
|
||||
treeCid: treeCid,
|
||||
treeRoot: treeRoot,
|
||||
originalBytes: originalBytes,
|
||||
blockSize: blockSize,
|
||||
blocks: blocks,
|
||||
|
|
|
@ -15,7 +15,7 @@ import std/sugar
|
|||
import pkg/questionable/results
|
||||
import pkg/nimcrypto/sha2
|
||||
import pkg/libp2p/[multicodec, multihash, vbuffer]
|
||||
import pkg/stew/base58
|
||||
import pkg/stew/byteutils
|
||||
|
||||
import ../errors
|
||||
|
||||
|
@ -69,7 +69,7 @@ proc digestFn(mcodec: MultiCodec, output: pointer, data: openArray[byte]): ?!voi
|
|||
|
||||
proc init*(
|
||||
T: type MerkleTreeBuilder,
|
||||
mcodec: MultiCodec
|
||||
mcodec: MultiCodec = multiCodec("sha2-256")
|
||||
): ?!MerkleTreeBuilder =
|
||||
let mhash = ? MultiHash.digest($mcodec, "".toBytes).mapFailure
|
||||
success(MerkleTreeBuilder(mcodec: mcodec, digestSize: mhash.size, buffer: newSeq[byte]()))
|
||||
|
@ -81,6 +81,16 @@ proc addDataBlock*(self: var MerkleTreeBuilder, dataBlock: openArray[byte]): ?!v
|
|||
self.buffer.setLen(oldLen + self.digestSize)
|
||||
digestFn(self.mcodec, addr self.buffer[oldLen], dataBlock)
|
||||
|
||||
proc addLeaf*(self: var MerkleTreeBuilder, leaf: MultiHash): ?!void =
|
||||
if leaf.mcodec != self.mcodec or leaf.size != self.digestSize:
|
||||
return failure("Expected mcodec to be " & $self.mcodec & " and digest size to be " &
|
||||
$self.digestSize & " but was " & $leaf.mcodec & " and " & $leaf.size)
|
||||
|
||||
let oldLen = self.buffer.len
|
||||
self.buffer.setLen(oldLen + self.digestSize)
|
||||
copyMem(addr self.buffer[oldLen], unsafeAddr leaf.data.buffer[leaf.dpos], self.digestSize)
|
||||
success()
|
||||
|
||||
proc build*(self: MerkleTreeBuilder): ?!MerkleTree =
|
||||
## Builds a tree from previously added data blocks
|
||||
##
|
||||
|
@ -171,6 +181,12 @@ proc leaves*(self: MerkleTree): seq[MultiHash] =
|
|||
proc leavesCount*(self: MerkleTree): Natural =
|
||||
self.leavesCount
|
||||
|
||||
proc getLeaf*(self: MerkleTree, index: Natural): ?!MultiHash =
|
||||
if index >= self.leavesCount:
|
||||
return failure("Index " & $index & " out of range [0.." & $(self.leavesCount - 1) & "]" )
|
||||
|
||||
success(self.nodeBufferToMultiHash(index))
|
||||
|
||||
proc height*(self: MerkleTree): Natural =
|
||||
computeTreeHeight(self.leavesCount)
|
||||
|
||||
|
@ -220,8 +236,10 @@ proc getProof*(self: MerkleTree, index: Natural): ?!MerkleProof =
|
|||
|
||||
proc `$`*(self: MerkleTree): string =
|
||||
"mcodec:" & $self.mcodec &
|
||||
"\nleavesCount: " & $self.leavesCount &
|
||||
"\nnodes: " & $self.nodes
|
||||
"\nleavesCount: " & $self.leavesCount
|
||||
# TODO fix this
|
||||
# &
|
||||
# "\nnodes: " & $self.nodes
|
||||
|
||||
proc `==`*(a, b: MerkleTree): bool =
|
||||
(a.mcodec == b.mcodec) and
|
||||
|
@ -259,8 +277,10 @@ proc index*(self: MerkleProof): Natural =
|
|||
|
||||
proc `$`*(self: MerkleProof): string =
|
||||
"mcodec:" & $self.mcodec &
|
||||
"\nindex: " & $self.index &
|
||||
"\nnodes: " & $self.nodes
|
||||
"\nindex: " & $self.index
|
||||
# TODO fix this
|
||||
# &
|
||||
# "\nnodes: " & $self.nodes
|
||||
|
||||
func `==`*(a, b: MerkleProof): bool =
|
||||
(a.index == b.index) and
|
||||
|
|
|
@ -17,7 +17,7 @@ import pkg/questionable/results
|
|||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
|
||||
import pkg/libp2p/switch
|
||||
import pkg/libp2p/[switch, multicodec, multihash]
|
||||
import pkg/libp2p/stream/bufferstream
|
||||
|
||||
# TODO: remove once exported by libp2p
|
||||
|
@ -27,6 +27,7 @@ import pkg/libp2p/signed_envelope
|
|||
import ./chunker
|
||||
import ./blocktype as bt
|
||||
import ./manifest
|
||||
import ./merkletree
|
||||
import ./stores/blockstore
|
||||
import ./blockexchange
|
||||
import ./streams
|
||||
|
@ -179,11 +180,14 @@ proc store*(
|
|||
##
|
||||
trace "Storing data"
|
||||
|
||||
without var blockManifest =? Manifest.new(blockSize = blockSize):
|
||||
return failure("Unable to create Block Set")
|
||||
let
|
||||
mcodec = multiCodec("sha2-256")
|
||||
chunker = LPStreamChunker.new(stream, chunkSize = blockSize)
|
||||
|
||||
# Manifest and chunker should use the same blockSize
|
||||
let chunker = LPStreamChunker.new(stream, chunkSize = blockSize)
|
||||
without tb =? MerkleTreeBuilder.init(mcodec), err:
|
||||
return failure(err)
|
||||
|
||||
var treeBuilder = tb # TODO fixit
|
||||
|
||||
try:
|
||||
while (
|
||||
|
@ -191,10 +195,21 @@ proc store*(
|
|||
chunk.len > 0):
|
||||
|
||||
trace "Got data from stream", len = chunk.len
|
||||
without blk =? bt.Block.new(chunk):
|
||||
return failure("Unable to init block from chunk!")
|
||||
|
||||
blockManifest.add(blk.cid)
|
||||
without mhash =? MultiHash.digest($mcodec, chunk).mapFailure, err:
|
||||
return failure(err)
|
||||
|
||||
without cid =? Cid.init(CIDv1, multiCodec("raw"), mhash).mapFailure, err:
|
||||
return failure(err)
|
||||
|
||||
without blk =? bt.Block.new(cid, chunk, verify = false):
|
||||
return failure("Unable to init block from chunk!")
|
||||
|
||||
if err =? treeBuilder.addLeaf(mhash).errorOption:
|
||||
return failure(err)
|
||||
# without x =? treeBuilder.addLeaf(mhash), err:
|
||||
# return failure(err)
|
||||
|
||||
if err =? (await self.blockStore.putBlock(blk)).errorOption:
|
||||
trace "Unable to store block", cid = blk.cid, err = err.msg
|
||||
return failure(&"Unable to store block {blk.cid}")
|
||||
|
@ -206,11 +221,29 @@ proc store*(
|
|||
finally:
|
||||
await stream.close()
|
||||
|
||||
|
||||
without tree =? treeBuilder.build(), err:
|
||||
return failure(err)
|
||||
|
||||
without treeBlk =? bt.Block.new(tree.encode()), err:
|
||||
return failure(err)
|
||||
|
||||
if err =? (await self.blockStore.putBlock(treeBlk)).errorOption:
|
||||
return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg)
|
||||
|
||||
let blockManifest = Manifest.new(
|
||||
treeCid = treeBlk.cid,
|
||||
treeRoot = tree.root,
|
||||
blockSize = blockSize,
|
||||
originalBytes = NBytes(chunker.offset),
|
||||
version = CIDv1,
|
||||
hcodec = mcodec
|
||||
)
|
||||
# Generate manifest
|
||||
blockManifest.originalBytes = NBytes(chunker.offset) # store the exact file size
|
||||
without data =? blockManifest.encode():
|
||||
# blockManifest.originalBytes = NBytes(chunker.offset) # store the exact file size
|
||||
without data =? blockManifest.encode(), err:
|
||||
return failure(
|
||||
newException(CodexError, "Could not generate dataset manifest!"))
|
||||
newException(CodexError, "Error encoding manifest: " & err.msg))
|
||||
|
||||
# Store as a dag-pb block
|
||||
without manifest =? bt.Block.new(data = data, codec = DagPBCodec):
|
||||
|
@ -221,12 +254,8 @@ proc store*(
|
|||
trace "Unable to store manifest", cid = manifest.cid
|
||||
return failure("Unable to store manifest " & $manifest.cid)
|
||||
|
||||
without cid =? blockManifest.cid, error:
|
||||
trace "Unable to generate manifest Cid!", exc = error.msg
|
||||
return failure(error.msg)
|
||||
|
||||
trace "Stored data", manifestCid = manifest.cid,
|
||||
contentCid = cid,
|
||||
treeCid = treeBlk.cid,
|
||||
blocks = blockManifest.len
|
||||
|
||||
# Announce manifest
|
||||
|
|
|
@ -17,6 +17,7 @@ import pkg/questionable
|
|||
import pkg/questionable/results
|
||||
|
||||
import ../blocktype
|
||||
import ../merkletree
|
||||
|
||||
export blocktype
|
||||
|
||||
|
@ -44,6 +45,18 @@ method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} =
|
|||
|
||||
raiseAssert("Not implemented!")
|
||||
|
||||
method getBlock*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!Block] {.base.} =
|
||||
## Get a block by Cid of a merkle tree and an index of a leaf in a tree
|
||||
##
|
||||
|
||||
raiseAssert("Not implemented!")
|
||||
|
||||
method getBlockAndProof*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.base.} =
|
||||
## Get a block and associated inclusion proof by Cid of a merkle tree and an index of a leaf in a tree
|
||||
##
|
||||
|
||||
raiseAssert("Not implemented!")
|
||||
|
||||
method putBlock*(
|
||||
self: BlockStore,
|
||||
blk: Block,
|
||||
|
|
|
@ -20,6 +20,7 @@ import ../utils/asyncheapqueue
|
|||
|
||||
import ./blockstore
|
||||
import ../blockexchange
|
||||
import ../merkletree
|
||||
|
||||
export blockstore, blockexchange, asyncheapqueue
|
||||
|
||||
|
@ -46,6 +47,12 @@ method getBlock*(self: NetworkStore, cid: Cid): Future[?!bt.Block] {.async.} =
|
|||
|
||||
return success blk
|
||||
|
||||
method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!bt.Block] {.async.} =
|
||||
return await self.localStore.getBlock(treeCid, index)
|
||||
|
||||
method getBlockAndProof*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!(bt.Block, MerkleProof)] {.async.} =
|
||||
return await self.localStore.getBlockAndProof(treeCid, index)
|
||||
|
||||
method putBlock*(
|
||||
self: NetworkStore,
|
||||
blk: bt.Block,
|
||||
|
|
|
@ -12,8 +12,10 @@ import pkg/upraises
|
|||
push: {.upraises: [].}
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/chronos/futures
|
||||
import pkg/chronicles
|
||||
import pkg/libp2p/cid
|
||||
import pkg/libp2p/[cid, multicodec]
|
||||
import pkg/lrucache
|
||||
import pkg/metrics
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
@ -25,6 +27,7 @@ import ./keyutils
|
|||
import ../blocktype
|
||||
import ../clock
|
||||
import ../systemclock
|
||||
import ../merkletree
|
||||
|
||||
export blocktype, cid
|
||||
|
||||
|
@ -38,6 +41,7 @@ declareGauge(codexRepostoreBytesReserved, "codex repostore bytes reserved")
|
|||
const
|
||||
DefaultBlockTtl* = 24.hours
|
||||
DefaultQuotaBytes* = 1'u shl 33'u # ~8GB
|
||||
DefaultTreeCacheCapacity* = 10 # Max number of trees stored in memory
|
||||
|
||||
type
|
||||
QuotaUsedError* = object of CodexError
|
||||
|
@ -54,6 +58,7 @@ type
|
|||
quotaReservedBytes*: uint # bytes reserved by the repo
|
||||
blockTtl*: Duration
|
||||
started*: bool
|
||||
treeCache*: LruCache[Cid, MerkleTree]
|
||||
|
||||
BlockExpiration* = object
|
||||
cid*: Cid
|
||||
|
@ -84,7 +89,7 @@ func available*(self: RepoStore, bytes: uint): bool =
|
|||
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
|
||||
## Get a block from the blockstore
|
||||
##
|
||||
|
||||
|
||||
without key =? makePrefixKey(self.postFixLen, cid), err:
|
||||
trace "Error getting key from provider", err = err.msg
|
||||
return failure(err)
|
||||
|
@ -97,7 +102,53 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
|
|||
return failure(newException(BlockNotFoundError, err.msg))
|
||||
|
||||
trace "Got block for cid", cid
|
||||
return Block.new(cid, data)
|
||||
return Block.new(cid, data, verify = true)
|
||||
|
||||
proc getMerkleTree(self: RepoStore, cid: Cid): Future[?!MerkleTree] {.async.} =
|
||||
try:
|
||||
return success(self.treeCache[cid])
|
||||
except KeyError:
|
||||
without treeBlk =? await self.getBlock(cid), err:
|
||||
return failure(err)
|
||||
|
||||
without tree =? MerkleTree.decode(treeBlk.data), err:
|
||||
return failure("Error decoding a merkle tree with cid " & $cid & ". Nested error is: " & err.msg)
|
||||
self.treeCache[cid] = tree
|
||||
return success(tree)
|
||||
|
||||
method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
|
||||
without tree =? await self.getMerkleTree(treeCid), err:
|
||||
return failure(err)
|
||||
|
||||
without leaf =? tree.getLeaf(index), err:
|
||||
return failure(err)
|
||||
|
||||
without leafCid =? Cid.init(treeCid.cidver, treeCid.mcodec, leaf).mapFailure, err:
|
||||
return failure(err)
|
||||
|
||||
without blk =? await self.getBlock(leafCid), err:
|
||||
return failure(err)
|
||||
|
||||
return success(blk)
|
||||
|
||||
method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.async.} =
|
||||
without tree =? await self.getMerkleTree(treeCid), err:
|
||||
return failure(err)
|
||||
|
||||
without proof =? tree.getProof(index), err:
|
||||
return failure(err)
|
||||
|
||||
without leaf =? tree.getLeaf(index), err:
|
||||
return failure(err)
|
||||
|
||||
without leafCid =? Cid.init(CIDv1, leaf.mcodec, leaf).mapFailure, err:
|
||||
return failure(err)
|
||||
|
||||
without blk =? await self.getBlock(leafCid), err:
|
||||
return failure(err)
|
||||
|
||||
return success((blk, proof))
|
||||
|
||||
|
||||
proc getBlockExpirationTimestamp(self: RepoStore, ttl: ?Duration): SecondsSince1970 =
|
||||
let duration = ttl |? self.blockTtl
|
||||
|
@ -450,7 +501,8 @@ func new*(
|
|||
clock: Clock = SystemClock.new(),
|
||||
postFixLen = 2,
|
||||
quotaMaxBytes = DefaultQuotaBytes,
|
||||
blockTtl = DefaultBlockTtl
|
||||
blockTtl = DefaultBlockTtl,
|
||||
treeCacheCapacity = DefaultTreeCacheCapacity
|
||||
): RepoStore =
|
||||
## Create new instance of a RepoStore
|
||||
##
|
||||
|
@ -460,4 +512,5 @@ func new*(
|
|||
clock: clock,
|
||||
postFixLen: postFixLen,
|
||||
quotaMaxBytes: quotaMaxBytes,
|
||||
blockTtl: blockTtl)
|
||||
blockTtl: blockTtl,
|
||||
treeCache: newLruCache[Cid, MerkleTree](treeCacheCapacity))
|
||||
|
|
|
@ -99,7 +99,9 @@ method readOnce*(
|
|||
self.manifest.blockSize.int - blockOffset])
|
||||
|
||||
# Read contents of block `blockNum`
|
||||
without blk =? await self.store.getBlock(self.manifest[blockNum]), error:
|
||||
without blk =? await self.store.getBlock(self.manifest.treeCid, blockNum), error:
|
||||
# TODO Log tree cid and perhaps also block index
|
||||
trace "Error when getting a block ", msg = error.msg
|
||||
raise newLPStreamReadError(error)
|
||||
|
||||
trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset
|
||||
|
|
Loading…
Reference in New Issue