Storing proofs instead of trees

This commit is contained in:
Tomasz Bekas 2023-11-03 21:17:20 +01:00 committed by Dmitriy Ryajov
parent 9c8d08681f
commit 2ea84f9e6c
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
23 changed files with 368 additions and 858 deletions

View File

@ -174,78 +174,29 @@ proc monitorBlockHandle(b: BlockExcEngine, handle: Future[Block], address: Block
proc requestBlock*(
b: BlockExcEngine,
cid: Cid,
timeout = DefaultBlockTimeout): Future[Block] {.async.} =
trace "Begin block request", cid, peers = b.peers.len
if b.pendingBlocks.isInFlight(cid):
trace "Request handle already pending", cid
return await b.pendingBlocks.getWantHandle(cid, timeout)
let
blk = b.pendingBlocks.getWantHandle(cid, timeout)
address = BlockAddress(leaf: false, cid: cid)
trace "Selecting peers who have", address
var
peers = b.peers.selectCheapest(address)
without blockPeer =? b.findCheapestPeerForBlock(peers):
trace "No peers to request blocks from. Queue discovery...", cid
b.discovery.queueFindBlocksReq(@[cid])
return await blk
asyncSpawn b.monitorBlockHandle(blk, address, blockPeer.id)
b.pendingBlocks.setInFlight(cid, true)
await b.sendWantBlock(address, blockPeer)
codex_block_exchange_want_block_lists_sent.inc()
if (peers.len - 1) == 0:
trace "No peers to send want list to", cid
b.discovery.queueFindBlocksReq(@[cid])
return await blk
await b.sendWantHave(address, blockPeer, toSeq(b.peers))
codex_block_exchange_want_have_lists_sent.inc()
return await blk
proc requestBlock(
b: BlockExcEngine,
treeReq: TreeReq,
index: Natural,
address: BlockAddress,
timeout = DefaultBlockTimeout
): Future[Block] {.async.} =
let address = BlockAddress(leaf: true, treeCid: treeReq.treeCid, index: index)
let blockFuture = b.pendingBlocks.getWantHandle(address, timeout)
let handleOrCid = treeReq.getWantHandleOrCid(index, timeout)
if handleOrCid.resolved:
without blk =? await b.localStore.getBlock(handleOrCid.cid), err:
return await b.requestBlock(handleOrCid.cid, timeout)
return blk
let blockFuture = handleOrCid.handle
if treeReq.isInFlight(index):
if b.pendingBlocks.isInFlight(address):
return await blockFuture
let peers = b.peers.selectCheapest(address)
if peers.len == 0:
b.discovery.queueFindBlocksReq(@[treeReq.treeCid])
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
let maybePeer =
if peers.len > 0:
peers[index mod peers.len].some
peers[hash(address) mod peers.len].some
elif b.peers.len > 0:
toSeq(b.peers)[index mod b.peers.len].some
toSeq(b.peers)[hash(address) mod b.peers.len].some
else:
BlockExcPeerCtx.none
if peer =? maybePeer:
asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id)
treeReq.trySetInFlight(index)
b.pendingBlocks.setInFlight(address)
await b.sendWantBlock(address, peer)
codexBlockExchangeWantBlockListsSent.inc()
await b.sendWantHave(address, peer, toSeq(b.peers))
@ -255,29 +206,10 @@ proc requestBlock(
proc requestBlock*(
b: BlockExcEngine,
treeCid: Cid,
index: Natural,
merkleRoot: MultiHash,
cid: Cid,
timeout = DefaultBlockTimeout
): Future[Block] =
without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, Natural.none, merkleRoot), err:
raise err
return b.requestBlock(treeReq, index, timeout)
proc requestBlocks*(
b: BlockExcEngine,
treeCid: Cid,
leavesCount: Natural,
merkleRoot: MultiHash,
timeout = DefaultBlockTimeout
): ?!AsyncIter[Block] =
without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, leavesCount.some, merkleRoot), err:
return failure(err)
return Iter.fromSlice(0..<leavesCount).map(
(index: int) => b.requestBlock(treeReq, index, timeout)
).success
b.requestBlock(BlockAddress.init(cid))
proc blockPresenceHandler*(
b: BlockExcEngine,
@ -352,7 +284,12 @@ proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asy
b.pendingBlocks.resolve(blocksDelivery)
await b.scheduleTasks(blocksDelivery)
b.discovery.queueProvideBlocksReq(blocksDelivery.mapIt( it.blk.cid ))
var cids = initHashSet[Cid]()
for bd in blocksDelivery:
cids.incl(bd.blk.cid)
if bd.address.leaf:
cids.incl(bd.address.treeCid)
b.discovery.queueProvideBlocksReq(cids.toSeq)
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
await b.resolveBlocks(blocks.mapIt(BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid))))
@ -370,18 +307,66 @@ proc payForBlocks(engine: BlockExcEngine,
trace "Sending payment for blocks", price
await sendPayment(peer.id, payment)
proc validateBlockDelivery(
  b: BlockExcEngine,
  bd: BlockDelivery
): ?!void =
  ## Validate an incoming block delivery before it is stored/resolved.
  ## Succeeds only when the delivery targets a currently pending address and
  ## (for leaves) carries a Merkle inclusion proof that verifies against the
  ## tree root derived from `bd.address.treeCid`.
  if bd.address notin b.pendingBlocks:
    return failure("Received block is not currently a pending block")
  if bd.address.leaf:
    # Leaf deliveries must carry an inclusion proof for the delivered block.
    without proof =? bd.proof:
      return failure("Missing proof")
    # The proof must be for the exact leaf position that was requested.
    if proof.index != bd.address.index:
      return failure("Proof index " & $proof.index & " doesn't match leaf index " & $bd.address.index)
    # Leaf hash comes from the delivered block's cid ...
    without leaf =? bd.blk.cid.mhash.mapFailure, err:
      return failure("Unable to get mhash from cid for block, nested err: " & err.msg)
    # ... and the expected root comes from the tree cid in the address.
    without treeRoot =? bd.address.treeCid.mhash.mapFailure, err:
      return failure("Unable to get mhash from treeCid for block, nested err: " & err.msg)
    # verifyLeaf itself may fail (error) independently of the proof being
    # valid or not (bool outcome) — handle the two cases separately.
    without verifyOutcome =? proof.verifyLeaf(leaf, treeRoot), err:
      return failure("Unable to verify proof for block, nested err: " & err.msg)
    if not verifyOutcome:
      return failure("Provided inclusion proof is invalid")
  else: # not leaf: the address cid must match the block's own cid
    if bd.address.cid != bd.blk.cid:
      return failure("Delivery cid " & $bd.address.cid & " doesn't match block cid " & $bd.blk.cid)
  return success()
proc blocksDeliveryHandler*(
b: BlockExcEngine,
peer: PeerId,
blocksDelivery: seq[BlockDelivery]) {.async.} =
trace "Got blocks from peer", peer, len = blocksDelivery.len
var validatedBlocksDelivery: seq[BlockDelivery]
for bd in blocksDelivery:
if isErr (await b.localStore.putBlock(bd.blk)):
trace "Unable to store block", cid = bd.blk.cid
await b.resolveBlocks(blocksDelivery)
codexBlockExchangeBlocksReceived.inc(blocksDelivery.len.int64)
if err =? b.validateBlockDelivery(bd).errorOption:
warn "Block validation failed", address = bd.address, msg = err.msg
continue
if err =? (await b.localStore.putBlock(bd.blk)).errorOption:
error "Unable to store block", address = bd.address, err = err.msg
continue
if bd.address.leaf:
without proof =? bd.proof:
error "Proof expected for a leaf block delivery", address = bd.address
continue
if err =? (await b.localStore.putBlockCidAndProof(bd.address.treeCid, bd.address.index, bd.blk.cid, proof)).errorOption:
error "Unable to store proof and cid for a block", address = bd.address
continue
validatedBlocksDelivery.add(bd)
await b.resolveBlocks(validatedBlocksDelivery)
codexBlockExchangeBlocksReceived.inc(validatedBlocksDelivery.len.int64)
let
peerCtx = b.peers.get(peer)
@ -491,18 +476,6 @@ proc paymentHandler*(
else:
context.paymentChannel = engine.wallet.acceptChannel(payment).option
proc onTreeHandler(b: BlockExcEngine, tree: MerkleTree): Future[?!void] {.async.} =
trace "Handling tree"
without treeBlk =? Block.new(tree.encode()), err:
return failure(err)
if err =? (await b.localStore.putBlock(treeBlk)).errorOption:
return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg)
return success()
proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} =
## Perform initial setup, such as want
## list exchange
@ -564,7 +537,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
BlockDelivery(address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some)
)
else:
(await b.localStore.getBlock(e.address.cid)).map(
(await b.localStore.getBlock(e.address)).map(
(blk: Block) => BlockDelivery(address: e.address, blk: blk, proof: MerkleProof.none)
)
@ -665,11 +638,6 @@ proc new*(
proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} =
engine.paymentHandler(peer, payment)
proc onTree(tree: MerkleTree): Future[void] {.gcsafe, async.} =
if err =? (await engine.onTreeHandler(tree)).errorOption:
echo "Error handling a tree" & err.msg # TODO
# error "Error handling a tree", msg = err.msg
network.handlers = BlockExcHandlers(
onWantList: blockWantListHandler,
onBlocksDelivery: blocksDeliveryHandler,
@ -677,6 +645,4 @@ proc new*(
onAccount: accountHandler,
onPayment: paymentHandler)
pendingBlocks.onTree = onTree
return engine

View File

@ -14,20 +14,15 @@ import pkg/upraises
push: {.upraises: [].}
import ../../blocktype
import pkg/chronicles
import pkg/questionable
import pkg/questionable/options
import pkg/questionable/results
import pkg/chronos
import pkg/libp2p
import pkg/metrics
import pkg/questionable/results
import ../protobuf/blockexc
import ../../blocktype
import ../../merkletree
import ../../utils
logScope:
topics = "codex pendingblocks"
@ -44,121 +39,12 @@ type
inFlight*: bool
startTime*: int64
LeafReq* = object
case delivered*: bool
of false:
handle*: Future[Block]
inFlight*: bool
of true:
leaf: MultiHash
blkCid*: Cid
TreeReq* = ref object
leaves*: Table[Natural, LeafReq]
deliveredCount*: Natural
leavesCount*: ?Natural
treeRoot*: MultiHash
treeCid*: Cid
TreeHandler* = proc(tree: MerkleTree): Future[void] {.gcsafe.}
PendingBlocksManager* = ref object of RootObj
blocks*: Table[Cid, BlockReq] # pending Block requests
trees*: Table[Cid, TreeReq]
onTree*: TreeHandler
blocks*: Table[BlockAddress, BlockReq] # pending Block requests
proc updatePendingBlockGauge(p: PendingBlocksManager) =
codex_block_exchange_pending_block_requests.set(p.blocks.len.int64)
type
BlockHandleOrCid = object
case resolved*: bool
of true:
cid*: Cid
else:
handle*: Future[Block]
proc buildTree(treeReq: TreeReq): ?!MerkleTree =
trace "Building a merkle tree from leaves", treeCid = treeReq.treeCid, leavesCount = treeReq.leavesCount
without leavesCount =? treeReq.leavesCount:
return failure("Leaves count is none, cannot build a tree")
var builder = ? MerkleTreeBuilder.init(treeReq.treeRoot.mcodec)
for i in 0..<leavesCount:
treeReq.leaves.withValue(i, leafReq):
if leafReq.delivered:
? builder.addLeaf(leafReq.leaf)
else:
return failure("Expected all leaves to be delivered but leaf with index " & $i & " was not")
do:
return failure("Missing a leaf with index " & $i)
let tree = ? builder.build()
if tree.root != treeReq.treeRoot:
return failure("Reconstructed tree root doesn't match the original tree root, tree cid is " & $treeReq.treeCid)
return success(tree)
proc checkIfAllDelivered(p: PendingBlocksManager, treeReq: TreeReq): void =
if treeReq.deliveredCount.some == treeReq.leavesCount:
without tree =? buildTree(treeReq), err:
error "Error building a tree", msg = err.msg
p.trees.del(treeReq.treeCid)
return
p.trees.del(treeReq.treeCid)
try:
asyncSpawn p.onTree(tree)
except Exception as err:
error "Exception when handling tree", msg = err.msg
proc getWantHandleOrCid*(
treeReq: TreeReq,
index: Natural,
timeout = DefaultBlockTimeout
): BlockHandleOrCid =
treeReq.leaves.withValue(index, leafReq):
if not leafReq.delivered:
return BlockHandleOrCid(resolved: false, handle: leafReq.handle)
else:
return BlockHandleOrCid(resolved: true, cid: leafReq.blkCid)
do:
let leafReq = LeafReq(
delivered: false,
handle: newFuture[Block]("pendingBlocks.getWantHandleOrCid"),
inFlight: false
)
treeReq.leaves[index] = leafReq
return BlockHandleOrCid(resolved: false, handle: leafReq.handle)
proc getOrPutTreeReq*(
p: PendingBlocksManager,
treeCid: Cid,
leavesCount = Natural.none, # has value when all leaves are expected to be delivered
treeRoot: MultiHash
): ?!TreeReq =
p.trees.withValue(treeCid, treeReq):
if treeReq.treeRoot != treeRoot:
return failure("Unexpected root for tree with cid " & $treeCid)
if leavesCount == treeReq.leavesCount:
return success(treeReq[])
else:
treeReq.leavesCount = treeReq.leavesCount.orElse(leavesCount)
let res = success(treeReq[])
p.checkIfAllDelivered(treeReq[])
return res
do:
let treeReq = TreeReq(
deliveredCount: 0,
leavesCount: leavesCount,
treeRoot: treeRoot,
treeCid: treeCid
)
p.trees[treeCid] = treeReq
return success(treeReq)
proc getWantHandle*(
p: PendingBlocksManager,
address: BlockAddress,
@ -190,14 +76,13 @@ proc getWantHandle*(
p.blocks.del(address)
p.updatePendingBlockGauge()
proc getOrComputeLeaf(mcodec: MultiCodec, blk: Block): ?!MultiHash =
without mhash =? blk.cid.mhash.mapFailure, err:
return MultiHash.digest($mcodec, blk.data).mapFailure
if mhash.mcodec == mcodec:
return success(mhash)
else:
return MultiHash.digest($mcodec, blk.data).mapFailure
proc getWantHandle*(
p: PendingBlocksManager,
cid: Cid,
timeout = DefaultBlockTimeout,
inFlight = false
): Future[Block] =
p.getWantHandle(BlockAddress.init(cid), timeout, inFlight)
proc resolve*(
p: PendingBlocksManager,
@ -207,87 +92,37 @@ proc resolve*(
##
for bd in blocksDelivery:
p.blocks.withValue(bd.address, blockReq):
trace "Resolving block", address = bd.address
if not bd.address.leaf:
if bd.address.cid == bd.blk.cid:
p.blocks.withValue(bd.blk.cid, pending):
if not pending.handle.completed:
trace "Resolving block", cid = bd.blk.cid
pending.handle.complete(bd.blk)
let
startTime = pending[].startTime
stopTime = getMonoTime().ticks
retrievalDurationUs = (stopTime - startTime) div 1000
codexBlockExchangeRetrievalTimeUs.set(retrievalDurationUs)
trace "Block retrieval time", retrievalDurationUs
if not blockReq.handle.finished:
let
startTime = blockReq.startTime
stopTime = getMonoTime().ticks
retrievalDurationUs = (stopTime - startTime) div 1000
blockReq.handle.complete(bd.blk)
codexBlockExchangeRetrievalTimeUs.set(retrievalDurationUs)
trace "Block retrieval time", retrievalDurationUs, address = bd.address
else:
warn "Delivery cid doesn't match block cid", deliveryCid = bd.address.cid, blockCid = bd.blk.cid
# resolve any pending blocks
if bd.address.leaf:
p.trees.withValue(bd.address.treeCid, treeReq):
treeReq.leaves.withValue(bd.address.index, leafReq):
if not leafReq.delivered:
if proof =? bd.proof:
if not proof.index == bd.address.index:
warn "Proof index doesn't match leaf index", address = bd.address, proofIndex = proof.index
continue
without mhash =? bd.blk.cid.mhash.mapFailure, err:
error "Unable to get mhash from cid for block", address = bd.address, msg = err.msg
continue
without verifySuccess =? proof.verifyLeaf(mhash, treeReq.treeRoot), err:
error "Unable to verify proof for block", address = bd.address, msg = err.msg
continue
if verifySuccess:
without leaf =? getOrComputeLeaf(treeReq.treeRoot.mcodec, bd.blk), err:
error "Unable to get or calculate hash for block", address = bd.address
continue
leafReq.handle.complete(bd.blk)
leafReq[] = LeafReq(delivered: true, blkCid: bd.blk.cid, leaf: leaf)
inc treeReq.deliveredCount
p.checkIfAllDelivered(treeReq[])
else:
warn "Invalid proof provided for a block", address = bd.address
else:
warn "Missing proof for a block", address = bd.address
else:
trace "Ignore veryfing proof for already delivered block", address = bd.address
trace "Block handle already finished", address = bd.address
do:
warn "Attempting to resolve block that's not currently a pending block", address = bd.address
proc setInFlight*(p: PendingBlocksManager,
address: BlockAddress,
inFlight = true) =
p.blocks.withValue(cid, pending):
pending.inFlight = inFlight
trace "Setting inflight", cid, inFlight = pending.inFlight
proc trySetInFlight*(treeReq: TreeReq,
index: Natural,
inFlight = true) =
treeReq.leaves.withValue(index, leafReq):
if not leafReq.delivered:
leafReq.inFlight = inFlight
trace "Setting inflight", treeCid = treeReq.treeCid, index, inFlight = inFlight
proc isInFlight*(treeReq: TreeReq,
index: Natural
): bool =
treeReq.leaves.withValue(index, leafReq):
return (not leafReq.delivered) and leafReq.inFlight
do:
return false
p.blocks.withValue(address, pending):
pending[].inFlight = inFlight
trace "Setting inflight", address, inFlight = pending[].inFlight
proc isInFlight*(p: PendingBlocksManager,
address: BlockAddress,
): bool =
p.blocks.withValue(cid, pending):
result = pending.inFlight
trace "Getting inflight", cid, inFlight = result
proc pending*(p: PendingBlocksManager, cid: Cid): bool =
cid in p.blocks
p.blocks.withValue(address, pending):
result = pending[].inFlight
trace "Getting inflight", address, inFlight = result
proc contains*(p: PendingBlocksManager, cid: Cid): bool =
  ## True when a non-leaf request for `cid` is pending (enables `cid in p`).
  BlockAddress.init(cid) in p.blocks
@ -312,33 +147,13 @@ iterator wantListCids*(p: PendingBlocksManager): Cid =
yieldedCids.incl(cid)
yield cid
iterator wantList*(p: PendingBlocksManager): BlockAddress =
for k in p.blocks.keys:
yield BlockAddress(leaf: false, cid: k)
for treeCid, treeReq in p.trees.pairs:
for index, leafReq in treeReq.leaves.pairs:
if not leafReq.delivered:
yield BlockAddress(leaf: true, treeCid: treeCid, index: index)
iterator wantListBlockCids*(p: PendingBlocksManager): Cid =
for k in p.blocks.keys:
yield k
iterator wantListCids*(p: PendingBlocksManager): Cid =
for k in p.blocks.keys:
yield k
for k in p.trees.keys:
yield k
iterator wantHandles*(p: PendingBlocksManager): Future[Block] =
  ## Yields the pending future (handle) of every outstanding block request.
  for v in p.blocks.values:
    yield v.handle
proc wantListLen*(p: PendingBlocksManager): int =
p.blocks.len + p.trees.len
p.blocks.len
func len*(p: PendingBlocksManager): int =
p.blocks.len

View File

@ -21,12 +21,15 @@ export Wantlist, WantType, WantListEntry
export BlockDelivery, BlockPresenceType, BlockPresence
export AccountMessage, StateChannelUpdate
proc hash*(e: WantListEntry): Hash =
if e.address.leaf:
let data = e.address.treeCid.data.buffer & @(e.address.index.uint64.toBytesBE)
proc hash*(a: BlockAddress): Hash =
  ## Hash a BlockAddress for use as a table key / peer selection.
  ## Leaf addresses hash over treeCid bytes concatenated with the big-endian
  ## index so that different leaves of the same tree hash differently;
  ## non-leaf addresses hash over the cid bytes alone.
  if a.leaf:
    let data = a.treeCid.data.buffer & @(a.index.uint64.toBytesBE)
    hash(data)
  else:
    hash(a.cid.data.buffer)

proc hash*(e: WantListEntry): Hash =
  ## A want-list entry hashes exactly as its address does.
  hash(e.address)
proc contains*(a: openArray[WantListEntry], b: BlockAddress): bool =
## Convenience method to check for peer presence

View File

@ -78,6 +78,12 @@ proc cidOrTreeCid*(a: BlockAddress): Cid =
proc address*(b: Block): BlockAddress =
BlockAddress(leaf: false, cid: b.cid)
proc init*(_: type BlockAddress, cid: Cid): BlockAddress =
  ## Address for a standalone (non-leaf) block identified by its own Cid.
  BlockAddress(leaf: false, cid: cid)

proc init*(_: type BlockAddress, treeCid: Cid, index: Natural): BlockAddress =
  ## Address for a leaf block: position `index` under the tree `treeCid`.
  BlockAddress(leaf: true, treeCid: treeCid, index: index)
proc `$`*(b: Block): string =
result &= "cid: " & $b.cid
result &= "\ndata: " & string.fromBytes(b.data)

View File

@ -97,7 +97,7 @@ proc getPendingBlocks(
var
# request blocks from the store
pendingBlocks = indicies.map( (i: int) =>
self.store.getBlock(manifest.treeCid, i, manifest.treeRoot).map((r: ?!bt.Block) => (r, i)) # Get the data blocks (first K)
self.store.getBlock(BlockAddress.init(manifest.treeCid, i)).map((r: ?!bt.Block) => (r, i)) # Get the data blocks (first K)
)
proc isFinished(): bool = pendingBlocks.len == 0
@ -290,16 +290,15 @@ proc encodeData(
without tree =? MerkleTree.init(cids[]), err:
return failure(err)
without treeBlk =? bt.Block.new(tree.encode()), err:
without treeCid =? tree.rootCid, err:
return failure(err)
if err =? (await self.store.putBlock(treeBlk)).errorOption:
return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg)
if err =? (await self.store.putAllProofs(tree)).errorOption:
return failure(err)
let encodedManifest = Manifest.new(
manifest = manifest,
treeCid = treeBlk.cid,
treeRoot = tree.root,
treeCid = treeCid,
datasetSize = (manifest.blockSize.int * params.blocksCount).NBytes,
ecK = params.ecK,
ecM = params.ecM
@ -353,9 +352,9 @@ proc decode*(
var
cids = seq[Cid].new()
recoveredIndices = newSeq[int]()
decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM)
emptyBlock = newSeq[byte](encoded.blockSize.int)
hasParity = false
cids[].setLen(encoded.blocksCount)
try:
@ -403,6 +402,7 @@ proc decode*(
return failure("Unable to store block!")
cids[idx] = blk.cid
recoveredIndices.add(idx)
except CancelledError as exc:
trace "Erasure coding decoding cancelled"
raise exc # cancellation needs to be propagated
@ -415,14 +415,18 @@ proc decode*(
without tree =? MerkleTree.init(cids[0..<encoded.originalBlocksCount]), err:
return failure(err)
if tree.root != encoded.originalTreeRoot:
return failure("Original tree root differs from tree root computed out of recovered data")
without treeBlk =? bt.Block.new(tree.encode()), err:
without treeCid =? tree.rootCid, err:
return failure(err)
if err =? (await self.store.putBlock(treeBlk)).errorOption:
return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg)
if treeCid != encoded.originalTreeCid:
return failure("Original tree root differs from the tree root computed out of recovered data")
let idxIter = Iter
.fromItems(recoveredIndices)
.filter((i: int) => i < tree.leavesCount)
if err =? (await self.store.putSomeProofs(tree, idxIter)).errorOption:
return failure(err)
let decoded = Manifest.new(encoded)

View File

@ -45,31 +45,27 @@ proc encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] =
# optional uint32 originalDatasetSize = 4; # size of the original dataset
# }
# Message Header {
# optional bytes treeCid = 1; # the cid of the tree
# optional bytes treeRoot = 2; # the root hash of the tree
# optional uint32 blockSize = 3; # size of a single block
# optional uint64 originalBytes = 4;# exact file size
# optional ErasureInfo erasure = 5; # erasure coding info
# optional bytes treeCid = 1; # cid (root) of the tree
# optional uint32 blockSize = 2; # size of a single block
# optional uint64 datasetSize = 3; # size of the dataset
# optional ErasureInfo erasure = 4; # erasure coding info
# }
# ```
#
# var treeRootVBuf = initVBuffer()
var header = initProtoBuffer()
header.write(1, manifest.treeCid.data.buffer)
# treeRootVBuf.write(manifest.treeRoot)
header.write(2, manifest.treeRoot.data.buffer)
header.write(3, manifest.blockSize.uint32)
header.write(4, manifest.datasetSize.uint32)
header.write(2, manifest.blockSize.uint32)
header.write(3, manifest.datasetSize.uint32)
if manifest.protected:
var erasureInfo = initProtoBuffer()
erasureInfo.write(1, manifest.ecK.uint32)
erasureInfo.write(2, manifest.ecM.uint32)
erasureInfo.write(3, manifest.originalCid.data.buffer)
erasureInfo.write(4, manifest.originalTreeRoot.data.buffer)
erasureInfo.write(5, manifest.originalDatasetSize.uint32)
erasureInfo.write(3, manifest.originalTreeCid.data.buffer)
erasureInfo.write(4, manifest.originalDatasetSize.uint32)
erasureInfo.finish()
header.write(5, erasureInfo)
header.write(4, erasureInfo)
pbNode.write(1, header) # set the treeCid as the data field
pbNode.finish()
@ -85,9 +81,7 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
pbHeader: ProtoBuffer
pbErasureInfo: ProtoBuffer
treeCidBuf: seq[byte]
treeRootBuf: seq[byte]
originalTreeCid: seq[byte]
originalTreeRootBuf: seq[byte]
datasetSize: uint32
blockSize: uint32
originalDatasetSize: uint32
@ -101,16 +95,13 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
if pbHeader.getField(1, treeCidBuf).isErr:
return failure("Unable to decode `treeCid` from manifest!")
if pbHeader.getField(2, treeRootBuf).isErr:
return failure("Unable to decode `treeRoot` from manifest!")
if pbHeader.getField(3, blockSize).isErr:
if pbHeader.getField(2, blockSize).isErr:
return failure("Unable to decode `blockSize` from manifest!")
if pbHeader.getField(4, datasetSize).isErr:
if pbHeader.getField(3, datasetSize).isErr:
return failure("Unable to decode `datasetSize` from manifest!")
if pbHeader.getField(5, pbErasureInfo).isErr:
if pbHeader.getField(4, pbErasureInfo).isErr:
return failure("Unable to decode `erasureInfo` from manifest!")
let protected = pbErasureInfo.buffer.len > 0
@ -123,34 +114,18 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
if pbErasureInfo.getField(3, originalTreeCid).isErr:
return failure("Unable to decode `originalTreeCid` from manifest!")
if pbErasureInfo.getField(4, originalTreeRootBuf).isErr:
return failure("Unable to decode `originalTreeRoot` from manifest!")
if pbErasureInfo.getField(5, originalDatasetSize).isErr:
if pbErasureInfo.getField(4, originalDatasetSize).isErr:
return failure("Unable to decode `originalDatasetSize` from manifest!")
var
treeRoot: MultiHash
originalTreeRoot: MultiHash
let
treeCid = ? Cid.init(treeCidBuf).mapFailure
treeRootRes = ? MultiHash.decode(treeRootBuf, treeRoot).mapFailure
if treeRootRes != treeRootBuf.len:
return failure("Error decoding `treeRoot` as MultiHash")
if protected:
let originalTreeRootRes = ? MultiHash.decode(originalTreeRootBuf, originalTreeRoot).mapFailure
if originalTreeRootRes != originalTreeRootBuf.len:
return failure("Error decoding `originalTreeRoot` as MultiHash")
let
self = if protected:
Manifest.new(
treeCid = treeCid,
treeRoot = treeRoot,
datasetSize = datasetSize.NBytes,
blockSize = blockSize.NBytes,
version = treeCid.cidver,
@ -159,13 +134,11 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
ecK = ecK.int,
ecM = ecM.int,
originalTreeCid = ? Cid.init(originalTreeCid).mapFailure,
originalTreeRoot = originalTreeRoot,
originalDatasetSize = originalDatasetSize.NBytes
)
else:
Manifest.new(
treeCid = treeCid,
treeRoot = treeRoot,
datasetSize = datasetSize.NBytes,
blockSize = blockSize.NBytes,
version = treeCid.cidver,

View File

@ -30,8 +30,7 @@ export types
type
Manifest* = ref object of RootObj
treeCid: Cid # Cid of the merkle tree
treeRoot: MultiHash # Root hash of the merkle tree
treeCid: Cid # Root of the merkle tree
datasetSize: NBytes # Total size of all blocks
blockSize: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed)
version: CidVersion # Cid version
@ -41,8 +40,7 @@ type
of true:
ecK: int # Number of blocks to encode
ecM: int # Number of resulting parity blocks
originalTreeCid: Cid # The original Cid of the dataset being erasure coded
originalTreeRoot: MultiHash
originalTreeCid: Cid # The original root of the dataset being erasure coded
originalDatasetSize: NBytes
else:
discard
@ -75,24 +73,18 @@ proc ecK*(self: Manifest): int =
proc ecM*(self: Manifest): int =
self.ecM
proc originalCid*(self: Manifest): Cid =
proc originalTreeCid*(self: Manifest): Cid =
self.originalTreeCid
proc originalBlocksCount*(self: Manifest): int =
divUp(self.originalDatasetSize.int, self.blockSize.int)
proc originalTreeRoot*(self: Manifest): MultiHash =
self.originalTreeRoot
proc originalDatasetSize*(self: Manifest): NBytes =
self.originalDatasetSize
proc treeCid*(self: Manifest): Cid =
self.treeCid
proc treeRoot*(self: Manifest): MultiHash =
self.treeRoot
proc blocksCount*(self: Manifest): int =
divUp(self.datasetSize.int, self.blockSize.int)
@ -140,7 +132,6 @@ proc cid*(self: Manifest): ?!Cid {.deprecated: "use treeCid instead".} =
proc `==`*(a, b: Manifest): bool =
(a.treeCid == b.treeCid) and
(a.treeRoot == b.treeRoot) and
(a.datasetSize == b.datasetSize) and
(a.blockSize == b.blockSize) and
(a.version == b.version) and
@ -151,14 +142,12 @@ proc `==`*(a, b: Manifest): bool =
(a.ecK == b.ecK) and
(a.ecM == b.ecM) and
(a.originalTreeCid == b.originalTreeCid) and
(a.originalTreeRoot == b.originalTreeRoot) and
(a.originalDatasetSize == b.originalDatasetSize)
else:
true)
proc `$`*(self: Manifest): string =
"treeCid: " & $self.treeCid &
", treeRoot: " & $self.treeRoot &
", datasetSize: " & $self.datasetSize &
", blockSize: " & $self.blockSize &
", version: " & $self.version &
@ -169,7 +158,6 @@ proc `$`*(self: Manifest): string =
", ecK: " & $self.ecK &
", ecM: " & $self.ecM &
", originalTreeCid: " & $self.originalTreeCid &
", originalTreeRoot: " & $self.originalTreeRoot &
", originalDatasetSize: " & $self.originalDatasetSize
else:
"")
@ -181,7 +169,6 @@ proc `$`*(self: Manifest): string =
proc new*(
T: type Manifest,
treeCid: Cid,
treeRoot: MultiHash,
blockSize: NBytes,
datasetSize: NBytes,
version: CidVersion = CIDv1,
@ -192,7 +179,6 @@ proc new*(
T(
treeCid: treeCid,
treeRoot: treeRoot,
blockSize: blockSize,
datasetSize: datasetSize,
version: version,
@ -204,7 +190,6 @@ proc new*(
T: type Manifest,
manifest: Manifest,
treeCid: Cid,
treeRoot: MultiHash,
datasetSize: NBytes,
ecK, ecM: int
): Manifest =
@ -213,7 +198,6 @@ proc new*(
##
Manifest(
treeCid: treeCid,
treeRoot: treeRoot,
datasetSize: datasetSize,
version: manifest.version,
codec: manifest.codec,
@ -222,7 +206,6 @@ proc new*(
protected: true,
ecK: ecK, ecM: ecM,
originalTreeCid: manifest.treeCid,
originalTreeRoot: manifest.treeRoot,
originalDatasetSize: manifest.datasetSize)
proc new*(
@ -233,8 +216,7 @@ proc new*(
## erasure protected one
##
Manifest(
treeCid: manifest.originalCid,
treeRoot: manifest.originalTreeRoot,
treeCid: manifest.originalTreeCid,
datasetSize: manifest.originalDatasetSize,
version: manifest.version,
codec: manifest.codec,
@ -254,7 +236,6 @@ proc new*(
proc new*(
T: type Manifest,
treeCid: Cid,
treeRoot: MultiHash,
datasetSize: NBytes,
blockSize: NBytes,
version: CidVersion,
@ -263,12 +244,10 @@ proc new*(
ecK: int,
ecM: int,
originalTreeCid: Cid,
originalTreeRoot: MultiHash,
originalDatasetSize: NBytes
): Manifest =
Manifest(
treeCid: treeCid,
treeRoot: treeRoot,
datasetSize: datasetSize,
blockSize: blockSize,
version: version,
@ -278,6 +257,5 @@ proc new*(
ecK: ecK,
ecM: ecM,
originalTreeCid: originalTreeCid,
originalTreeRoot: originalTreeRoot,
originalDatasetSize: originalDatasetSize
)

View File

@ -14,6 +14,7 @@ import std/sugar
import std/algorithm
import pkg/chronicles
import pkg/questionable
import pkg/questionable/results
import pkg/nimcrypto/sha2
import pkg/libp2p/[cid, multicodec, multihash, vbuffer]
@ -186,8 +187,16 @@ proc root*(self: MerkleTree): MultiHash =
let rootIndex = self.len - 1
self.nodeBufferToMultiHash(rootIndex)
proc leaves*(self: MerkleTree): seq[MultiHash] =
toSeq(0..<self.leavesCount).map(i => self.nodeBufferToMultiHash(i))
proc rootCid*(self: MerkleTree, version = CIDv1, dataCodec = multiCodec("raw")): ?!Cid =
  ## Wrap the tree's root multihash into a Cid (CIDv1/raw by default).
  ## Fails if Cid.init rejects the version/codec/hash combination.
  Cid.init(version, dataCodec, self.root).mapFailure
iterator leaves*(self: MerkleTree): MultiHash =
  ## Yields each leaf multihash in index order (0 ..< leavesCount).
  for i in 0..<self.leavesCount:
    yield self.nodeBufferToMultiHash(i)

iterator leavesCids*(self: MerkleTree, version = CIDv1, dataCodec = multiCodec("raw")): ?!Cid =
  ## Yields each leaf wrapped as a Cid result; an individual yield is a
  ## failure when Cid.init rejects the version/codec/hash combination.
  for leaf in self.leaves:
    yield Cid.init(version, dataCodec, leaf).mapFailure
proc leavesCount*(self: MerkleTree): Natural =
self.leavesCount
@ -198,6 +207,10 @@ proc getLeaf*(self: MerkleTree, index: Natural): ?!MultiHash =
success(self.nodeBufferToMultiHash(index))
proc getLeafCid*(self: MerkleTree, index: Natural, version = CIDv1, dataCodec = multiCodec("raw")): ?!Cid =
  ## Cid for the leaf at `index`; propagates getLeaf's failure (via `?`)
  ## for an out-of-range index.
  let leaf = ? self.getLeaf(index)
  Cid.init(version, dataCodec, leaf).mapFailure
proc height*(self: MerkleTree): Natural =
computeTreeHeight(self.leavesCount)
@ -277,18 +290,6 @@ proc init*(
else:
failure("Expected nodesBuffer len to be " & $(totalNodes * digestSize) & " but was " & $nodesBuffer.len)
proc init*(
T: type MerkleTree,
cids: openArray[Cid]
): ?!MerkleTree =
let leaves = collect:
for cid in cids:
without mhash =? cid.mhash.mapFailure, errx:
return failure(errx)
mhash
MerkleTree.init(leaves)
proc init*(
T: type MerkleTree,
leaves: openArray[MultiHash]
@ -304,6 +305,18 @@ proc init*(
builder.build()
proc init*(
  T: type MerkleTree,
  cids: openArray[Cid]
): ?!MerkleTree =
  ## Build a MerkleTree whose leaves are the multihashes of `cids`,
  ## preserving their order. Fails on the first cid whose multihash
  ## cannot be extracted.
  let leaves = collect:
    # Note: only the cid itself is needed per element — the previous
    # `for idx, cid` form left `idx` unused.
    for cid in cids:
      without mhash =? cid.mhash.mapFailure, errx:
        return failure(errx)
      mhash
  MerkleTree.init(leaves)
###########################################################
# MerkleProof
###########################################################

View File

@ -116,8 +116,8 @@ proc fetchBatched*(
trace "Fetching blocks in batches of", size = batchSize
without iter =? await node.blockStore.getBlocks(manifest.treeCid, manifest.blocksCount, manifest.treeRoot), err:
return failure(err)
let iter = Iter.fromSlice(0..<manifest.blocksCount)
.map((i: int) => node.blockStore.getBlock(BlockAddress.init(manifest.treeCid, i)))
for batchNum in 0..<batchCount:
@ -197,8 +197,7 @@ proc store*(
dataCodec = multiCodec("raw")
chunker = LPStreamChunker.new(stream, chunkSize = blockSize)
without var treeBuilder =? MerkleTreeBuilder.init(hcodec), err:
return failure(err)
var cids: seq[Cid]
try:
while (
@ -216,8 +215,7 @@ proc store*(
without blk =? bt.Block.new(cid, chunk, verify = false):
return failure("Unable to init block from chunk!")
if err =? treeBuilder.addLeaf(mhash).errorOption:
return failure(err)
cids.add(cid)
if err =? (await self.blockStore.putBlock(blk)).errorOption:
trace "Unable to store block", cid = blk.cid, err = err.msg
@ -230,18 +228,21 @@ proc store*(
finally:
await stream.close()
without tree =? treeBuilder.build(), err:
without tree =? MerkleTree.init(cids), err:
return failure(err)
without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
return failure(err)
without treeBlk =? bt.Block.new(tree.encode()), err:
return failure(err)
if err =? (await self.blockStore.putBlock(treeBlk)).errorOption:
return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg)
for index, cid in cids:
without proof =? tree.getProof(index), err:
return failure(err)
if err =? (await self.blockStore.putBlockCidAndProof(treeCid, index, cid, proof)).errorOption:
# TODO add log here
return failure(err)
let manifest = Manifest.new(
treeCid = treeBlk.cid,
treeRoot = tree.root,
treeCid = treeCid,
blockSize = blockSize,
datasetSize = NBytes(chunker.offset),
version = CIDv1,
@ -263,12 +264,13 @@ proc store*(
return failure("Unable to store manifest " & $manifestBlk.cid)
info "Stored data", manifestCid = manifestBlk.cid,
treeCid = treeBlk.cid,
treeCid = treeCid,
blocks = manifest.blocksCount,
datasetSize = manifest.datasetSize
# Announce manifest
await self.discovery.provide(manifestBlk.cid)
await self.discovery.provide(treeCid)
return manifestBlk.cid.success

View File

@ -4,6 +4,6 @@ import ./stores/networkstore
import ./stores/repostore
import ./stores/maintenance
import ./stores/keyutils
import ./stores/treereader
import ./stores/treehelper
export cachestore, blockstore, networkstore, repostore, maintenance, keyutils, treereader
export cachestore, blockstore, networkstore, repostore, maintenance, keyutils, treehelper

View File

@ -35,43 +35,25 @@ method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} =
## Get a block from the blockstore
##
raiseAssert("getBlock by cid not implemented!")
raiseAssert("getBlock by cid Not implemented!")
method getBlock*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!Block] {.base.} =
## Get a block from the blockstore
##
raiseAssert("getBlock by treecid not implemented!")
raiseAssert("getBlock by treecid Not implemented!")
method getBlock*(self: BlockStore, address: BlockAddress): Future[?!Block] {.base.} =
## Get a block from the blockstore
##
raiseAssert("getBlock by addr not implemented!")
raiseAssert("getBlock by addr Not implemented!")
method getBlockAndProof*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.base.} =
## Get a block and associated inclusion proof by Cid of a merkle tree and an index of a leaf in a tree
##
raiseAssert("getBlockAndProof not implemented!")
method getBlock*(self: BlockStore, treeCid: Cid, index: Natural, merkleRoot: MultiHash): Future[?!Block] {.base.} =
## Get a block by Cid of a merkle tree and an index of a leaf in a tree, validate inclusion using merkle root
##
raiseAssert("Not implemented!")
method getBlockAndProof*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.base.} =
## Get a block and associated inclusion proof by Cid of a merkle tree and an index of a leaf in a tree
##
raiseAssert("Not implemented!")
method getBlocks*(self: BlockStore, treeCid: Cid, leavesCount: Natural, merkleRoot: MultiHash): Future[?!AsyncIter[?!Block]] {.base.} =
## Get all blocks in range [0..<leavesCount] by Cid of a merkle tree, validate inclusion using merkle root
##
raiseAssert("Not implemented!")
raiseAssert("getBlockAndProof Not implemented!")
method putBlock*(
self: BlockStore,
@ -81,8 +63,9 @@ method putBlock*(
## Put a block to the blockstore
##
raiseAssert("putBlock not implemented!")
raiseAssert("putBlock Not implemented!")
# I cant use `BlockAddress` to wrap (treeCid, index) here. because as far as I know there's no way in Nim to force `assert(leaf == true)` in a compile time
method putBlockCidAndProof*(
self: BlockStore,
treeCid: Cid,
@ -93,7 +76,7 @@ method putBlockCidAndProof*(
## Put a block to the blockstore
##
raiseAssert("putBlockCidAndProof not implemented!")
raiseAssert("putBlockCidAndProof Not implemented!")
method ensureExpiry*(
self: BlockStore,

View File

@ -21,7 +21,6 @@ import pkg/questionable
import pkg/questionable/results
import ./blockstore
import ./treereader
import ../units
import ../chunker
import ../errors
@ -36,7 +35,6 @@ logScope:
type
CacheStore* = ref object of BlockStore
treeReader: TreeReader
currentSize*: NBytes
size*: NBytes
cache: LruCache[Cid, Block]
@ -66,14 +64,34 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =
trace "Error requesting block from cache", cid, error = exc.msg
return failure exc
method getBlock*(self: CacheStore, treeCid: Cid, index: Natural, merkleRoot: MultiHash): Future[?!Block] =
self.treeReader.getBlock(treeCid, index)
proc getCidAndProof(self: CacheStore, treeCid: Cid, index: Natural): ?!(Cid, MerkleProof) =
if cidAndProof =? self.cidAndProofCache.getOption((treeCid, index)):
success(cidAndProof)
else:
failure(newException(BlockNotFoundError, "Block not in cache: " & $BlockAddress.init(treeCid, index)))
method getBlocks*(self: CacheStore, treeCid: Cid, leavesCount: Natural, merkleRoot: MultiHash): Future[?!AsyncIter[?!Block]] =
self.treeReader.getBlocks(treeCid, leavesCount)
method getBlock*(self: CacheStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
without cidAndProof =? self.getCidAndProof(treeCid, index), err:
return failure(err)
method getBlockAndProof*(self: CacheStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] =
self.treeReader.getBlockAndProof(treeCid, index)
await self.getBlock(cidAndProof[0])
method getBlockAndProof*(self: CacheStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.async.} =
without cidAndProof =? self.getCidAndProof(treeCid, index), err:
return failure(err)
let (cid, proof) = cidAndProof
without blk =? await self.getBlock(cid), err:
return failure(err)
success((blk, proof))
method getBlock*(self: CacheStore, address: BlockAddress): Future[?!Block] =
if address.leaf:
self.getBlock(address.treeCid, address.index)
else:
self.getBlock(address.cid)
method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
@ -87,13 +105,14 @@ method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} =
return (cid in self.cache).success
method hasBlock*(self: CacheStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
without cidAndProof =? self.getCidAndProof(treeCid, index), err:
if err of BlockNotFoundError:
return success(false)
else:
return failure(err)
await self.hasBlock(cidAndProof[0])
without cid =? await self.treeReader.getBlockCid(treeCid, index), err:
return failure(err)
await self.hasBlock(cid)
func cids(self: CacheStore): (iterator: Cid {.gcsafe.}) =
return iterator(): Cid =
@ -200,16 +219,6 @@ method putBlockCidAndProof*(
self.cidAndProofCache[(treeCid, index)] = (blockCid, proof)
success()
method ensureExpiry*(
self: CacheStore,
cid: Cid,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Updates block's assosicated TTL in store - not applicable for CacheStore
##
discard # CacheStore does not have notion of TTL
method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore
##
@ -226,10 +235,12 @@ method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
return success()
method delBlock*(self: CacheStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} =
without cid =? await self.treeReader.getBlockCid(treeCid, index), err:
return failure(err)
let maybeRemoved = self.cidAndProofCache.del((treeCid, index))
return await self.delBlock(cid)
if removed =? maybeRemoved:
return await self.delBlock(removed[0])
return success()
method close*(self: CacheStore): Future[void] {.async.} =
## Close the blockstore, a no-op for this implementation
@ -265,10 +276,6 @@ proc new*(
currentSize: currentSize,
size: cacheSize)
proc getBlockFromStore(cid: Cid): Future[?!Block] = store.getBlock(cid)
treeReader.getBlockFromStore = getBlockFromStore
for blk in blocks:
discard store.putBlockSync(blk)

View File

@ -24,6 +24,7 @@ import ../utils/asynciter
import ./blockstore
import ../blockexchange
import ../merkletree
import ../blocktype
export blockstore, blockexchange, asyncheapqueue
@ -52,34 +53,17 @@ method getBlock*(self: NetworkStore, address: BlockAddress): Future[?!Block] {.a
return success blk
method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural, merkleRoot: MultiHash): Future[?!Block] {.async.} =
without localBlock =? await self.localStore.getBlock(treeCid, index, merkleRoot), err:
if err of BlockNotFoundError:
trace "Requesting block from the network engine", treeCid, index
try:
let networkBlock = await self.engine.requestBlock(treeCid, index, merkleRoot)
return success(networkBlock)
except CatchableError as err:
return failure(err)
else:
failure(err)
return success(localBlock)
method getBlock*(self: NetworkStore, cid: Cid): Future[?!Block] =
## Get a block from the blockstore
##
method getBlocks*(self: NetworkStore, treeCid: Cid, leavesCount: Natural, merkleRoot: MultiHash): Future[?!AsyncIter[?!Block]] {.async.} =
without localIter =? await self.localStore.getBlocks(treeCid, leavesCount, merkleRoot), err:
if err of BlockNotFoundError:
trace "Requesting blocks from the network engine", treeCid, leavesCount
without var networkIter =? self.engine.requestBlocks(treeCid, leavesCount, merkleRoot), err:
failure(err)
self.getBlock(BlockAddress.init(cid))
let iter = networkIter
.prefetch(BlockPrefetchAmount)
.map(proc (fut: Future[Block]): Future[?!Block] {.async.} = catch: (await fut))
method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!Block] =
## Get a block from the blockstore
##
return success(iter)
else:
return failure(err)
return success(localIter)
self.getBlock(BlockAddress.init(treeCid, index))
method putBlock*(
self: NetworkStore,
@ -107,27 +91,6 @@ method putBlockCidAndProof*(
): Future[?!void] =
self.localStore.putBlockCidAndProof(treeCid, index, blockCid, proof)
method ensureExpiry*(
self: NetworkStore,
cid: Cid,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's assosicated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
if (await self.localStore.hasBlock(cid)).tryGet:
return await self.localStore.ensureExpiry(cid, expiry)
else:
trace "Updating expiry - block not in local store", cid
return success()
method listBlocks*(
self: NetworkStore,
blockType = BlockType.Manifest): Future[?!AsyncIter[?Cid]] =
self.localStore.listBlocks(blockType)
method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
## Delete a block from the blockstore
##

View File

@ -24,7 +24,6 @@ import pkg/stew/endians2
import ./blockstore
import ./keyutils
import ./treereader
import ../blocktype
import ../clock
import ../systemclock
@ -59,12 +58,11 @@ type
quotaReservedBytes*: uint # bytes reserved by the repo
blockTtl*: Duration
started*: bool
treeReader*: TreeReader
BlockExpiration* = object
cid*: Cid
expiration*: SecondsSince1970
proc updateMetrics(self: RepoStore) =
codex_repostore_blocks.set(self.totalBlocks.int64)
codex_repostore_bytes_used.set(self.quotaUsedBytes.int64)
@ -80,27 +78,26 @@ func available*(self: RepoStore, bytes: uint): bool =
return bytes < self.available()
proc encode(cidAndProof: (Cid, MerkleProof)): seq[byte] =
## Encodes a tuple of cid and merkle proof in a following format:
## | 8-bytes | n-bytes | remaining bytes |
## | n | cid | proof |
##
## where n is a size of cid
##
let
(cid, proof) = cidAndProof
cidBytes = cid.data.buffer
proofBytes = proof.encode
n = cidBytes.len
nBytes = n.uint64.toBytesBE
@nBytes & cidBytes & proofBytes
var buf = newSeq[byte](1 + cidBytes.len + proofBytes.len)
buf[0] = cid.data.buffer.len.byte # cid shouldnt be more than 255 bytes?
buf[1..cidBytes.len] = cidBytes
buf[cidBytes.len + 1..^1] = proofBytes
buf
proc decode(_: type (Cid, MerkleProof), data: seq[byte]): ?!(Cid, MerkleProof) =
let
n = uint64.fromBytesBE(data[0..<sizeof(uint64)]).int
let cidLen = data[0].int
let
cid = ? Cid.init(data[sizeof(uint64)..<sizeof(uint64) + n]).mapFailure
proof = ? MerkleProof.decode(data[sizeof(uint64) + n..^1])
cid = ? Cid.init(data[1..cidLen]).mapFailure
proof = ? MerkleProof.decode(data[cidLen + 1..^1])
success((cid, proof))
method putBlockCidAndProof*(
@ -161,14 +158,32 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
trace "Got block for cid", cid
return Block.new(cid, data, verify = true)
method getBlock*(self: RepoStore, treeCid: Cid, index: Natural, merkleRoot: MultiHash): Future[?!Block] =
self.treeReader.getBlock(treeCid, index)
method getBlocks*(self: RepoStore, treeCid: Cid, leavesCount: Natural, merkleRoot: MultiHash): Future[?!AsyncIter[?!Block]] =
self.treeReader.getBlocks(treeCid, leavesCount)
method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.async.} =
without cidAndProof =? await self.getCidAndProof(treeCid, index), err:
return failure(err)
method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] =
self.treeReader.getBlockAndProof(treeCid, index)
let (cid, proof) = cidAndProof
without blk =? await self.getBlock(cid), err:
return failure(err)
success((blk, proof))
method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
without cidAndProof =? await self.getCidAndProof(treeCid, index), err:
return failure(err)
await self.getBlock(cidAndProof[0])
method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
## Get a block from the blockstore
##
if address.leaf:
self.getBlock(address.treeCid, address.index)
else:
self.getBlock(address.cid)
proc getBlockExpirationEntry(
self: RepoStore,
@ -355,10 +370,21 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
return success()
method delBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} =
without cid =? await self.treeReader.getBlockCid(treeCid, index), err:
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
await self.delBlock(cid)
without value =? await self.metaDs.get(key), err:
if err of DatastoreKeyNotFound:
return success()
else:
return failure(err)
without cidAndProof =? (Cid, MerkleProof).decode(value), err:
return failure(err)
self.delBlock(cidAndProof[0])
await self.metaDs.delete(key)
method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
@ -378,10 +404,13 @@ method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
return await self.repoDs.has(key)
method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} =
without cid =? await self.treeReader.getBlockCid(treeCid, index), err:
return failure(err)
await self.hasBlock(cid)
without cidAndProof =? await self.getCidAndProof(treeCid, index), err:
if err of BlockNotFoundError:
return success(false)
else:
return failure(err)
await self.hasBlock(cidAndProof[0])
method listBlocks*(
self: RepoStore,
@ -587,21 +616,16 @@ func new*(
clock: Clock = SystemClock.new(),
postFixLen = 2,
quotaMaxBytes = DefaultQuotaBytes,
blockTtl = DefaultBlockTtl,
treeCacheCapacity = DefaultTreeCacheCapacity
blockTtl = DefaultBlockTtl
): RepoStore =
## Create new instance of a RepoStore
##
let store = RepoStore(
RepoStore(
repoDs: repoDs,
metaDs: metaDs,
treeReader: treeReader,
clock: clock,
postFixLen: postFixLen,
quotaMaxBytes: quotaMaxBytes,
blockTtl: blockTtl)
proc getBlockFromStore(cid: Cid): Future[?!Block] = store.getBlock(cid)
treeReader.getBlockFromStore = getBlockFromStore
store
blockTtl: blockTtl
)

View File

@ -1,101 +0,0 @@
import pkg/upraises
import pkg/chronos
import pkg/chronos/futures
import pkg/chronicles
import pkg/libp2p/[cid, multicodec, multihash]
import pkg/lrucache
import pkg/questionable
import pkg/questionable/results
import ../blocktype
import ../merkletree
import ../utils
const DefaultTreeCacheCapacity* = 10 # Max number of trees stored in memory
type
GetBlock = proc (cid: Cid): Future[?!Block] {.upraises: [], gcsafe, closure.}
TreeReader* = ref object of RootObj
getBlockFromStore*: GetBlock
treeCache*: LruCache[Cid, MerkleTree]
proc getTree*(self: TreeReader, cid: Cid): Future[?!MerkleTree] {.async.} =
if tree =? self.treeCache.getOption(cid):
return success(tree)
else:
without treeBlk =? await self.getBlockFromStore(cid), err:
return failure(err)
without tree =? MerkleTree.decode(treeBlk.data), err:
return failure("Error decoding a merkle tree with cid " & $cid & ". Nested error is: " & err.msg)
self.treeCache[cid] = tree
trace "Got merkle tree for cid", cid
return success(tree)
proc getBlockCidAndProof*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!(Cid, MerkleProof)] {.async.} =
without tree =? await self.getTree(treeCid), err:
return failure(err)
without proof =? tree.getProof(index), err:
return failure(err)
without leaf =? tree.getLeaf(index), err:
return failure(err)
without leafCid =? Cid.init(treeCid.cidver, treeCid.mcodec, leaf).mapFailure, err:
return failure(err)
return success((leafCid, proof))
proc getBlockCid*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!Cid] {.async.} =
without tree =? await self.getTree(treeCid), err:
return failure(err)
without leaf =? tree.getLeaf(index), err:
return failure(err)
without leafCid =? Cid.init(treeCid.cidver, treeCid.mcodec, leaf).mapFailure, err:
return failure(err)
return success(leafCid)
proc getBlock*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
without leafCid =? await self.getBlockCid(treeCid, index), err:
return failure(err)
without blk =? await self.getBlockFromStore(leafCid), err:
return failure(err)
return success(blk)
proc getBlockAndProof*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.async.} =
without (leafCid, proof) =? await self.getBlockCidAndProof(treeCid, index), err:
return failure(err)
without blk =? await self.getBlockFromStore(leafCid), err:
return failure(err)
return success((blk, proof))
proc getBlocks*(self: TreeReader, treeCid: Cid, leavesCount: Natural): Future[?!AsyncIter[?!Block]] {.async.} =
without tree =? await self.getTree(treeCid), err:
return failure(err)
let iter = Iter.fromSlice(0..<leavesCount)
.map(proc (index: int): Future[?!Block] {.async.} =
without leaf =? tree.getLeaf(index), err:
return failure(err)
without leafCid =? Cid.init(treeCid.cidver, treeCid.mcodec, leaf).mapFailure, err:
return failure(err)
without blk =? await self.getBlockFromStore(leafCid), err:
return failure(err)
return success(blk)
)
return success(iter)
proc new*(T: type TreeReader, treeCacheCap = DefaultTreeCacheCapacity): TreeReader =
TreeReader(treeCache: newLruCache[Cid, MerkleTree](treeCacheCap))

View File

@ -1,6 +1,5 @@
import ./streams/seekablestream
import ./streams/storestream
import ./streams/seekablestorestream
import ./streams/asyncstreamwrapper
export seekablestream, storestream, seekablestorestream, asyncstreamwrapper
export seekablestream, storestream, asyncstreamwrapper

View File

@ -1,123 +0,0 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/options
import pkg/upraises
push: {.upraises: [].}
import pkg/chronos
import pkg/chronicles
import pkg/stew/ptrops
import ../stores
import ../manifest
import ../blocktype
import ../utils
import ./seekablestream
export stores, blocktype, manifest, chronos
logScope:
topics = "codex storestream"
const
SeekableStoreStreamTrackerName* = "SeekableStoreStream"
type
# Make SeekableStream from a sequence of blocks stored in Manifest
# (only original file data - see StoreStream.size)
SeekableStoreStream* = ref object of SeekableStream
store*: BlockStore # Store where to lookup block contents
manifest*: Manifest # List of block CIDs
pad*: bool # Pad last block to manifest.blockSize?
method initStream*(s: SeekableStoreStream) =
if s.objName.len == 0:
s.objName = SeekableStoreStreamTrackerName
procCall SeekableStream(s).initStream()
proc new*(
T: type SeekableStoreStream,
store: BlockStore,
manifest: Manifest,
pad = true
): SeekableStoreStream =
## Create a new SeekableStoreStream instance for a given store and manifest
##
result = SeekableStoreStream(
store: store,
manifest: manifest,
pad: pad,
offset: 0)
result.initStream()
method `size`*(self: SeekableStoreStream): int =
bytes(self.manifest, self.pad).int
proc `size=`*(self: SeekableStoreStream, size: int)
{.error: "Setting the size is forbidden".} =
discard
method atEof*(self: SeekableStoreStream): bool =
self.offset >= self.size
method readOnce*(
self: SeekableStoreStream,
pbytes: pointer,
nbytes: int
): Future[int] {.async.} =
## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`.
## Return how many bytes were actually read before EOF was encountered.
## Raise exception if we are already at EOF.
##
trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount
if self.atEof:
raise newLPStreamEOFError()
# The loop iterates over blocks in the SeekableStoreStream,
# reading them and copying their data into outbuf
var read = 0 # Bytes read so far, and thus write offset in the outbuf
while read < nbytes and not self.atEof:
# Compute from the current stream position `self.offset` the block num/offset to read
# Compute how many bytes to read from this block
let
blockNum = self.offset div self.manifest.blockSize.int
blockOffset = self.offset mod self.manifest.blockSize.int
readBytes = min([self.size - self.offset,
nbytes - read,
self.manifest.blockSize.int - blockOffset])
# Read contents of block `blockNum`
without blk =? await self.store.getBlock(self.manifest.treeCid, blockNum, self.manifest.treeRoot), error:
raise newLPStreamReadError(error)
trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset
# Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
if blk.isEmpty:
zeroMem(pbytes.offset(read), readBytes)
else:
copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes)
# Update current positions in the stream and outbuf
self.offset += readBytes
read += readBytes
return read
method closeImpl*(self: SeekableStoreStream) {.async.} =
trace "Closing SeekableStoreStream"
self.offset = self.size # set Eof
await procCall LPStream(self).closeImpl()

View File

@ -33,20 +33,18 @@ const
StoreStreamTrackerName* = "StoreStream"
type
StoreStream* = ref object of LPStream
# Make SeekableStream from a sequence of blocks stored in Manifest
# (only original file data - see StoreStream.size)
StoreStream* = ref object of SeekableStream
store*: BlockStore # Store where to lookup block contents
manifest*: Manifest # List of block CIDs
pad*: bool # Pad last block to manifest.blockSize?
iter*: AsyncIter[?!Block]
lastBlock: Block
lastIndex: int
offset: int
method initStream*(s: StoreStream) =
if s.objName.len == 0:
s.objName = StoreStreamTrackerName
procCall LPStream(s).initStream()
procCall SeekableStream(s).initStream()
proc new*(
T: type StoreStream,
@ -60,7 +58,6 @@ proc new*(
store: store,
manifest: manifest,
pad: pad,
lastIndex: -1,
offset: 0)
result.initStream()
@ -88,34 +85,32 @@ method readOnce*(
trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount
if self.atEof:
raise newLPStreamEOFError()
# Initialize a block iterator
if self.lastIndex < 0:
without iter =? await self.store.getBlocks(self.manifest.treeCid, self.manifest.blocksCount, self.manifest.treeRoot), err:
raise newLPStreamReadError(err)
self.iter = iter
# The loop iterates over blocks in the StoreStream,
# reading them and copying their data into outbuf
var read = 0 # Bytes read so far, and thus write offset in the outbuf
while read < nbytes and not self.atEof:
if self.offset >= (self.lastIndex + 1) * self.manifest.blockSize.int:
if not self.iter.finished:
without lastBlock =? await self.iter.next(), err:
raise newLPStreamReadError(err)
self.lastBlock = lastBlock
inc self.lastIndex
else:
raise newLPStreamReadError(newException(CodexError, "Block iterator finished prematurely"))
# Compute from the current stream position `self.offset` the block num/offset to read
# Compute how many bytes to read from this block
let
blockNum = self.offset div self.manifest.blockSize.int
blockOffset = self.offset mod self.manifest.blockSize.int
readBytes = min([self.size - self.offset,
nbytes - read,
self.manifest.blockSize.int - blockOffset])
address = BlockAddress(leaf: true, treeCid: self.manifest.treeCid, index: blockNum)
# Read contents of block `blockNum`
without blk =? await self.store.getBlock(address), error:
raise newLPStreamReadError(error)
trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset
# Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
if self.lastBlock.isEmpty:
if blk.isEmpty:
zeroMem(pbytes.offset(read), readBytes)
else:
copyMem(pbytes.offset(read), self.lastBlock.data[blockOffset].addr, readBytes)
copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes)
# Update current positions in the stream and outbuf
self.offset += readBytes

View File

@ -52,12 +52,12 @@ proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOn
iter.next = next
return iter
proc fromItems*[T](_: type Iter, items: openArray[T]): Iter[T] =
proc fromItems*[T](_: type Iter, items: seq[T]): Iter[T] =
## Create new iterator from items
##
Iter.fromSlice(0..<items.len)
.map((i) => items[i])
.map((i: int) => items[i])
proc fromSlice*[U, V: Ordinal](_: type Iter, slice: HSlice[U, V]): Iter[U] =
## Creates new iterator from slice
@ -89,22 +89,23 @@ proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] =
)
proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] =
var nextItem: T
var nextT: Option[T]
proc tryFetch(): void =
nextT = T.none
while not iter.finished:
let item = iter.next()
if predicate(item):
nextItem = some(item)
let t = iter.next()
if predicate(t):
nextT = some(t)
break
proc genNext(): T =
let t = nextItem
let t = nextT.unsafeGet
tryFetch()
return t
proc isFinished(): bool =
iter.finished
nextT.isNone
tryFetch()
Iter.new(genNext, isFinished)

View File

@ -47,10 +47,9 @@ proc makeManifestAndTree*(blocks: seq[Block]): ?!(Manifest, MerkleTree) =
datasetSize = blocks.mapIt(it.data.len).foldl(a + b)
blockSize = blocks.mapIt(it.data.len).foldl(max(a, b))
tree = ? MerkleTree.init(blocks.mapIt(it.cid))
treeBlk = ? Block.new(tree.encode())
treeCid = ? tree.rootCid
manifest = Manifest.new(
treeCid = treeBlk.cid,
treeRoot = tree.root,
treeCid = treeCid,
blockSize = NBytes(blockSize),
datasetSize = NBytes(datasetSize),
version = CIDv1,
@ -78,30 +77,28 @@ proc makeWantList*(
full: full)
proc storeDataGetManifest*(store: BlockStore, chunker: Chunker): Future[Manifest] {.async.} =
var builder = MerkleTreeBuilder.init().tryGet()
var cids = newSeq[Cid]()
while (
let chunk = await chunker.getBytes();
chunk.len > 0):
let blk = Block.new(chunk).tryGet()
# builder.addDataBlock(blk.data).tryGet()
let mhash = blk.cid.mhash.mapFailure.tryGet()
builder.addLeaf(mhash).tryGet()
cids.add(blk.cid)
(await store.putBlock(blk)).tryGet()
let
tree = builder.build().tryGet()
treeBlk = Block.new(tree.encode()).tryGet()
let
tree = MerkleTree.init(cids).tryGet()
treeCid = tree.rootCid.tryGet()
manifest = Manifest.new(
treeCid = treeCid,
blockSize = NBytes(chunker.chunkSize),
datasetSize = NBytes(chunker.offset),
)
let manifest = Manifest.new(
treeCid = treeBlk.cid,
treeRoot = tree.root,
blockSize = NBytes(chunker.chunkSize),
datasetSize = NBytes(chunker.offset),
)
(await store.putBlock(treeBlk)).tryGet()
for i in 0..<tree.leavesCount:
let proof = tree.getProof(i).tryGet()
(await store.putBlockCidAndProof(treeCid, i, cids[i], proof)).tryGet()
return manifest
@ -119,7 +116,7 @@ proc corruptBlocks*(
pos.add(i)
var
blk = (await store.getBlock(manifest.treeCid, i, manifest.treeRoot)).tryGet()
blk = (await store.getBlock(manifest.treeCid, i)).tryGet()
bytePos: seq[int]
doAssert bytes < blk.data.len

View File

@ -56,7 +56,7 @@ checksuite "merkletree":
let tree = builder.build().tryGet()
check:
tree.leaves == expectedLeaves[0..0]
tree.leaves.toSeq == expectedLeaves[0..0]
tree.root == expectedLeaves[0]
tree.len == 1
@ -69,7 +69,7 @@ checksuite "merkletree":
let expectedRoot = combine(expectedLeaves[0], expectedLeaves[1])
check:
tree.leaves == expectedLeaves[0..1]
tree.leaves.toSeq == expectedLeaves[0..1]
tree.len == 3
tree.root == expectedRoot
@ -87,7 +87,7 @@ checksuite "merkletree":
)
check:
tree.leaves == expectedLeaves[0..2]
tree.leaves.toSeq == expectedLeaves[0..2]
tree.len == 6
tree.root == expectedRoot
@ -126,7 +126,7 @@ checksuite "merkletree":
)
check:
tree.leaves == expectedLeaves[0..8]
tree.leaves.toSeq == expectedLeaves[0..8]
tree.len == 20
tree.root == expectedRoot

View File

@ -29,7 +29,7 @@ asyncchecksuite "Erasure encode/decode":
metaDs = SQLiteDatastore.new(Memory).tryGet()
rng = Rng.instance()
chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize)
store = CacheStore.new(cacheSize = (dataSetSize * 4), chunkSize = BlockSize)
store = CacheStore.new(cacheSize = (dataSetSize * 8), chunkSize = BlockSize)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
manifest = await storeDataGetManifest(store, chunker)
@ -61,18 +61,19 @@ asyncchecksuite "Erasure encode/decode":
for _ in 0..<encoded.ecM:
dropped.add(column)
(await store.delBlock(encoded.treeCid, column)).tryGet()
(await store.delBlock(manifest.treeCid, column)).tryGet()
column.inc(encoded.steps - 1)
var
decoded = (await erasure.decode(encoded)).tryGet()
check:
decoded.cid.tryGet() == manifest.cid.tryGet()
decoded.cid.tryGet() == encoded.originalCid
decoded.treeCid == manifest.treeCid
decoded.treeCid == encoded.originalTreeCid
decoded.blocksCount == encoded.originalBlocksCount
for d in dropped:
let present = await store.hasBlock(encoded.treeCid, d)
let present = await store.hasBlock(manifest.treeCid, d)
check present.tryGet()
test "Should not tolerate losing more than M data blocks in a single random column":
@ -89,6 +90,7 @@ asyncchecksuite "Erasure encode/decode":
for _ in 0..<encoded.ecM + 1:
dropped.add(column)
(await store.delBlock(encoded.treeCid, column)).tryGet()
(await store.delBlock(manifest.treeCid, column)).tryGet()
column.inc(encoded.steps)
var
@ -98,7 +100,7 @@ asyncchecksuite "Erasure encode/decode":
decoded = (await erasure.decode(encoded)).tryGet()
for d in dropped:
let present = await store.hasBlock(encoded.treeCid, d)
let present = await store.hasBlock(manifest.treeCid, d)
check not present.tryGet()
test "Should tolerate losing M data blocks in M random columns":
@ -122,6 +124,7 @@ asyncchecksuite "Erasure encode/decode":
for idx in blocks:
(await store.delBlock(encoded.treeCid, idx)).tryGet()
(await store.delBlock(manifest.treeCid, idx)).tryGet()
discard
discard (await erasure.decode(encoded)).tryGet()
@ -149,7 +152,7 @@ asyncchecksuite "Erasure encode/decode":
var idx: int
while true:
idx = rng.sample(blockIdx, blocks)
let blk = (await store.getBlock(encoded.treeCid, idx, encoded.treeRoot)).tryGet()
let blk = (await store.getBlock(encoded.treeCid, idx)).tryGet()
if not blk.isEmpty:
break
@ -158,6 +161,7 @@ asyncchecksuite "Erasure encode/decode":
for idx in blocks:
(await store.delBlock(encoded.treeCid, idx)).tryGet()
(await store.delBlock(manifest.treeCid, idx)).tryGet()
discard
var
@ -175,6 +179,7 @@ asyncchecksuite "Erasure encode/decode":
for b in 0..<encoded.steps * encoded.ecM:
(await store.delBlock(encoded.treeCid, b)).tryGet()
(await store.delBlock(manifest.treeCid, b)).tryGet()
discard (await erasure.decode(encoded)).tryGet()
@ -191,6 +196,7 @@ asyncchecksuite "Erasure encode/decode":
for b in (encoded.blocksCount - encoded.steps * encoded.ecM)..<encoded.blocksCount:
(await store.delBlock(encoded.treeCid, b)).tryGet()
(await store.delBlock(manifest.treeCid, b)).tryGet()
discard (await erasure.decode(encoded)).tryGet()

View File

@ -17,7 +17,6 @@ checksuite "Manifest":
var
manifest = Manifest.new(
treeCid = Cid.example,
treeRoot = MultiHash.example,
blockSize = 1.MiBs,
datasetSize = 100.MiBs)