Tmp, rework of merkleization

This commit is contained in:
Tomasz Bekas 2023-10-26 13:23:23 +02:00
parent 252b4451b7
commit 919d15ff1f
No known key found for this signature in database
GPG Key ID: 4854E04C98824959
10 changed files with 213 additions and 522 deletions

View File

@ -175,60 +175,11 @@ proc monitorBlockHandle(b: BlockExcEngine, handle: Future[Block], address: Block
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
await b.network.switch.disconnect(peerId)
proc requestBlock*(
b: BlockExcEngine,
cid: Cid,
timeout = DefaultBlockTimeout): Future[Block] {.async.} =
trace "Begin block request", cid, peers = b.peers.len
if b.pendingBlocks.isInFlight(cid):
trace "Request handle already pending", cid
return await b.pendingBlocks.getWantHandle(cid, timeout)
let
blk = b.pendingBlocks.getWantHandle(cid, timeout)
address = BlockAddress(leaf: false, cid: cid)
trace "Selecting peers who have", address
var
peers = b.peers.selectCheapest(address)
without blockPeer =? b.findCheapestPeerForBlock(peers):
trace "No peers to request blocks from. Queue discovery...", cid
b.discovery.queueFindBlocksReq(@[cid])
return await blk
asyncSpawn b.monitorBlockHandle(blk, address, blockPeer.id)
b.pendingBlocks.setInFlight(cid, true)
await b.sendWantBlock(address, blockPeer)
codexBlockExchangeWantBlockListsSent.inc()
if (peers.len - 1) == 0:
trace "No peers to send want list to", cid
b.discovery.queueFindBlocksReq(@[cid])
return await blk
await b.sendWantHave(address, blockPeer, toSeq(b.peers))
codexBlockExchangeWantHaveListsSent.inc()
return await blk
proc requestBlock(
b: BlockExcEngine,
treeReq: TreeReq,
index: Natural,
address: BlockAddress
timeout = DefaultBlockTimeout
): Future[Block] {.async.} =
let address = BlockAddress(leaf: true, treeCid: treeReq.treeCid, index: index)
let handleOrCid = treeReq.getWantHandleOrCid(index, timeout)
if handleOrCid.resolved:
without blk =? await b.localStore.getBlock(handleOrCid.cid), err:
return await b.requestBlock(handleOrCid.cid, timeout)
return blk
let blockFuture = handleOrCid.handle
if treeReq.isInFlight(index):
@ -236,7 +187,7 @@ proc requestBlock(
let peers = b.peers.selectCheapest(address)
if peers.len == 0:
b.discovery.queueFindBlocksReq(@[treeReq.treeCid])
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
let maybePeer =
if peers.len > 0:
@ -248,7 +199,7 @@ proc requestBlock(
if peer =? maybePeer:
asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id)
treeReq.trySetInFlight(index)
treeReq.setInFlight(address)
await b.sendWantBlock(address, peer)
codexBlockExchangeWantBlockListsSent.inc()
await b.sendWantHave(address, peer, toSeq(b.peers))
@ -256,32 +207,6 @@ proc requestBlock(
return await blockFuture
proc requestBlock*(
b: BlockExcEngine,
treeCid: Cid,
index: Natural,
merkleRoot: MultiHash,
timeout = DefaultBlockTimeout
): Future[Block] =
without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, Natural.none, merkleRoot), err:
raise err
return b.requestBlock(treeReq, index, timeout)
proc requestBlocks*(
b: BlockExcEngine,
treeCid: Cid,
leavesCount: Natural,
merkleRoot: MultiHash,
timeout = DefaultBlockTimeout
): ?!AsyncIter[Block] =
without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, leavesCount.some, merkleRoot), err:
return failure(err)
return Iter.fromSlice(0..<leavesCount).map(
(index: int) => b.requestBlock(treeReq, index, timeout)
).success
proc blockPresenceHandler*(
b: BlockExcEngine,
peer: PeerId,
@ -383,6 +308,7 @@ proc blocksDeliveryHandler*(
for bd in blocksDelivery:
if err =? (await b.localStore.putBlock(bd.blk)).errorOption:
trace "Unable to store block", cid = bd.blk.cid, err = err.msg
# TODO store proof if bd.leaf
else:
storedBlocks.add(bd)

View File

@ -14,19 +14,12 @@ import pkg/upraises
push: {.upraises: [].}
import ../../blocktype
import pkg/chronicles
import pkg/questionable
import pkg/questionable/options
import pkg/questionable/results
import pkg/chronos
import pkg/libp2p
import pkg/metrics
import ../protobuf/blockexc
import ../../merkletree
import ../../utils
import ../../blocktype
logScope:
topics = "codex pendingblocks"
@ -43,121 +36,15 @@ type
inFlight*: bool
startTime*: int64
LeafReq* = object
case delivered*: bool
of false:
handle*: Future[Block]
inFlight*: bool
of true:
leaf: MultiHash
blkCid*: Cid
TreeReq* = ref object
leaves*: Table[Natural, LeafReq]
deliveredCount*: Natural
leavesCount*: ?Natural
treeRoot*: MultiHash
treeCid*: Cid
TreeHandler* = proc(tree: MerkleTree): Future[void] {.raises:[], gcsafe.}
PendingBlocksManager* = ref object of RootObj
blocks*: Table[Cid, BlockReq] # pending Block requests
trees*: Table[Cid, TreeReq]
onTree*: TreeHandler
blocks*: Table[BlockAddress, BlockReq] # pending Block requests
proc updatePendingBlockGauge(p: PendingBlocksManager) =
codexBlockExchangePendingBlockRequests.set(p.blocks.len.int64)
type
BlockHandleOrCid = object
case resolved*: bool
of true:
cid*: Cid
else:
handle*: Future[Block]
proc buildTree(treeReq: TreeReq): ?!MerkleTree =
trace "Building a merkle tree from leaves", treeCid = treeReq.treeCid, leavesCount = treeReq.leavesCount
without leavesCount =? treeReq.leavesCount:
return failure("Leaves count is none, cannot build a tree")
var builder = ? MerkleTreeBuilder.init(treeReq.treeRoot.mcodec)
for i in 0..<leavesCount:
treeReq.leaves.withValue(i, leafReq):
if leafReq.delivered:
? builder.addLeaf(leafReq.leaf)
else:
return failure("Expected all leaves to be delivered but leaf with index " & $i & " was not")
do:
return failure("Missing a leaf with index " & $i)
let tree = ? builder.build()
if tree.root != treeReq.treeRoot:
return failure("Reconstructed tree root doesn't match the original tree root, tree cid is " & $treeReq.treeCid)
return success(tree)
proc checkIfAllDelivered(p: PendingBlocksManager, treeReq: TreeReq): void =
if treeReq.deliveredCount.some == treeReq.leavesCount:
without tree =? buildTree(treeReq), err:
error "Error building a tree", msg = err.msg
p.trees.del(treeReq.treeCid)
return
p.trees.del(treeReq.treeCid)
asyncSpawn p.onTree(tree)
proc getWantHandleOrCid*(
treeReq: TreeReq,
index: Natural,
timeout = DefaultBlockTimeout
): BlockHandleOrCid =
treeReq.leaves.withValue(index, leafReq):
if not leafReq.delivered:
return BlockHandleOrCid(resolved: false, handle: leafReq.handle)
else:
return BlockHandleOrCid(resolved: true, cid: leafReq.blkCid)
do:
let leafReq = LeafReq(
delivered: false,
handle: newFuture[Block]("pendingBlocks.getWantHandleOrCid"),
inFlight: false
)
treeReq.leaves[index] = leafReq
return BlockHandleOrCid(resolved: false, handle: leafReq.handle)
proc getOrPutTreeReq*(
p: PendingBlocksManager,
treeCid: Cid,
leavesCount = Natural.none, # has value when all leaves are expected to be delivered
treeRoot: MultiHash
): ?!TreeReq =
p.trees.withValue(treeCid, treeReq):
if treeReq.treeRoot != treeRoot:
return failure("Unexpected root for tree with cid " & $treeCid)
if leavesCount == treeReq.leavesCount:
return success(treeReq[])
else:
treeReq.leavesCount = treeReq.leavesCount.orElse(leavesCount)
let res = success(treeReq[])
p.checkIfAllDelivered(treeReq[])
return res
do:
let treeReq = TreeReq(
deliveredCount: 0,
leavesCount: leavesCount,
treeRoot: treeRoot,
treeCid: treeCid
)
p.trees[treeCid] = treeReq
return success(treeReq)
proc getWantHandle*(
p: PendingBlocksManager,
cid: Cid,
address: BlockAddress,
timeout = DefaultBlockTimeout,
inFlight = false
): Future[Block] {.async.} =
@ -165,16 +52,16 @@ proc getWantHandle*(
##
try:
if cid notin p.blocks:
p.blocks[cid] = BlockReq(
if address notin p.blocks:
p.blocks[address] = BlockReq(
handle: newFuture[Block]("pendingBlocks.getWantHandle"),
inFlight: inFlight,
startTime: getMonoTime().ticks)
trace "Adding pending future for block", cid, inFlight = p.blocks[cid].inFlight
trace "Adding pending future for block", address, inFlight = p.blocks[address].inFlight
p.updatePendingBlockGauge()
return await p.blocks[cid].handle.wait(timeout)
return await p.blocks[address].handle.wait(timeout)
except CancelledError as exc:
trace "Blocks cancelled", exc = exc.msg, cid
raise exc
@ -183,17 +70,9 @@ proc getWantHandle*(
# no need to cancel, it is already cancelled by wait()
raise exc
finally:
p.blocks.del(cid)
p.blocks.del(address)
p.updatePendingBlockGauge()
proc getOrComputeLeaf(mcodec: MultiCodec, blk: Block): ?!MultiHash =
without mhash =? blk.cid.mhash.mapFailure, err:
return MultiHash.digest($mcodec, blk.data).mapFailure
if mhash.mcodec == mcodec:
return success(mhash)
else:
return MultiHash.digest($mcodec, blk.data).mapFailure
proc resolve*(
p: PendingBlocksManager,
@ -202,117 +81,94 @@ proc resolve*(
## Resolve pending blocks
##
for bd in blocksDelivery:
for bd in blocksDelivery:
p.blocks.withValue(bd.address, blockReq):
trace "Resolving block", bd.address
if not bd.address.leaf:
if bd.address.cid == bd.blk.cid:
p.blocks.withValue(bd.blk.cid, pending):
if not pending.handle.completed:
trace "Resolving block", cid = bd.blk.cid
pending.handle.complete(bd.blk)
let
startTime = pending[].startTime
stopTime = getMonoTime().ticks
retrievalDurationUs = (stopTime - startTime) div 1000
codexBlockExchangeRetrievalTimeUs.set(retrievalDurationUs)
trace "Block retrieval time", retrievalDurationUs
else:
warn "Delivery cid doesn't match block cid", deliveryCid = bd.address.cid, blockCid = bd.blk.cid
else: # when block.address.leaf == true
p.trees.withValue(bd.address.treeCid, treeReq):
treeReq.leaves.withValue(bd.address.index, leafReq):
if not leafReq.delivered:
if proof =? bd.proof:
if not proof.index == bd.address.index:
warn "Proof index doesn't match leaf index", address = bd.address, proofIndex = proof.index
continue
without mhash =? bd.blk.cid.mhash.mapFailure, err:
error "Unable to get mhash from cid for block", address = bd.address, msg = err.msg
continue
without verifySuccess =? proof.verifyLeaf(mhash, treeReq.treeRoot), err:
error "Unable to verify proof for block", address = bd.address, msg = err.msg
continue
if verifySuccess:
without leaf =? getOrComputeLeaf(treeReq.treeRoot.mcodec, bd.blk), err:
error "Unable to get or calculate hash for block", address = bd.address
continue
if bd.address.leaf:
without proof =? bd.proof:
warn "Missing proof for a block", address = bd.address
continue
if proof.index != bd.address.index:
warn "Proof index doesn't match leaf index", address = bd.address, proofIndex = proof.index
continue
leafReq.handle.complete(bd.blk)
leafReq[] = LeafReq(delivered: true, blkCid: bd.blk.cid, leaf: leaf)
without leaf =? bd.blk.cid.mhash.mapFailure, err:
error "Unable to get mhash from cid for block", address = bd.address, msg = err.msg
continue
inc treeReq.deliveredCount
without treeRoot =? bd.address.treeCid.mhash.mapFailure, err:
error "Unable to get mhash from treeCid for block", address = bd.address, msg = err.msg
continue
p.checkIfAllDelivered(treeReq[])
else:
warn "Invalid proof provided for a block", address = bd.address
else:
warn "Missing proof for a block", address = bd.address
else:
trace "Ignore verifying proof for already delivered block", address = bd.address
without verifyOutcome =? proof.verifyLeaf(leaf, treeRoot), err:
error "Unable to verify proof for block", address = bd.address, msg = err.msg
continue
if not verifyOutcome:
warn "Invalid proof provided for a block", address = bd.address
else:
# bd.address.leaf == false
if bd.address.cid != bd.blk.cid:
warn "Delivery cid doesn't match block cid", deliveryCid = bd.address.cid, blockCid = bd.blk.cid
continue
let
startTime = blockReq.startTime
stopTime = getMonoTime().ticks
retrievalDurationUs = (stopTime - startTime) div 1000
blockReq.handle.complete(blk)
codexBlockExchangeRetrievalTimeUs.set(retrievalDurationUs)
trace "Block retrieval time", retrievalDurationUs
do:
warn "Attempting to resolve block delivery for not pending block", address = bd.address
proc setInFlight*(p: PendingBlocksManager,
cid: Cid,
address: BlockAddress,
inFlight = true) =
p.blocks.withValue(cid, pending):
pending.inFlight = inFlight
trace "Setting inflight", cid, inFlight = pending.inFlight
proc trySetInFlight*(treeReq: TreeReq,
index: Natural,
inFlight = true) =
treeReq.leaves.withValue(index, leafReq):
if not leafReq.delivered:
leafReq.inFlight = inFlight
trace "Setting inflight", treeCid = treeReq.treeCid, index, inFlight = inFlight
proc isInFlight*(treeReq: TreeReq,
index: Natural
): bool =
treeReq.leaves.withValue(index, leafReq):
return (not leafReq.delivered) and leafReq.inFlight
do:
return false
p.blocks.withValue(address, pending):
pending[].inFlight = inFlight
trace "Setting inflight", address, inFlight = pending[].inFlight
proc isInFlight*(p: PendingBlocksManager,
cid: Cid
address: BlockAddress,
): bool =
p.blocks.withValue(cid, pending):
result = pending.inFlight
trace "Getting inflight", cid, inFlight = result
p.blocks.withValue(address, pending):
result = pending[].inFlight
trace "Getting inflight", address, inFlight = result
proc pending*(p: PendingBlocksManager, cid: Cid): bool =
cid in p.blocks
proc pending*(p: PendingBlocksManager, address: Cid): bool =
address in p.blocks
proc contains*(p: PendingBlocksManager, cid: Cid): bool =
p.pending(cid)
proc contains*(p: PendingBlocksManager, address: Cid): bool =
p.pending(address)
iterator wantList*(p: PendingBlocksManager): BlockAddress =
for k in p.blocks.keys:
yield BlockAddress(leaf: false, cid: k)
for treeCid, treeReq in p.trees.pairs:
for index, leafReq in treeReq.leaves.pairs:
if not leafReq.delivered:
yield BlockAddress(leaf: true, treeCid: treeCid, index: index)
p.blocks.keys
iterator wantListBlockCids*(p: PendingBlocksManager): Cid =
for k in p.blocks.keys:
yield k
for a in p.blocks.keys:
if not a.leaf:
yield a.cid
iterator wantListCids*(p: PendingBlocksManager): Cid =
for k in p.blocks.keys:
yield k
yield k.cidOrTreeCid # TODO don't yield duplicates
for k in p.trees.keys:
yield k
iterator wantHandles*(p: PendingBlocksManager): Future[Block] =
for v in p.blocks.values:
yield v.handle
proc wantListLen*(p: PendingBlocksManager): int =
p.blocks.len + p.trees.len
p.blocks.len
iterator wantHandles*(p: PendingBlocksManager): Future[Block] =
for v in p.blocks.values:
yield v.handle
func len*(p: PendingBlocksManager): int =
p.blocks.len

View File

@ -16,6 +16,8 @@ const
CodexManifestNamespace* = CodexRepoNamespace & "/manifests" # manifest namespace
CodexBlocksTtlNamespace* = # Cid TTL
CodexMetaNamespace & "/ttl"
CodexBlockProofNamespace* = # Cid and Proof
CodexMetaNamespace & "/proof"
CodexDhtNamespace* = "dht" # Dht namespace
CodexDhtProvidersNamespace* = # Dht providers namespace
CodexDhtNamespace & "/providers"

View File

@ -190,8 +190,7 @@ proc store*(
dataCodec = multiCodec("raw")
chunker = LPStreamChunker.new(stream, chunkSize = blockSize)
without var treeBuilder =? MerkleTreeBuilder.init(hcodec), err:
return failure(err)
var cids: seq[Cid]
try:
while (
@ -209,8 +208,7 @@ proc store*(
without blk =? bt.Block.new(cid, chunk, verify = false):
return failure("Unable to init block from chunk!")
if err =? treeBuilder.addLeaf(mhash).errorOption:
return failure(err)
cids.add(cid)
if err =? (await self.blockStore.putBlock(blk)).errorOption:
trace "Unable to store block", cid = blk.cid, err = err.msg
@ -223,18 +221,23 @@ proc store*(
finally:
await stream.close()
without tree =? treeBuilder.build(), err:
return failure(err)
without treeBlk =? bt.Block.new(tree.encode()), err:
without tree =? MerkleTree.init(cids), err:
return failure(err)
if err =? (await self.blockStore.putBlock(treeBlk)).errorOption:
return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg)
without treeCid =? Cid.init(CIDv1, dataCodec, tree.root).mapFailure, err:
return failure(err)
for cid, index in cids:
without proof =? tree.getProof(index), err:
return failure(err)
if err =? (await self.blockStore.putBlockCidAndProof(treeCid, index, cid, proof)).errorOption:
# TODO add log here
return failure(err)
let manifest = Manifest.new(
treeCid = treeBlk.cid,
treeRoot = tree.root,
treeRoot = tree.root, # TODO remove it
blockSize = blockSize,
datasetSize = NBytes(chunker.offset),
version = CIDv1,

View File

@ -30,30 +30,18 @@ type
BlockStore* = ref object of RootObj
method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} =
method getBlock*(self: BlockStore, address: BlockAddress): Future[?!Block] {.base.} =
## Get a block from the blockstore
##
raiseAssert("Not implemented!")
method getBlock*(self: BlockStore, treeCid: Cid, index: Natural, merkleRoot: MultiHash): Future[?!Block] {.base.} =
## Get a block by Cid of a merkle tree and an index of a leaf in a tree, validate inclusion using merkle root
##
raiseAssert("Not implemented!")
method getBlockAndProof*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.base.} =
## Get a block and associated inclusion proof by Cid of a merkle tree and an index of a leaf in a tree
##
raiseAssert("Not implemented!")
method getBlocks*(self: BlockStore, treeCid: Cid, leavesCount: Natural, merkleRoot: MultiHash): Future[?!AsyncIter[?!Block]] {.base.} =
## Get all blocks in range [0..<leavesCount] by Cid of a merkle tree, validate inclusion using merkle root
##
raiseAssert("Not implemented!")
method putBlock*(
self: BlockStore,
blk: Block,
@ -64,6 +52,19 @@ method putBlock*(
raiseAssert("Not implemented!")
# I cant use `BlockAddress` to wrap (treeCid, index) here. because as far as I know there's no way in Nim to force `assert(leaf == true)` in a compile time
method putBlockCidAndProof*(
self: BlockStore,
treeCid: Cid,
index: Natural
blockCid: Cid,
proof: MerkleProof
): Future[?!void] {.base.} =
## Put a block to the blockstore
##
raiseAssert("Not implemented!")
method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base.} =
## Delete a block from the blockstore
##

View File

@ -15,6 +15,7 @@ import pkg/datastore
import pkg/libp2p
import ../namespaces
import ../manifest
import ../merkletree
const
CodexMetaKey* = Key.init(CodexMetaNamespace).tryGet
@ -23,6 +24,7 @@ const
CodexTotalBlocksKey* = Key.init(CodexBlockTotalNamespace).tryGet
CodexManifestKey* = Key.init(CodexManifestNamespace).tryGet
BlocksTtlKey* = Key.init(CodexBlocksTtlNamespace).tryGet
BlockProofKey* = Key.init(CodexBlockProofNamespace).tryGet
QuotaKey* = Key.init(CodexQuotaNamespace).tryGet
QuotaUsedKey* = (QuotaKey / "used").tryGet
QuotaReservedKey* = (QuotaKey / "reserved").tryGet
@ -42,3 +44,7 @@ proc createBlockExpirationMetadataKey*(cid: Cid): ?!Key =
proc createBlockExpirationMetadataQueryKey*(): ?!Key =
let queryString = ? (BlocksTtlKey / "*")
Key.init(queryString)
proc createBlockCidAndProofMetadataKey*(treeCid: Cid, index: Natural): ?!Key =
(BlockProofKey / $treeCid) / $index

View File

@ -17,7 +17,7 @@ import pkg/chronicles
import pkg/chronos
import pkg/libp2p
import ../blocktype as bt
import ../blocktype
import ../utils/asyncheapqueue
import ../utils/asynciter
@ -37,53 +37,24 @@ type
engine*: BlockExcEngine # blockexc decision engine
localStore*: BlockStore # local block store
method getBlock*(self: NetworkStore, cid: Cid): Future[?!bt.Block] {.async.} =
method getBlock*(self: BlockStore, address: BlockAddress): Future[?!Block] {.async.} =
trace "Getting block from local store or network", cid
without blk =? await self.localStore.getBlock(cid), error:
without blk =? await self.localStore.getBlock(address), error:
if not (error of BlockNotFoundError): return failure error
trace "Block not in local store", cid
trace "Block not in local store", address
without newBlock =? (await self.engine.requestBlock(cid)).catch, error:
trace "Unable to get block from exchange engine", cid
without newBlock =? (await self.engine.requestBlock(address)).catch, error:
trace "Unable to get block from exchange engine", address
return failure error
return success newBlock
return success blk
method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural, merkleRoot: MultiHash): Future[?!Block] {.async.} =
without localBlock =? await self.localStore.getBlock(treeCid, index, merkleRoot), err:
if err of BlockNotFoundError:
trace "Requesting block from the network engine", treeCid, index
try:
let networkBlock = await self.engine.requestBlock(treeCid, index, merkleRoot)
return success(networkBlock)
except CatchableError as err:
return failure(err)
else:
failure(err)
return success(localBlock)
method getBlocks*(self: NetworkStore, treeCid: Cid, leavesCount: Natural, merkleRoot: MultiHash): Future[?!AsyncIter[?!Block]] {.async.} =
without localIter =? await self.localStore.getBlocks(treeCid, leavesCount, merkleRoot), err:
if err of BlockNotFoundError:
trace "Requesting blocks from the network engine", treeCid, leavesCount
without var networkIter =? self.engine.requestBlocks(treeCid, leavesCount, merkleRoot), err:
failure(err)
let iter = networkIter
.prefetch(BlockPrefetchAmount)
.map(proc (fut: Future[Block]): Future[?!Block] {.async.} = catch: (await fut))
return success(iter)
else:
return failure(err)
return success(localIter)
method putBlock*(
self: NetworkStore,
blk: bt.Block,
blk: Block,
ttl = Duration.none
): Future[?!void] {.async.} =
## Store block locally and notify the network

View File

@ -64,7 +64,7 @@ type
BlockExpiration* = object
cid*: Cid
expiration*: SecondsSince1970
proc updateMetrics(self: RepoStore) =
codexRepostoreBlocks.set(self.totalBlocks.int64)
codexRepostoreBytesUsed.set(self.quotaUsedBytes.int64)
@ -79,6 +79,46 @@ func available*(self: RepoStore): uint =
func available*(self: RepoStore, bytes: uint): bool =
return bytes < self.available()
proc encode(cidAndProof: (Cid, MerkleProof)): seq[byte] =
let (cid, proof) = cidAndProof
var pb = initProtoBuffer()
pb.write(1, cid.data.buffer)
pb.write(2, proof.encode)
pb.finish
pb.buffer
proc decode(_: type (Cid, MerkleProof), data: seq[byte]): ?!(Cid, MerkleProof) =
var
pbNode = initProtoBuffer(data)
cidBuf: seq[byte]
proofBuf: seq[byte]
discard pbNode.getField(1, cidBuf).mapFailure
discard pbNode.getField(2, proofBuf).mapFailure
let
cid = ? Cid.init(cidBuf).mapFailure
proof = ? MerkleProof.decode(proofBuf)
(cid, proof)
method putBlockCidAndProof*(
self: BlockStore,
treeCid: Cid,
index: Natural,
blockCid: Cid,
proof: MerkleProof
): Future[?!void] {.async.} =
## Put a block to the blockstore
##
without key =? createBlockCidAndProofMetadataKey(address), err:
return failure(err)
let value = (blockCid, proof).encode()
await self.metaDs.put(key, value)
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
## Get a block from the blockstore
##
@ -104,14 +144,28 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
trace "Got block for cid", cid
return Block.new(cid, data, verify = true)
method getBlock*(self: RepoStore, treeCid: Cid, index: Natural, merkleRoot: MultiHash): Future[?!Block] =
self.treeReader.getBlock(treeCid, index)
method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] =
without (blk, _) =? await self.getBlockAndProof(treeCid, index), err:
return failure(err)
success(blk)
method getBlocks*(self: RepoStore, treeCid: Cid, leavesCount: Natural, merkleRoot: MultiHash): Future[?!AsyncIter[?!Block]] =
self.treeReader.getBlocks(treeCid, leavesCount)
method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] =
self.treeReader.getBlockAndProof(treeCid, index)
without key =? createBlockCidAndProofMetadataKey(address), err:
return failure(err)
without value =? await self.metaDs.get(key), err:
return failure(err)
without (cid, proof) =? (Cid, MerkleProof).decode(value), err:
return failure(err)
without blk =? await self.getBlock(cid), err:
return failure(err)
succes(blk, proof)
proc getBlockExpirationTimestamp(self: RepoStore, ttl: ?Duration): SecondsSince1970 =
let duration = ttl |? self.blockTtl

View File

@ -1,123 +0,0 @@
## Nim-Codex
## Copyright (c) 2023 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/options
import pkg/upraises
push: {.upraises: [].}
import pkg/chronos
import pkg/chronicles
import pkg/stew/ptrops
import ../stores
import ../manifest
import ../blocktype
import ../utils
import ./seekablestream
export stores, blocktype, manifest, chronos
logScope:
topics = "codex storestream"
const
SeekableStoreStreamTrackerName* = "SeekableStoreStream"
type
# Make SeekableStream from a sequence of blocks stored in Manifest
# (only original file data - see StoreStream.size)
SeekableStoreStream* = ref object of SeekableStream
store*: BlockStore # Store where to lookup block contents
manifest*: Manifest # List of block CIDs
pad*: bool # Pad last block to manifest.blockSize?
method initStream*(s: SeekableStoreStream) =
if s.objName.len == 0:
s.objName = SeekableStoreStreamTrackerName
procCall SeekableStream(s).initStream()
proc new*(
T: type SeekableStoreStream,
store: BlockStore,
manifest: Manifest,
pad = true
): SeekableStoreStream =
## Create a new SeekableStoreStream instance for a given store and manifest
##
result = SeekableStoreStream(
store: store,
manifest: manifest,
pad: pad,
offset: 0)
result.initStream()
method `size`*(self: SeekableStoreStream): int =
bytes(self.manifest, self.pad).int
proc `size=`*(self: SeekableStoreStream, size: int)
{.error: "Setting the size is forbidden".} =
discard
method atEof*(self: SeekableStoreStream): bool =
self.offset >= self.size
method readOnce*(
self: SeekableStoreStream,
pbytes: pointer,
nbytes: int
): Future[int] {.async.} =
## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`.
## Return how many bytes were actually read before EOF was encountered.
## Raise exception if we are already at EOF.
##
trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount
if self.atEof:
raise newLPStreamEOFError()
# The loop iterates over blocks in the SeekableStoreStream,
# reading them and copying their data into outbuf
var read = 0 # Bytes read so far, and thus write offset in the outbuf
while read < nbytes and not self.atEof:
# Compute from the current stream position `self.offset` the block num/offset to read
# Compute how many bytes to read from this block
let
blockNum = self.offset div self.manifest.blockSize.int
blockOffset = self.offset mod self.manifest.blockSize.int
readBytes = min([self.size - self.offset,
nbytes - read,
self.manifest.blockSize.int - blockOffset])
# Read contents of block `blockNum`
without blk =? await self.store.getBlock(self.manifest.treeCid, blockNum, self.manifest.treeRoot), error:
raise newLPStreamReadError(error)
trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset
# Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
if blk.isEmpty:
zeroMem(pbytes.offset(read), readBytes)
else:
copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes)
# Update current positions in the stream and outbuf
self.offset += readBytes
read += readBytes
return read
method closeImpl*(self: SeekableStoreStream) {.async.} =
trace "Closing SeekableStoreStream"
self.offset = self.size # set Eof
await procCall LPStream(self).closeImpl()

View File

@ -1,5 +1,5 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Nim-Codex
## Copyright (c) 2023 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -30,92 +30,87 @@ logScope:
topics = "codex storestream"
const
StoreStreamTrackerName* = "StoreStream"
SeekableStoreStreamTrackerName* = "SeekableStoreStream"
type
StoreStream* = ref object of LPStream
# Make SeekableStream from a sequence of blocks stored in Manifest
# (only original file data - see StoreStream.size)
SeekableStoreStream* = ref object of SeekableStream
store*: BlockStore # Store where to lookup block contents
manifest*: Manifest # List of block CIDs
pad*: bool # Pad last block to manifest.blockSize?
iter: AsyncIter[?!Block]
lastBlock: Block
lastIndex: int
offset: int
method initStream*(s: StoreStream) =
method initStream*(s: SeekableStoreStream) =
if s.objName.len == 0:
s.objName = StoreStreamTrackerName
s.objName = SeekableStoreStreamTrackerName
procCall LPStream(s).initStream()
procCall SeekableStream(s).initStream()
proc new*(
T: type StoreStream,
T: type SeekableStoreStream,
store: BlockStore,
manifest: Manifest,
pad = true
): StoreStream =
## Create a new StoreStream instance for a given store and manifest
##
result = StoreStream(
): SeekableStoreStream =
## Create a new SeekableStoreStream instance for a given store and manifest
##
result = SeekableStoreStream(
store: store,
manifest: manifest,
pad: pad,
lastIndex: -1,
offset: 0)
result.initStream()
method `size`*(self: StoreStream): int =
method `size`*(self: SeekableStoreStream): int =
bytes(self.manifest, self.pad).int
proc `size=`*(self: StoreStream, size: int)
proc `size=`*(self: SeekableStoreStream, size: int)
{.error: "Setting the size is forbidden".} =
discard
method atEof*(self: StoreStream): bool =
method atEof*(self: SeekableStoreStream): bool =
self.offset >= self.size
method readOnce*(
self: StoreStream,
self: SeekableStoreStream,
pbytes: pointer,
nbytes: int
): Future[int] {.async.} =
## Read `nbytes` from current position in the StoreStream into output buffer pointed by `pbytes`.
## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`.
## Return how many bytes were actually read before EOF was encountered.
## Raise exception if we are already at EOF.
##
##
trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount
if self.atEof:
raise newLPStreamEOFError()
# Initialize a block iterator
if self.lastIndex < 0:
without iter =? await self.store.getBlocks(self.manifest.treeCid, self.manifest.blocksCount, self.manifest.treeRoot), err:
raise newLPStreamReadError(err)
self.iter = iter
# The loop iterates over blocks in the SeekableStoreStream,
# reading them and copying their data into outbuf
var read = 0 # Bytes read so far, and thus write offset in the outbuf
while read < nbytes and not self.atEof:
if self.offset >= (self.lastIndex + 1) * self.manifest.blockSize.int:
if not self.iter.finished:
without lastBlock =? await self.iter.next(), err:
raise newLPStreamReadError(err)
self.lastBlock = lastBlock
inc self.lastIndex
else:
raise newLPStreamReadError(newException(CodexError, "Block iterator finished prematurely"))
# Compute from the current stream position `self.offset` the block num/offset to read
# Compute how many bytes to read from this block
let
blockNum = self.offset div self.manifest.blockSize.int
blockOffset = self.offset mod self.manifest.blockSize.int
readBytes = min([self.size - self.offset,
nbytes - read,
self.manifest.blockSize.int - blockOffset])
address = BlockAddress(leaf: true, treeCid: self.manifest.treeCid, index: blockNum)
# Read contents of block `blockNum`
without blk =? await self.store.getBlock(address), error:
raise newLPStreamReadError(error)
trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset
# Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
if self.lastBlock.isEmpty:
if blk.isEmpty:
zeroMem(pbytes.offset(read), readBytes)
else:
copyMem(pbytes.offset(read), self.lastBlock.data[blockOffset].addr, readBytes)
copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes)
# Update current positions in the stream and outbuf
self.offset += readBytes
@ -123,7 +118,7 @@ method readOnce*(
return read
method closeImpl*(self: StoreStream) {.async.} =
trace "Closing StoreStream"
method closeImpl*(self: SeekableStoreStream) {.async.} =
trace "Closing SeekableStoreStream"
self.offset = self.size # set Eof
await procCall LPStream(self).closeImpl()