Mirror of https://github.com/logos-storage/logos-storage-nim.git (synced 2026-01-14 11:23:10 +00:00)

Commit: 7db3f1ac0b "Blockexchange uses merkle root and index to fetch blocks"
Parent: e904ff39b0
@@ -121,11 +121,7 @@ proc stop*(b: BlockExcEngine) {.async.} =
  trace "NetworkStore stopped"

proc sendWantHave(
  b: BlockExcEngine,
  address: BlockAddress,
  selectedPeer: BlockExcPeerCtx,
  peers: seq[BlockExcPeerCtx]): Future[void] {.async.} =
proc sendWantHave(b: BlockExcEngine, address: BlockAddress, selectedPeer: BlockExcPeerCtx, peers: seq[BlockExcPeerCtx]): Future[void] {.async.} =
  trace "Sending wantHave request to peers", address
  for p in peers:
    if p != selectedPeer:
@@ -136,10 +132,11 @@ proc sendWantHave(
        @[address],
        wantType = WantType.WantHave) # we only want to know if the peer has the block

proc sendWantBlock(
  b: BlockExcEngine,
  address: BlockAddress,
  blockPeer: BlockExcPeerCtx): Future[void] {.async.} =
proc sendWantBlock(b: BlockExcEngine, address: BlockAddress, blockPeer: BlockExcPeerCtx): Future[void] {.async.} =
  let cid = if address.leaf:
      address.treeCid
    else:
      address.cid
  trace "Sending wantBlock request to", peer = blockPeer.id, address
  await b.network.request.sendWantList(
    blockPeer.id,
@@ -177,42 +174,125 @@ proc monitorBlockHandle(b: BlockExcEngine, handle: Future[Block], address: Block

proc requestBlock*(
  b: BlockExcEngine,
  address: BlockAddress,
  cid: Cid,
  timeout = DefaultBlockTimeout): Future[Block] {.async.} =
  trace "Begin block request", cid, peers = b.peers.len

  if b.pendingBlocks.isInFlight(cid):
    trace "Request handle already pending", cid
    return await b.pendingBlocks.getWantHandle(cid, timeout)

  let
    blk = b.pendingBlocks.getWantHandle(cid, timeout)
    address = BlockAddress(leaf: false, cid: cid)

  trace "Selecting peers who have", address
  var
    peers = b.peers.selectCheapest(address)

  without blockPeer =? b.findCheapestPeerForBlock(peers):
    trace "No peers to request blocks from. Queue discovery...", cid
    b.discovery.queueFindBlocksReq(@[cid])
    return await blk

  asyncSpawn b.monitorBlockHandle(blk, address, blockPeer.id)
  b.pendingBlocks.setInFlight(cid, true)
  await b.sendWantBlock(address, blockPeer)

  codex_block_exchange_want_block_lists_sent.inc()

  if (peers.len - 1) == 0:
    trace "No peers to send want list to", cid
    b.discovery.queueFindBlocksReq(@[cid])
    return await blk

  await b.sendWantHave(address, blockPeer, toSeq(b.peers))

  codex_block_exchange_want_have_lists_sent.inc()

  return await blk

proc requestBlock(
  b: BlockExcEngine,
  treeReq: TreeReq,
  index: Natural,
  timeout = DefaultBlockTimeout
): Future[Block] {.async.} =
  let blockFuture = b.pendingBlocks.getWantHandle(address, timeout)
  let address = BlockAddress(leaf: true, treeCid: treeReq.treeCid, index: index)

  if b.pendingBlocks.isInFlight(address):
  let handleOrCid = treeReq.getWantHandleOrCid(index, timeout)
  if handleOrCid.resolved:
    without blk =? await b.localStore.getBlock(handleOrCid.cid), err:
      return await b.requestBlock(handleOrCid.cid, timeout)
    return blk

  let blockFuture = handleOrCid.handle

  if treeReq.isInFlight(index):
    return await blockFuture

  let peers = b.peers.selectCheapest(address)
  if peers.len == 0:
    b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
    b.discovery.queueFindBlocksReq(@[treeReq.treeCid])

  let maybePeer =
    if peers.len > 0:
      peers[hash(address) mod peers.len].some
      peers[index mod peers.len].some
    elif b.peers.len > 0:
      toSeq(b.peers)[hash(address) mod b.peers.len].some
      toSeq(b.peers)[index mod b.peers.len].some
    else:
      BlockExcPeerCtx.none

  if peer =? maybePeer:
    asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id)
    b.pendingBlocks.setInFlight(address)
    treeReq.trySetInFlight(index)
    await b.sendWantBlock(address, peer)
    codex_block_exchange_want_block_lists_sent.inc()
    codexBlockExchangeWantBlockListsSent.inc()
    await b.sendWantHave(address, peer, toSeq(b.peers))
    codex_block_exchange_want_have_lists_sent.inc()
    codexBlockExchangeWantHaveListsSent.inc()

  return await blockFuture

proc requestBlock*(
  b: BlockExcEngine,
  cid: Cid,
  treeCid: Cid,
  index: Natural,
  merkleRoot: MultiHash,
  timeout = DefaultBlockTimeout
): Future[Block] =
  b.requestBlock(BlockAddress.init(cid))
  without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, Natural.none, merkleRoot), err:
    raise err

  return b.requestBlock(treeReq, index, timeout)

proc requestBlocks*(
  b: BlockExcEngine,
  treeCid: Cid,
  leavesCount: Natural,
  merkleRoot: MultiHash,
  timeout = DefaultBlockTimeout
): ?!AsyncIter[Block] =
  without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, leavesCount.some, merkleRoot), err:
    return failure(err)

  var
    iter = AsyncIter[Block]()
    index = 0

  proc next(): Future[Block] =
    if index < leavesCount:
      let fut = b.requestBlock(treeReq, index, timeout)
      inc index
      if index >= leavesCount:
        iter.finished = true
      return fut
    else:
      let fut = newFuture[Block]("engine.requestBlocks")
      fut.fail(newException(CodexError, "No more elements for tree with cid " & $treeCid))
      return fut

  iter.next = next
  return success(iter)
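A hedged usage sketch of the new tree-based API above: requestBlocks hands back an AsyncIter[Block] whose next schedules one leaf request per call. This driver is illustrative only and is not part of the commit; `engine`, `treeCid`, `root`, and `leavesCount` are hypothetical inputs, and it assumes the `without`/`=?` helpers from pkg/questionable used throughout this diff.

proc fetchAllLeaves(engine: BlockExcEngine,
                    treeCid: Cid,
                    root: MultiHash,
                    leavesCount: Natural) {.async.} =
  without iter =? engine.requestBlocks(treeCid, leavesCount, root), err:
    raise err                       # getOrPutTreeReq rejected the request
  while not iter.finished:
    let blk = await iter.next()     # completes when the leaf is delivered
    trace "Fetched leaf", cid = blk.cid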
proc blockPresenceHandler*(
  b: BlockExcEngine,
@@ -287,12 +367,7 @@ proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asy

  b.pendingBlocks.resolve(blocksDelivery)
  await b.scheduleTasks(blocksDelivery)
  var cids = initHashSet[Cid]()
  for bd in blocksDelivery:
    cids.incl(bd.blk.cid)
    if bd.address.leaf:
      cids.incl(bd.address.treeCid)
  b.discovery.queueProvideBlocksReq(cids.toSeq)
  b.discovery.queueProvideBlocksReq(blocksDelivery.mapIt( it.blk.cid ))

proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
  await b.resolveBlocks(blocks.mapIt(BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid))))
@@ -310,69 +385,18 @@ proc payForBlocks(engine: BlockExcEngine,
    trace "Sending payment for blocks", price
    await sendPayment(peer.id, payment)

proc validateBlockDelivery(
  b: BlockExcEngine,
  bd: BlockDelivery
): ?!void =
  if bd.address notin b.pendingBlocks:
    return failure("Received block is not currently a pending block")

  if bd.address.leaf:
    without proof =? bd.proof:
      return failure("Missing proof")

    if proof.index != bd.address.index:
      return failure("Proof index " & $proof.index & " doesn't match leaf index " & $bd.address.index)

    without leaf =? bd.blk.cid.mhash.mapFailure, err:
      return failure("Unable to get mhash from cid for block, nested err: " & err.msg)

    without treeRoot =? bd.address.treeCid.mhash.mapFailure, err:
      return failure("Unable to get mhash from treeCid for block, nested err: " & err.msg)

    without verifyOutcome =? proof.verifyLeaf(leaf, treeRoot), err:
      return failure("Unable to verify proof for block, nested err: " & err.msg)

    if not verifyOutcome:
      return failure("Provided inclusion proof is invalid")
  else: # not leaf
    if bd.address.cid != bd.blk.cid:
      return failure("Delivery cid " & $bd.address.cid & " doesn't match block cid " & $bd.blk.cid)

  return success()
proc blocksDeliveryHandler*(
  b: BlockExcEngine,
  peer: PeerId,
  blocksDelivery: seq[BlockDelivery]) {.async.} =
  trace "Got blocks from peer", peer, len = blocksDelivery.len

  var validatedBlocksDelivery: seq[BlockDelivery]
  for bd in blocksDelivery:
    logScope:
      peer = peer
      address = bd.address
    if isErr (await b.localStore.putBlock(bd.blk)):
      trace "Unable to store block", cid = bd.blk.cid

    if err =? b.validateBlockDelivery(bd).errorOption:
      warn "Block validation failed", msg = err.msg
      continue

    if err =? (await b.localStore.putBlock(bd.blk)).errorOption:
      error "Unable to store block", err = err.msg
      continue

    if bd.address.leaf:
      without proof =? bd.proof:
        error "Proof expected for a leaf block delivery"
        continue
      if err =? (await b.localStore.putBlockCidAndProof(bd.address.treeCid, bd.address.index, bd.blk.cid, proof)).errorOption:
        error "Unable to store proof and cid for a block"
        continue

    validatedBlocksDelivery.add(bd)

  await b.resolveBlocks(validatedBlocksDelivery)
  codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)
  await b.resolveBlocks(blocksDelivery)
  codexBlockExchangeBlocksReceived.inc(blocksDelivery.len.int64)

  let
    peerCtx = b.peers.get(peer)
@@ -401,11 +425,11 @@ proc wantListHandler*(

    logScope:
      peer = peerCtx.id
      address = e.address
      # cid = e.cid
      wantType = $e.wantType

    if idx < 0: # updating entry
      trace "Processing new want list entry"
      trace "Processing new want list entry", address = e.address

      let
        have = await e.address in b.localStore
@@ -417,21 +441,21 @@ proc wantListHandler*(
      codex_block_exchange_want_have_lists_received.inc()

      if not have and e.sendDontHave:
        trace "Adding dont have entry to presence response"
        trace "Adding dont have entry to presence response", address = e.address
        presence.add(
          BlockPresence(
            address: e.address,
            `type`: BlockPresenceType.DontHave,
            price: price))
      elif have and e.wantType == WantType.WantHave:
        trace "Adding have entry to presence response"
        trace "Adding have entry to presence response", address = e.address
        presence.add(
          BlockPresence(
            address: e.address,
            `type`: BlockPresenceType.Have,
            price: price))
      elif e.wantType == WantType.WantBlock:
        trace "Added entry to peer's want blocks list"
        trace "Added entry to peer's want blocks list", address = e.address
        peerCtx.peerWants.add(e)
        codex_block_exchange_want_block_lists_received.inc()
      else:
@@ -482,6 +506,18 @@ proc paymentHandler*(
  else:
    context.paymentChannel = engine.wallet.acceptChannel(payment).option

proc onTreeHandler(b: BlockExcEngine, tree: MerkleTree): Future[?!void] {.async.} =
  trace "Handling tree"

  without treeBlk =? Block.new(tree.encode()), err:
    return failure(err)

  if err =? (await b.localStore.putBlock(treeBlk)).errorOption:
    return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg)

  return success()

proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} =
  ## Perform initial setup, such as want
  ## list exchange
@@ -543,7 +579,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
          BlockDelivery(address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some)
        )
      else:
        (await b.localStore.getBlock(e.address)).map(
        (await b.localStore.getBlock(e.address.cid)).map(
          (blk: Block) => BlockDelivery(address: e.address, blk: blk, proof: MerkleProof.none)
        )

@@ -563,7 +599,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
      blocksDelivery
    )

    codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64)
    codexBlockExchangeBlocksSent.inc(blocksDelivery.len.int64)

    trace "About to remove entries from peerWants", blocks = blocksDelivery.len, items = task.peerWants.len
    # Remove successfully sent blocks
@@ -644,6 +680,11 @@ proc new*(
  proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} =
    engine.paymentHandler(peer, payment)

  proc onTree(tree: MerkleTree): Future[void] {.gcsafe, async.} =
    if err =? (await engine.onTreeHandler(tree)).errorOption:
      echo "Error handling a tree" & err.msg # TODO
      # error "Error handling a tree", msg = err.msg

  network.handlers = BlockExcHandlers(
    onWantList: blockWantListHandler,
    onBlocksDelivery: blocksDeliveryHandler,
@@ -651,4 +692,6 @@ proc new*(
    onAccount: accountHandler,
    onPayment: paymentHandler)

  pendingBlocks.onTree = onTree

  return engine
@@ -14,15 +14,20 @@ import pkg/upraises

push: {.upraises: [].}

import ../../blocktype
import pkg/chronicles
import pkg/questionable
import pkg/questionable/options
import pkg/questionable/results
import pkg/chronos
import pkg/libp2p
import pkg/metrics
import pkg/questionable/results

import ../protobuf/blockexc
import ../../blocktype

import ../../merkletree
import ../../utils

logScope:
  topics = "codex pendingblocks"
@@ -39,12 +44,121 @@ type
    inFlight*: bool
    startTime*: int64

  LeafReq* = object
    case delivered*: bool
    of false:
      handle*: Future[Block]
      inFlight*: bool
    of true:
      leaf: MultiHash
      blkCid*: Cid

  TreeReq* = ref object
    leaves*: Table[Natural, LeafReq]
    deliveredCount*: Natural
    leavesCount*: ?Natural
    treeRoot*: MultiHash
    treeCid*: Cid

  TreeHandler* = proc(tree: MerkleTree): Future[void] {.gcsafe.}

  PendingBlocksManager* = ref object of RootObj
    blocks*: Table[BlockAddress, BlockReq] # pending Block requests
    blocks*: Table[Cid, BlockReq] # pending Block requests
    trees*: Table[Cid, TreeReq]
    onTree*: TreeHandler
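A note on the LeafReq variant above: a leaf starts out undelivered and carries a Future that waiters hang on; once its proof checks out, the record is swapped for the `delivered: true` branch, which drops the future and keeps only the leaf hash and block cid. A minimal sketch of that transition (valid inside this module, since `leaf` is unexported; `blk` and `leafHash` are hypothetical):

var req = LeafReq(delivered: false,
                  handle: newFuture[Block]("sketch"),
                  inFlight: false)
# ...later, once the block arrived and its proof was verified:
req.handle.complete(blk)     # wake anyone awaiting the handle first
req = LeafReq(delivered: true, blkCid: blk.cid, leaf: leafHash)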
proc updatePendingBlockGauge(p: PendingBlocksManager) =
  codex_block_exchange_pending_block_requests.set(p.blocks.len.int64)

type
  BlockHandleOrCid = object
    case resolved*: bool
    of true:
      cid*: Cid
    else:
      handle*: Future[Block]
proc buildTree(treeReq: TreeReq): ?!MerkleTree =
  trace "Building a merkle tree from leaves", treeCid = treeReq.treeCid, leavesCount = treeReq.leavesCount

  without leavesCount =? treeReq.leavesCount:
    return failure("Leaves count is none, cannot build a tree")

  var builder = ? MerkleTreeBuilder.init(treeReq.treeRoot.mcodec)
  for i in 0..<leavesCount:
    treeReq.leaves.withValue(i, leafReq):
      if leafReq.delivered:
        ? builder.addLeaf(leafReq.leaf)
      else:
        return failure("Expected all leaves to be delivered but leaf with index " & $i & " was not")
    do:
      return failure("Missing a leaf with index " & $i)

  let tree = ? builder.build()

  if tree.root != treeReq.treeRoot:
    return failure("Reconstructed tree root doesn't match the original tree root, tree cid is " & $treeReq.treeCid)

  return success(tree)
proc checkIfAllDelivered(p: PendingBlocksManager, treeReq: TreeReq): void =
  if treeReq.deliveredCount.some == treeReq.leavesCount:
    without tree =? buildTree(treeReq), err:
      error "Error building a tree", msg = err.msg
      p.trees.del(treeReq.treeCid)
      return
    p.trees.del(treeReq.treeCid)
    try:
      asyncSpawn p.onTree(tree)
    except Exception as err:
      error "Exception when handling tree", msg = err.msg
proc getWantHandleOrCid*(
  treeReq: TreeReq,
  index: Natural,
  timeout = DefaultBlockTimeout
): BlockHandleOrCid =
  treeReq.leaves.withValue(index, leafReq):
    if not leafReq.delivered:
      return BlockHandleOrCid(resolved: false, handle: leafReq.handle)
    else:
      return BlockHandleOrCid(resolved: true, cid: leafReq.blkCid)
  do:
    let leafReq = LeafReq(
      delivered: false,
      handle: newFuture[Block]("pendingBlocks.getWantHandleOrCid"),
      inFlight: false
    )
    treeReq.leaves[index] = leafReq
    return BlockHandleOrCid(resolved: false, handle: leafReq.handle)
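How a caller consumes the BlockHandleOrCid variant returned above, as a hedged sketch (the proc name is hypothetical; `treeReq` and `index` are assumed inputs):

proc useLeaf(treeReq: TreeReq, index: Natural) {.async.} =
  let hoc = treeReq.getWantHandleOrCid(index)
  if hoc.resolved:
    trace "Leaf already delivered", cid = hoc.cid  # fetch locally by cid
  else:
    let blk = await hoc.handle                     # wait for delivery
    trace "Leaf arrived", cid = blk.cid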
proc getOrPutTreeReq*(
  p: PendingBlocksManager,
  treeCid: Cid,
  leavesCount = Natural.none, # has value when all leaves are expected to be delivered
  treeRoot: MultiHash
): ?!TreeReq =
  p.trees.withValue(treeCid, treeReq):
    if treeReq.treeRoot != treeRoot:
      return failure("Unexpected root for tree with cid " & $treeCid)

    if leavesCount == treeReq.leavesCount:
      return success(treeReq[])
    else:
      treeReq.leavesCount = treeReq.leavesCount.orElse(leavesCount)
      let res = success(treeReq[])
      p.checkIfAllDelivered(treeReq[])
      return res
  do:
    let treeReq = TreeReq(
      deliveredCount: 0,
      leavesCount: leavesCount,
      treeRoot: treeRoot,
      treeCid: treeCid
    )
    p.trees[treeCid] = treeReq
    return success(treeReq)
proc getWantHandle*(
  p: PendingBlocksManager,
  address: BlockAddress,
@@ -76,13 +190,14 @@ proc getWantHandle*(
      p.blocks.del(address)
      p.updatePendingBlockGauge()

proc getWantHandle*(
  p: PendingBlocksManager,
  cid: Cid,
  timeout = DefaultBlockTimeout,
  inFlight = false
): Future[Block] =
  p.getWantHandle(BlockAddress.init(cid), timeout, inFlight)

proc getOrComputeLeaf(mcodec: MultiCodec, blk: Block): ?!MultiHash =
  without mhash =? blk.cid.mhash.mapFailure, err:
    return MultiHash.digest($mcodec, blk.data).mapFailure

  if mhash.mcodec == mcodec:
    return success(mhash)
  else:
    return MultiHash.digest($mcodec, blk.data).mapFailure
proc resolve*(
  p: PendingBlocksManager,
@@ -92,37 +207,87 @@ proc resolve*(
  ##

  for bd in blocksDelivery:
    p.blocks.withValue(bd.address, blockReq):
      trace "Resolving block", address = bd.address

      if not blockReq.handle.finished:
        let
          startTime = blockReq.startTime
          stopTime = getMonoTime().ticks
          retrievalDurationUs = (stopTime - startTime) div 1000

        blockReq.handle.complete(bd.blk)

        codex_block_exchange_retrieval_time_us.set(retrievalDurationUs)
        trace "Block retrieval time", retrievalDurationUs, address = bd.address
    if not bd.address.leaf:
      if bd.address.cid == bd.blk.cid:
        p.blocks.withValue(bd.blk.cid, pending):
          if not pending.handle.completed:
            trace "Resolving block", cid = bd.blk.cid
            pending.handle.complete(bd.blk)
            let
              startTime = pending[].startTime
              stopTime = getMonoTime().ticks
              retrievalDurationUs = (stopTime - startTime) div 1000
            codexBlockExchangeRetrievalTimeUs.set(retrievalDurationUs)
            trace "Block retrieval time", retrievalDurationUs
      else:
        trace "Block handle already finished", address = bd.address
    do:
      warn "Attempting to resolve block that's not currently a pending block", address = bd.address
        warn "Delivery cid doesn't match block cid", deliveryCid = bd.address.cid, blockCid = bd.blk.cid

    # resolve any pending blocks
    if bd.address.leaf:
      p.trees.withValue(bd.address.treeCid, treeReq):
        treeReq.leaves.withValue(bd.address.index, leafReq):
          if not leafReq.delivered:
            if proof =? bd.proof:
              if proof.index != bd.address.index:
                warn "Proof index doesn't match leaf index", address = bd.address, proofIndex = proof.index
                continue
              without mhash =? bd.blk.cid.mhash.mapFailure, err:
                error "Unable to get mhash from cid for block", address = bd.address, msg = err.msg
                continue
              without verifySuccess =? proof.verifyLeaf(mhash, treeReq.treeRoot), err:
                error "Unable to verify proof for block", address = bd.address, msg = err.msg
                continue
              if verifySuccess:
                without leaf =? getOrComputeLeaf(treeReq.treeRoot.mcodec, bd.blk), err:
                  error "Unable to get or calculate hash for block", address = bd.address
                  continue

                leafReq.handle.complete(bd.blk)
                leafReq[] = LeafReq(delivered: true, blkCid: bd.blk.cid, leaf: leaf)

                inc treeReq.deliveredCount

                p.checkIfAllDelivered(treeReq[])
              else:
                warn "Invalid proof provided for a block", address = bd.address
            else:
              warn "Missing proof for a block", address = bd.address
          else:
            trace "Ignore verifying proof for already delivered block", address = bd.address
proc setInFlight*(p: PendingBlocksManager,
                  address: BlockAddress,
                  inFlight = true) =
  p.blocks.withValue(address, pending):
    pending[].inFlight = inFlight
    trace "Setting inflight", address, inFlight = pending[].inFlight
  p.blocks.withValue(cid, pending):
    pending.inFlight = inFlight
    trace "Setting inflight", cid, inFlight = pending.inFlight

proc trySetInFlight*(treeReq: TreeReq,
                     index: Natural,
                     inFlight = true) =
  treeReq.leaves.withValue(index, leafReq):
    if not leafReq.delivered:
      leafReq.inFlight = inFlight
      trace "Setting inflight", treeCid = treeReq.treeCid, index, inFlight = inFlight

proc isInFlight*(treeReq: TreeReq,
                 index: Natural
                ): bool =
  treeReq.leaves.withValue(index, leafReq):
    return (not leafReq.delivered) and leafReq.inFlight
  do:
    return false

proc isInFlight*(p: PendingBlocksManager,
                 address: BlockAddress,
                ): bool =
  p.blocks.withValue(address, pending):
    result = pending[].inFlight
    trace "Getting inflight", address, inFlight = result
  p.blocks.withValue(cid, pending):
    result = pending.inFlight
    trace "Getting inflight", cid, inFlight = result

proc pending*(p: PendingBlocksManager, cid: Cid): bool =
  cid in p.blocks

proc contains*(p: PendingBlocksManager, cid: Cid): bool =
  BlockAddress.init(cid) in p.blocks
@@ -147,13 +312,33 @@ iterator wantListCids*(p: PendingBlocksManager): Cid =
      yieldedCids.incl(cid)
      yield cid

iterator wantList*(p: PendingBlocksManager): BlockAddress =
  for k in p.blocks.keys:
    yield BlockAddress(leaf: false, cid: k)

  for treeCid, treeReq in p.trees.pairs:
    for index, leafReq in treeReq.leaves.pairs:
      if not leafReq.delivered:
        yield BlockAddress(leaf: true, treeCid: treeCid, index: index)
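An illustration of what the combined wantList iterator above yields, assuming one pending root-level block plus a tree with two leaves of which leaf 0 has already been delivered (the cid values are placeholders):

for address in p.wantList:
  echo address
# -> BlockAddress(leaf: false, cid: <pending block cid>)
# -> BlockAddress(leaf: true, treeCid: <tree cid>, index: 1)  # leaf 0 is skipped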
iterator wantListBlockCids*(p: PendingBlocksManager): Cid =
  for k in p.blocks.keys:
    yield k

iterator wantListCids*(p: PendingBlocksManager): Cid =
  for k in p.blocks.keys:
    yield k

  for k in p.trees.keys:
    yield k

iterator wantHandles*(p: PendingBlocksManager): Future[Block] =
  for v in p.blocks.values:
    yield v.handle

proc wantListLen*(p: PendingBlocksManager): int =
  p.blocks.len
  p.blocks.len + p.trees.len

func len*(p: PendingBlocksManager): int =
  p.blocks.len
@@ -21,15 +21,12 @@ export Wantlist, WantType, WantListEntry
export BlockDelivery, BlockPresenceType, BlockPresence
export AccountMessage, StateChannelUpdate

proc hash*(a: BlockAddress): Hash =
  if a.leaf:
    let data = a.treeCid.data.buffer & @(a.index.uint64.toBytesBE)
proc hash*(e: WantListEntry): Hash =
  if e.address.leaf:
    let data = e.address.treeCid.data.buffer & @(e.address.index.uint64.toBytesBE)
    hash(data)
  else:
    hash(a.cid.data.buffer)

proc hash*(e: WantListEntry): Hash =
  hash(e.address)
  hash(e.address.cid.data.buffer)
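The leaf branch above hashes the tree cid bytes concatenated with the big-endian index, so two leaves of the same tree hash differently. A hedged sketch of that input in isolation (`treeCidBytes` is a hypothetical seq[byte]; toBytesBE is from pkg/stew/endians2, as used above):

import std/hashes
import pkg/stew/endians2

let data = treeCidBytes & @(3'u64.toBytesBE)  # cid bytes ++ leaf index
echo hash(data)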
proc contains*(a: openArray[WantListEntry], b: BlockAddress): bool =
  ## Convenience method to check for peer presence

@@ -78,12 +78,6 @@ proc cidOrTreeCid*(a: BlockAddress): Cid =
proc address*(b: Block): BlockAddress =
  BlockAddress(leaf: false, cid: b.cid)

proc init*(_: type BlockAddress, cid: Cid): BlockAddress =
  BlockAddress(leaf: false, cid: cid)

proc init*(_: type BlockAddress, treeCid: Cid, index: Natural): BlockAddress =
  BlockAddress(leaf: true, treeCid: treeCid, index: index)

proc `$`*(b: Block): string =
  result &= "cid: " & $b.cid
  result &= "\ndata: " & string.fromBytes(b.data)
@@ -140,17 +134,17 @@ proc emptyCid*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!C
    DagPB = multiCodec("dag-pb")
    DagJson = multiCodec("dag-json")

  var index {.global, threadvar.}: Table[(CidVersion, MultiCodec, MultiCodec), Cid]
  var index {.global, threadvar.}: Table[(CidVersion, MultiCodec, MultiCodec), Result[Cid, CidError]]
  once:
    index = {
      # source https://ipld.io/specs/codecs/dag-pb/fixtures/cross-codec/#dagpb_empty
      (CIDv0, Sha256, DagPB): ? Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n").mapFailure,
      (CIDv1, Sha256, DagPB): ? Cid.init("zdj7Wkkhxcu2rsiN6GUyHCLsSLL47kdUNfjbFqBUUhMFTZKBi").mapFailure, # base36: bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku
      (CIDv1, Sha256, DagJson): ? Cid.init("z4EBG9jGUWMVxX9deANWX7iPyExLswe2akyF7xkNAaYgugvnhmP").mapFailure, # base36: baguqeera6mfu3g6n722vx7dbitpnbiyqnwah4ddy4b5c3rwzxc5pntqcupta
      (CIDv1, Sha256, Raw): ? Cid.init("zb2rhmy65F3REf8SZp7De11gxtECBGgUKaLdiDj7MCGCHxbDW").mapFailure,
      (CIDv0, Sha256, DagPB): Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n"),
      (CIDv1, Sha256, DagPB): Cid.init("zdj7Wkkhxcu2rsiN6GUyHCLsSLL47kdUNfjbFqBUUhMFTZKBi"), # base36: bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku
      (CIDv1, Sha256, DagJson): Cid.init("z4EBG9jGUWMVxX9deANWX7iPyExLswe2akyF7xkNAaYgugvnhmP"), # base36: baguqeera6mfu3g6n722vx7dbitpnbiyqnwah4ddy4b5c3rwzxc5pntqcupta
      (CIDv1, Sha256, Raw): Cid.init("zb2rhmy65F3REf8SZp7De11gxtECBGgUKaLdiDj7MCGCHxbDW"),
    }.toTable

  index[(version, hcodec, dcodec)].catch
  index[(version, hcodec, dcodec)].catch.flatMap((a: Result[Cid, CidError]) => a.mapFailure)

proc emptyDigest*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!MultiHash =
  emptyCid(version, hcodec, dcodec)
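A hedged usage sketch of the lookup above: callers ask for the well-known cid of an all-zero block by (version, hash codec, data codec), and the questionable `=?`/failure machinery propagates a missing key (this snippet assumes it runs inside a ?!-returning proc; Sha256 and Raw are the constants defined in emptyCid):

without cid =? emptyCid(CIDv1, Sha256, Raw), err:
  return failure(err)
echo cid  # zb2rhmy65F3REf8SZp7De11gxtECBGgUKaLdiDj7MCGCHxbDW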
@@ -231,6 +231,8 @@ proc new*(
      wallet = WalletRef.new(EthPrivateKey.random())
      network = BlockExcNetwork.new(switch)

      treeReader = TreeReader.new()

      repoData = case config.repoKind
        of repoFS: Datastore(FSDatastore.new($config.dataDir, depth = 5)
          .expect("Should create repo file data store!"))
@@ -12,7 +12,6 @@ import pkg/upraises
push: {.upraises: [].}

import std/sequtils
import std/sugar

import pkg/chronos
import pkg/chronicles
@@ -24,7 +23,6 @@ import ../merkletree
import ../stores
import ../blocktype as bt
import ../utils
import ../utils/asynciter

import pkg/stew/byteutils
@@ -70,245 +68,142 @@ type
    decoderProvider*: DecoderProvider
    store*: BlockStore

  EncodingParams = object
    ecK: int
    ecM: int
    rounded: int
    steps: int
    blocksCount: int

func indexToPos(steps, idx, step: int): int {.inline.} =
  ## Convert an index to a position in the encoded
  ## dataset
  ## `idx`  - the index to convert
  ## `step` - the current step
  ## `pos`  - the position in the encoded dataset
proc encode*(
  self: Erasure,
  manifest: Manifest,
  blocks: int,
  parity: int
): Future[?!Manifest] {.async.} =
  ## Encode a manifest into one that is erasure protected.
  ##

  (idx - step) div steps
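A worked example of indexToPos: with steps = 3 the dataset is split into three interleaved stripes, and stripe step = 1 owns indexes 1, 4, 7, ..., which map to stripe-local positions 0, 1, 2, ...:

func indexToPos(steps, idx, step: int): int = (idx - step) div steps

assert indexToPos(3, 1, 1) == 0
assert indexToPos(3, 4, 1) == 1
assert indexToPos(3, 7, 1) == 2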
proc getPendingBlocks(
  self: Erasure,
  manifest: Manifest,
  indicies: seq[int]): AsyncIter[(?!bt.Block, int)] =
  ## Get pending blocks iterator
  ##

  var
    # request blocks from the store
    pendingBlocks = indicies.map( (i: int) =>
      self.store.getBlock(BlockAddress.init(manifest.treeCid, i)).map((r: ?!bt.Block) => (r, i)) # Get the data blocks (first K)
    )

  proc isFinished(): bool = pendingBlocks.len == 0

  proc genNext(): Future[(?!bt.Block, int)] {.async.} =
    let completedFut = await one(pendingBlocks)
    if (let i = pendingBlocks.find(completedFut); i >= 0):
      pendingBlocks.del(i)
      return await completedFut
    else:
      let (_, index) = await completedFut
      raise newException(CatchableError, "Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & $index)

  Iter.new(genNext, isFinished)
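genNext above uses the completion-order pattern from chronos: one yields whichever future finishes first, and the caller then drops it from the pending set. A hedged standalone sketch of the same pattern (timings are arbitrary):

import pkg/chronos

proc demo() {.async.} =
  var pending = @[sleepAsync(30.millis), sleepAsync(10.millis)]
  while pending.len > 0:
    let done = await one(pending)   # first future to complete
    pending.del(pending.find(done))
    await done                      # already finished; returns immediately

waitFor demo()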
proc prepareEncodingData(
  self: Erasure,
  manifest: Manifest,
  params: EncodingParams,
  step: int,
  data: ref seq[seq[byte]],
  cids: ref seq[Cid],
  emptyBlock: seq[byte]): Future[?!int] {.async.} =
  ## Prepare data for encoding
  ##

  let
    indicies = toSeq(countup(step, params.rounded - 1, params.steps))
    pendingBlocksIter = self.getPendingBlocks(manifest, indicies.filterIt(it < manifest.blocksCount))

  var resolved = 0
  for fut in pendingBlocksIter:
    let (blkOrErr, idx) = await fut
    without blk =? blkOrErr, err:
      warn "Failed retrieving a block", treeCid = manifest.treeCid, idx, msg = err.msg
      continue

    let pos = indexToPos(params.steps, idx, step)
    shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data)
    cids[idx] = blk.cid

    resolved.inc()

  for idx in indicies.filterIt(it >= manifest.blocksCount):
    let pos = indexToPos(params.steps, idx, step)
    trace "Padding with empty block", idx
    shallowCopy(data[pos], emptyBlock)
    without emptyBlockCid =? emptyCid(manifest.version, manifest.hcodec, manifest.codec), err:
      return failure(err)
    cids[idx] = emptyBlockCid

  success(resolved)
proc prepareDecodingData(
  self: Erasure,
  encoded: Manifest,
  step: int,
  data: ref seq[seq[byte]],
  parityData: ref seq[seq[byte]],
  cids: ref seq[Cid],
  emptyBlock: seq[byte]): Future[?!(int, int)] {.async.} =
  ## Prepare data for decoding
  ## `encoded`    - the encoded manifest
  ## `step`       - the current step
  ## `data`       - the data to be prepared
  ## `parityData` - the parityData to be prepared
  ## `cids`       - cids of prepared data
  ## `emptyBlock` - the empty block to be used for padding
  ##

  let
    indicies = toSeq(countup(step, encoded.blocksCount - 1, encoded.steps))
    pendingBlocksIter = self.getPendingBlocks(encoded, indicies)

  var
    dataPieces = 0
    parityPieces = 0
    resolved = 0
  for fut in pendingBlocksIter:
    # Continue to receive blocks until we have just enough for decoding
    # or no more blocks can arrive
    if resolved >= encoded.ecK:
      break

    let (blkOrErr, idx) = await fut
    without blk =? blkOrErr, err:
      trace "Failed retrieving a block", idx, treeCid = encoded.treeCid, msg = err.msg
      continue

    let
      pos = indexToPos(encoded.steps, idx, step)

    logScope:
      cid = blk.cid
      idx = idx
      pos = pos
      step = step
      empty = blk.isEmpty

    cids[idx] = blk.cid
    if idx >= encoded.rounded:
      trace "Retrieved parity block"
      shallowCopy(parityData[pos - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data)
      parityPieces.inc
    else:
      trace "Retrieved data block"
      shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data)
      dataPieces.inc

    resolved.inc

  return success (dataPieces, parityPieces)
proc init(_: type EncodingParams, manifest: Manifest, ecK: int, ecM: int): ?!EncodingParams =
  if ecK > manifest.blocksCount:
    return failure("Unable to encode manifest, not enough blocks, ecK = " & $ecK & ", blocksCount = " & $manifest.blocksCount)

  let
    rounded = roundUp(manifest.blocksCount, ecK)
    steps = divUp(manifest.blocksCount, ecK)
    blocksCount = rounded + (steps * ecM)

  EncodingParams(
    ecK: ecK,
    ecM: ecM,
    rounded: rounded,
    steps: steps,
    blocksCount: blocksCount
  ).success
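A worked instance of the arithmetic in EncodingParams.init, assuming roundUp/divUp from ../utils are the usual ceiling-style helpers: a 10-block dataset with K = 4 and M = 2 is padded to 12 blocks, split into 3 interleaved stripes, and gains 2 parity blocks per stripe:

func divUp(a, b: int): int = (a + b - 1) div b  # assumed semantics
func roundUp(a, b: int): int = divUp(a, b) * b  # assumed semantics

let (blocksCount, ecK, ecM) = (10, 4, 2)
let rounded = roundUp(blocksCount, ecK)  # 12
let steps = divUp(blocksCount, ecK)      # 3
echo rounded + steps * ecM               # 18 blocks in the encoded dataset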
proc encodeData(
  self: Erasure,
  manifest: Manifest,
  params: EncodingParams
): Future[?!Manifest] {.async.} =
  ## Encode blocks pointed to by the protected manifest
  ##
  ## `manifest` - the manifest to encode
  ## `manifest` - the original manifest to be encoded
  ## `blocks`   - the number of blocks to be encoded - K
  ## `parity`   - the number of parity blocks to generate - M
  ##

  logScope:
    steps = params.steps
    rounded_blocks = params.rounded
    blocks_count = params.blocksCount
    ecK = params.ecK
    ecM = params.ecM
    original_cid = manifest.cid.get()
    original_len = manifest.blocksCount
    blocks = blocks
    parity = parity

  trace "Erasure coding manifest", blocks, parity

  without tree =? await self.store.getTree(manifest.treeCid), err:
    return err.failure

  let leaves = tree.leaves

  let
    rounded = roundUp(manifest.blocksCount, blocks)
    steps = divUp(manifest.blocksCount, blocks)
    blocksCount = rounded + (steps * parity)

  var cids = newSeq[Cid](blocksCount)

  # copy original manifest blocks
  for i in 0..<rounded:
    if i < manifest.blocksCount:
      without cid =? Cid.init(manifest.version, manifest.codec, leaves[i]).mapFailure, err:
        return err.failure
      cids[i] = cid
    else:
      without cid =? emptyCid(manifest.version, manifest.hcodec, manifest.codec), err:
        return err.failure
      cids[i] = cid

  logScope:
    steps = steps
    rounded_blocks = rounded
    new_manifest = blocksCount

  var
    cids = seq[Cid].new()
    encoder = self.encoderProvider(manifest.blockSize.int, params.ecK, params.ecM)
    emptyBlock = newSeq[byte](manifest.blockSize.int)

  cids[].setLen(params.blocksCount)

    encoder = self.encoderProvider(manifest.blockSize.int, blocks, parity)
  var toadd = 0
  var tocount = 0
  var maxidx = 0
  try:
    for step in 0..<params.steps:
    for i in 0..<steps:
      # TODO: Don't allocate a new seq every time, allocate once and zero out
      var
        data = seq[seq[byte]].new() # number of blocks to encode
        parityData = newSeqWith[seq[byte]](params.ecM, newSeq[byte](manifest.blockSize.int))
        data = newSeq[seq[byte]](blocks) # number of blocks to encode
        parityData = newSeqWith[seq[byte]](parity, newSeq[byte](manifest.blockSize.int))
        # calculate block indexes to retrieve
        blockIdx = toSeq(countup(i, rounded - 1, steps))
        # request all blocks from the store
        dataBlocks = await allFinished(
          blockIdx.mapIt( self.store.getBlock(cids[it]) ))

      data[].setLen(params.ecK)
      # TODO: this is a tight blocking loop so we sleep here to allow
      # other events to be processed, this should be addressed
      # by threading
      await sleepAsync(10.millis)

      without resolved =?
        (await self.prepareEncodingData(manifest, params, step, data, cids, emptyBlock)), err:
        trace "Unable to prepare data", error = err.msg
        return failure(err)
      for j in 0..<blocks:
        let idx = blockIdx[j]
        if idx < manifest.blocksCount:
          without blk =? (await dataBlocks[j]), error:
            trace "Unable to retrieve block", error = error.msg
            return failure error

      trace "Erasure coding data", data = data[].len, parity = parityData.len
          trace "Encoding block", cid = blk.cid, pos = idx
          shallowCopy(data[j], blk.data)
        else:
          trace "Padding with empty block", pos = idx
          data[j] = newSeq[byte](manifest.blockSize.int)

      if (
        let res = encoder.encode(data[], parityData);
        res.isErr):
      trace "Erasure coding data", data = data.len, parity = parityData.len

      let res = encoder.encode(data, parityData);
      if res.isErr:
        trace "Unable to encode manifest!", error = $res.error
        return failure($res.error)

      var idx = params.rounded + step
      for j in 0..<params.ecM:
      for j in 0..<parity:
        let idx = rounded + blockIdx[j]
        without blk =? bt.Block.new(parityData[j]), error:
          trace "Unable to create parity block", err = error.msg
          return failure(error)

        trace "Adding parity block", cid = blk.cid, idx
        trace "Adding parity block", cid = blk.cid, pos = idx
        cids[idx] = blk.cid
        maxidx = max(maxidx, idx)
        toadd = toadd + blk.data.len
        tocount.inc
        if isErr (await self.store.putBlock(blk)):
          trace "Unable to store block!", cid = blk.cid
          return failure("Unable to store block!")
        idx.inc(params.steps)

    without tree =? MerkleTree.init(cids[]), err:
    without var builder =? MerkleTreeBuilder.init(manifest.hcodec), err:
      return failure(err)

    without treeCid =? tree.rootCid, err:
    for cid in cids:
      without mhash =? cid.mhash.mapFailure, err:
        return err.failure
      if err =? builder.addLeaf(mhash).errorOption:
        return failure(err)

    without tree =? builder.build(), err:
      return failure(err)

    without treeBlk =? bt.Block.new(tree.encode()), err:
      return failure(err)

    if err =? (await self.store.putAllProofs(tree)).errorOption:
      return failure(err)
    if err =? (await self.store.putBlock(treeBlk)).errorOption:
      return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg)

    let encodedManifest = Manifest.new(
    let encoded = Manifest.new(
      manifest = manifest,
      treeCid = treeCid,
      datasetSize = (manifest.blockSize.int * params.blocksCount).NBytes,
      ecK = params.ecK,
      ecM = params.ecM
      treeCid = treeBlk.cid,
      treeRoot = tree.root,
      datasetSize = (manifest.blockSize.int * blocksCount).NBytes,
      ecK = blocks,
      ecM = parity
    )

    return encodedManifest.success
    return encoded.success

  except CancelledError as exc:
    trace "Erasure coding encoding cancelled"
    raise exc # cancellation needs to be propagated
@@ -318,26 +213,6 @@ proc encodeData(
  finally:
    encoder.release()

proc encode*(
  self: Erasure,
  manifest: Manifest,
  blocks: int,
  parity: int): Future[?!Manifest] {.async.} =
  ## Encode a manifest into one that is erasure protected.
  ##
  ## `manifest` - the original manifest to be encoded
  ## `blocks`   - the number of blocks to be encoded - K
  ## `parity`   - the number of parity blocks to generate - M
  ##

  without params =? EncodingParams.init(manifest, blocks, parity), err:
    return failure(err)

  without encodedManifest =? await self.encodeData(manifest, params), err:
    return failure(err)

  return success encodedManifest
proc decode*(
  self: Erasure,
  encoded: Manifest
@@ -358,48 +233,78 @@ proc decode*(
    cids = seq[Cid].new()
    recoveredIndices = newSeq[int]()
    decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM)
    emptyBlock = newSeq[byte](encoded.blockSize.int)

  cids[].setLen(encoded.blocksCount)
  try:
    for step in 0..<encoded.steps:
    for i in 0..<encoded.steps:
      # TODO: Don't allocate a new seq every time, allocate once and zero out
      let
        # calculate block indexes to retrieve
        blockIdx = toSeq(countup(i, encoded.blocksCount - 1, encoded.steps))
        # request all blocks from the store
        pendingBlocks = blockIdx.mapIt(
          self.store.getBlock(encoded.treeCid, it, encoded.treeRoot) # Get the data blocks (first K)
        )

      # TODO: this is a tight blocking loop so we sleep here to allow
      # other events to be processed, this should be addressed
      # by threading
      await sleepAsync(10.millis)

      var
        data = seq[seq[byte]].new()
        parityData = seq[seq[byte]].new()
        data = newSeq[seq[byte]](encoded.ecK) # number of blocks to encode
        parityData = newSeq[seq[byte]](encoded.ecM)
        recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int))
        idxPendingBlocks = pendingBlocks # copy futures to make using with `one` easier
        emptyBlock = newSeq[byte](encoded.blockSize.int)
        resolved = 0

      data[].setLen(encoded.ecK) # set len to K
      parityData[].setLen(encoded.ecM) # set len to M
      while true:
        # Continue to receive blocks until we have just enough for decoding
        # or no more blocks can arrive
        if (resolved >= encoded.ecK) or (idxPendingBlocks.len == 0):
          break

      without (dataPieces, parityPieces) =?
        (await self.prepareDecodingData(encoded, step, data, parityData, cids, emptyBlock)), err:
        trace "Unable to prepare data", error = err.msg
        return failure(err)
        let
          done = await one(idxPendingBlocks)
          idx = pendingBlocks.find(done)

        idxPendingBlocks.del(idxPendingBlocks.find(done))

        without blk =? (await done), error:
          trace "Failed retrieving block", error = error.msg
          continue

        if idx >= encoded.ecK:
          trace "Retrieved parity block", cid = blk.cid, idx
          shallowCopy(parityData[idx - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data)
        else:
          trace "Retrieved data block", cid = blk.cid, idx
          shallowCopy(data[idx], if blk.isEmpty: emptyBlock else: blk.data)

        resolved.inc

      let
        dataPieces = data.filterIt( it.len > 0 ).len
        parityPieces = parityData.filterIt( it.len > 0 ).len

      if dataPieces >= encoded.ecK:
        trace "Retrieved all the required data blocks"
        trace "Retrieved all the required data blocks", data = dataPieces, parity = parityPieces
        continue

      trace "Erasure decoding data"
      trace "Erasure decoding data", data = dataPieces, parity = parityPieces
      if (
        let err = decoder.decode(data[], parityData[], recovered);
        let err = decoder.decode(data, parityData, recovered);
        err.isErr):
        trace "Unable to decode data!", err = $err.error
        trace "Unable to decode manifest!", err = $err.error
        return failure($err.error)

      for i in 0..<encoded.ecK:
        let idx = i * encoded.steps + step
        if data[i].len <= 0 and not cids[idx].isEmpty:
        if data[i].len <= 0:
          without blk =? bt.Block.new(recovered[i]), error:
            trace "Unable to create block!", exc = error.msg
            return failure(error)

          trace "Recovered block", cid = blk.cid, index = i
          trace "Recovered block", cid = blk.cid
          if isErr (await self.store.putBlock(blk)):
            trace "Unable to store block!", cid = blk.cid
            return failure("Unable to store block!")
@@ -415,22 +320,6 @@ proc decode*(
  finally:
    decoder.release()

  without tree =? MerkleTree.init(cids[0..<encoded.originalBlocksCount]), err:
    return failure(err)

  without treeCid =? tree.rootCid, err:
    return failure(err)

  if treeCid != encoded.originalTreeCid:
    return failure("Original tree root differs from the tree root computed out of recovered data")

  let idxIter = Iter
    .fromItems(recoveredIndices)
    .filter((i: int) => i < tree.leavesCount)

  if err =? (await self.store.putSomeProofs(tree, idxIter)).errorOption:
    return failure(err)

  let decoded = Manifest.new(encoded)

  return decoded.success
@@ -45,27 +45,31 @@ proc encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] =
  #   optional uint32 originalDatasetSize = 4; # size of the original dataset
  # }
  # Message Header {
  #   optional bytes treeCid = 1;       # cid (root) of the tree
  #   optional uint32 blockSize = 2;    # size of a single block
  #   optional uint64 datasetSize = 3;  # size of the dataset
  #   optional ErasureInfo erasure = 4; # erasure coding info
  #   optional bytes treeCid = 1;       # the cid of the tree
  #   optional bytes treeRoot = 2;      # the root hash of the tree
  #   optional uint32 blockSize = 3;    # size of a single block
  #   optional uint64 originalBytes = 4;# exact file size
  #   optional ErasureInfo erasure = 5; # erasure coding info
  # }
  # ```
  #
  # var treeRootVBuf = initVBuffer()
  var header = initProtoBuffer()
  header.write(1, manifest.treeCid.data.buffer)
  header.write(2, manifest.blockSize.uint32)
  header.write(3, manifest.datasetSize.uint32)
  # treeRootVBuf.write(manifest.treeRoot)
  header.write(2, manifest.treeRoot.data.buffer)
  header.write(3, manifest.blockSize.uint32)
  header.write(4, manifest.datasetSize.uint32)
  if manifest.protected:
    var erasureInfo = initProtoBuffer()
    erasureInfo.write(1, manifest.ecK.uint32)
    erasureInfo.write(2, manifest.ecM.uint32)
    erasureInfo.write(3, manifest.originalTreeCid.data.buffer)
    erasureInfo.write(4, manifest.originalDatasetSize.uint32)
    erasureInfo.write(3, manifest.originalCid.data.buffer)
    erasureInfo.write(4, manifest.originalTreeRoot.data.buffer)
    erasureInfo.write(5, manifest.originalDatasetSize.uint32)
    erasureInfo.finish()

    header.write(4, erasureInfo)
    header.write(5, erasureInfo)

  pbNode.write(1, header) # set the treeCid as the data field
  pbNode.finish()
@@ -81,7 +85,9 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
    pbHeader: ProtoBuffer
    pbErasureInfo: ProtoBuffer
    treeCidBuf: seq[byte]
    treeRootBuf: seq[byte]
    originalTreeCid: seq[byte]
    originalTreeRootBuf: seq[byte]
    datasetSize: uint32
    blockSize: uint32
    originalDatasetSize: uint32
@@ -95,13 +101,16 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
  if pbHeader.getField(1, treeCidBuf).isErr:
    return failure("Unable to decode `treeCid` from manifest!")

  if pbHeader.getField(2, blockSize).isErr:
  if pbHeader.getField(2, treeRootBuf).isErr:
    return failure("Unable to decode `treeRoot` from manifest!")

  if pbHeader.getField(3, blockSize).isErr:
    return failure("Unable to decode `blockSize` from manifest!")

  if pbHeader.getField(3, datasetSize).isErr:
  if pbHeader.getField(4, datasetSize).isErr:
    return failure("Unable to decode `datasetSize` from manifest!")

  if pbHeader.getField(4, pbErasureInfo).isErr:
  if pbHeader.getField(5, pbErasureInfo).isErr:
    return failure("Unable to decode `erasureInfo` from manifest!")

  let protected = pbErasureInfo.buffer.len > 0
@@ -114,18 +123,34 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =

    if pbErasureInfo.getField(3, originalTreeCid).isErr:
      return failure("Unable to decode `originalTreeCid` from manifest!")

    if pbErasureInfo.getField(4, originalTreeRootBuf).isErr:
      return failure("Unable to decode `originalTreeRoot` from manifest!")

    if pbErasureInfo.getField(4, originalDatasetSize).isErr:
    if pbErasureInfo.getField(5, originalDatasetSize).isErr:
      return failure("Unable to decode `originalDatasetSize` from manifest!")

  var
    treeRoot: MultiHash
    originalTreeRoot: MultiHash

  let
    treeCid = ? Cid.init(treeCidBuf).mapFailure
    treeRootRes = ? MultiHash.decode(treeRootBuf, treeRoot).mapFailure

  if treeRootRes != treeRootBuf.len:
    return failure("Error decoding `treeRoot` as MultiHash")

  if protected:
    let originalTreeRootRes = ? MultiHash.decode(originalTreeRootBuf, originalTreeRoot).mapFailure
    if originalTreeRootRes != originalTreeRootBuf.len:
      return failure("Error decoding `originalTreeRoot` as MultiHash")
  let
    self = if protected:
      Manifest.new(
        treeCid = treeCid,
        treeRoot = treeRoot,
        datasetSize = datasetSize.NBytes,
        blockSize = blockSize.NBytes,
        version = treeCid.cidver,
@@ -134,11 +159,13 @@ proc decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
        ecK = ecK.int,
        ecM = ecM.int,
        originalTreeCid = ? Cid.init(originalTreeCid).mapFailure,
        originalTreeRoot = originalTreeRoot,
        originalDatasetSize = originalDatasetSize.NBytes
      )
    else:
      Manifest.new(
        treeCid = treeCid,
        treeRoot = treeRoot,
        datasetSize = datasetSize.NBytes,
        blockSize = blockSize.NBytes,
        version = treeCid.cidver,
@@ -30,17 +30,19 @@ export types

type
  Manifest* = ref object of RootObj
    treeCid {.serialize.}: Cid        # Root of the merkle tree
    datasetSize {.serialize.}: NBytes # Total size of all blocks
    blockSize {.serialize.}: NBytes   # Size of each contained block (might not be needed if blocks are len-prefixed)
    version: CidVersion               # Cid version
    hcodec: MultiCodec                # Multihash codec
    codec: MultiCodec                 # Data set codec
    case protected {.serialize.}: bool # Protected datasets have erasure coded info
    treeCid: Cid         # Cid of the merkle tree
    treeRoot: MultiHash  # Root hash of the merkle tree
    datasetSize: NBytes  # Total size of all blocks
    blockSize: NBytes    # Size of each contained block (might not be needed if blocks are len-prefixed)
    version: CidVersion  # Cid version
    hcodec: MultiCodec   # Multihash codec
    codec: MultiCodec    # Data set codec
    case protected: bool # Protected datasets have erasure coded info
    of true:
      ecK: int             # Number of blocks to encode
      ecM: int             # Number of resulting parity blocks
      originalTreeCid: Cid # The original root of the dataset being erasure coded
      ecK: int             # Number of blocks to encode
      ecM: int             # Number of resulting parity blocks
      originalTreeCid: Cid # The original Cid of the dataset being erasure coded
      originalTreeRoot: MultiHash
      originalDatasetSize: NBytes
    else:
      discard
@@ -73,18 +75,24 @@ proc ecK*(self: Manifest): int =
proc ecM*(self: Manifest): int =
  self.ecM

proc originalTreeCid*(self: Manifest): Cid =
proc originalCid*(self: Manifest): Cid =
  self.originalTreeCid

proc originalBlocksCount*(self: Manifest): int =
  divUp(self.originalDatasetSize.int, self.blockSize.int)

proc originalTreeRoot*(self: Manifest): MultiHash =
  self.originalTreeRoot

proc originalDatasetSize*(self: Manifest): NBytes =
  self.originalDatasetSize

proc treeCid*(self: Manifest): Cid =
  self.treeCid

proc treeRoot*(self: Manifest): MultiHash =
  self.treeRoot

proc blocksCount*(self: Manifest): int =
  divUp(self.datasetSize.int, self.blockSize.int)
@@ -132,6 +140,7 @@ proc cid*(self: Manifest): ?!Cid {.deprecated: "use treeCid instead".} =

proc `==`*(a, b: Manifest): bool =
  (a.treeCid == b.treeCid) and
  (a.treeRoot == b.treeRoot) and
  (a.datasetSize == b.datasetSize) and
  (a.blockSize == b.blockSize) and
  (a.version == b.version) and
@@ -142,12 +151,14 @@ proc `==`*(a, b: Manifest): bool =
    (a.ecK == b.ecK) and
    (a.ecM == b.ecM) and
    (a.originalTreeCid == b.originalTreeCid) and
    (a.originalTreeRoot == b.originalTreeRoot) and
    (a.originalDatasetSize == b.originalDatasetSize)
  else:
    true)

proc `$`*(self: Manifest): string =
  "treeCid: " & $self.treeCid &
  ", treeRoot: " & $self.treeRoot &
  ", datasetSize: " & $self.datasetSize &
  ", blockSize: " & $self.blockSize &
  ", version: " & $self.version &
@@ -158,6 +169,7 @@ proc `$`*(self: Manifest): string =
    ", ecK: " & $self.ecK &
    ", ecM: " & $self.ecM &
    ", originalTreeCid: " & $self.originalTreeCid &
    ", originalTreeRoot: " & $self.originalTreeRoot &
    ", originalDatasetSize: " & $self.originalDatasetSize
  else:
    "")
@@ -169,6 +181,7 @@ proc `$`*(self: Manifest): string =
proc new*(
  T: type Manifest,
  treeCid: Cid,
  treeRoot: MultiHash,
  blockSize: NBytes,
  datasetSize: NBytes,
  version: CidVersion = CIDv1,
@@ -179,6 +192,7 @@ proc new*(

  T(
    treeCid: treeCid,
    treeRoot: treeRoot,
    blockSize: blockSize,
    datasetSize: datasetSize,
    version: version,
@@ -190,6 +204,7 @@ proc new*(
  T: type Manifest,
  manifest: Manifest,
  treeCid: Cid,
  treeRoot: MultiHash,
  datasetSize: NBytes,
  ecK, ecM: int
): Manifest =
@@ -198,6 +213,7 @@ proc new*(
  ##
  Manifest(
    treeCid: treeCid,
    treeRoot: treeRoot,
    datasetSize: datasetSize,
    version: manifest.version,
    codec: manifest.codec,
@@ -206,6 +222,7 @@ proc new*(
    protected: true,
    ecK: ecK, ecM: ecM,
    originalTreeCid: manifest.treeCid,
    originalTreeRoot: manifest.treeRoot,
    originalDatasetSize: manifest.datasetSize)

proc new*(
@@ -216,7 +233,8 @@ proc new*(
  ## erasure protected one
  ##
  Manifest(
    treeCid: manifest.originalTreeCid,
    treeCid: manifest.originalCid,
    treeRoot: manifest.originalTreeRoot,
    datasetSize: manifest.originalDatasetSize,
    version: manifest.version,
    codec: manifest.codec,
@@ -236,6 +254,7 @@ proc new*(
proc new*(
  T: type Manifest,
  treeCid: Cid,
  treeRoot: MultiHash,
  datasetSize: NBytes,
  blockSize: NBytes,
  version: CidVersion,
@@ -244,10 +263,12 @@ proc new*(
  ecK: int,
  ecM: int,
  originalTreeCid: Cid,
  originalTreeRoot: MultiHash,
  originalDatasetSize: NBytes
): Manifest =
  Manifest(
    treeCid: treeCid,
    treeRoot: treeRoot,
    datasetSize: datasetSize,
    blockSize: blockSize,
    version: version,
@@ -257,5 +278,6 @@ proc new*(
    ecK: ecK,
    ecM: ecM,
    originalTreeCid: originalTreeCid,
    originalTreeRoot: originalTreeRoot,
    originalDatasetSize: originalDatasetSize
  )
@ -14,10 +14,9 @@ import std/sugar
|
||||
import std/algorithm
|
||||
|
||||
import pkg/chronicles
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/nimcrypto/sha2
|
||||
import pkg/libp2p/[cid, multicodec, multihash, vbuffer]
|
||||
import pkg/libp2p/[multicodec, multihash, vbuffer]
|
||||
import pkg/stew/byteutils
|
||||
|
||||
import ../errors
|
||||
@ -187,16 +186,8 @@ proc root*(self: MerkleTree): MultiHash =
|
||||
let rootIndex = self.len - 1
|
||||
self.nodeBufferToMultiHash(rootIndex)
|
||||
|
||||
proc rootCid*(self: MerkleTree, version = CIDv1, dataCodec = multiCodec("raw")): ?!Cid =
|
||||
Cid.init(version, dataCodec, self.root).mapFailure
|
||||
|
||||
iterator leaves*(self: MerkleTree): MultiHash =
|
||||
for i in 0..<self.leavesCount:
|
||||
yield self.nodeBufferToMultiHash(i)
|
||||
|
||||
iterator leavesCids*(self: MerkleTree, version = CIDv1, dataCodec = multiCodec("raw")): ?!Cid =
|
||||
for leaf in self.leaves:
|
||||
yield Cid.init(version, dataCodec, leaf).mapFailure
|
||||
proc leaves*(self: MerkleTree): seq[MultiHash] =
|
||||
toSeq(0..<self.leavesCount).map(i => self.nodeBufferToMultiHash(i))
|
||||
|
||||
proc leavesCount*(self: MerkleTree): Natural =
|
||||
self.leavesCount
|
||||
@ -207,10 +198,6 @@ proc getLeaf*(self: MerkleTree, index: Natural): ?!MultiHash =
|
||||
|
||||
success(self.nodeBufferToMultiHash(index))
|
||||
|
||||
proc getLeafCid*(self: MerkleTree, index: Natural, version = CIDv1, dataCodec = multiCodec("raw")): ?!Cid =
|
||||
let leaf = ? self.getLeaf(index)
|
||||
Cid.init(version, dataCodec, leaf).mapFailure
|
||||
|
||||
proc height*(self: MerkleTree): Natural =
|
||||
computeTreeHeight(self.leavesCount)
|
||||
|
||||
@ -269,7 +256,7 @@ proc `==`*(a, b: MerkleTree): bool =
|
||||
(a.leavesCount == b.leavesCount) and
|
||||
(a.nodesBuffer == b.nodesBuffer)
|
||||
|
||||
proc init*(
|
||||
func init*(
|
||||
T: type MerkleTree,
|
||||
mcodec: MultiCodec,
|
||||
digestSize: Natural,
|
||||
@ -290,37 +277,6 @@ proc init*(
|
||||
else:
|
||||
failure("Expected nodesBuffer len to be " & $(totalNodes * digestSize) & " but was " & $nodesBuffer.len)
|
||||
|
||||
proc init*(
|
||||
T: type MerkleTree,
|
||||
leaves: openArray[MultiHash]
|
||||
): ?!MerkleTree =
|
||||
without leaf =? leaves.?[0]:
|
||||
return failure("At least one leaf is required")
|
||||
|
||||
var builder = ? MerkleTreeBuilder.init(mcodec = leaf.mcodec)
|
||||
|
||||
for l in leaves:
|
||||
let res = builder.addLeaf(l)
|
||||
if res.isErr:
|
||||
return failure(res.error)
|
||||
|
||||
builder.build()
|
||||
|
||||
proc init*(
|
||||
T: type MerkleTree,
|
||||
cids: openArray[Cid]
|
||||
): ?!MerkleTree =
|
||||
var leaves = newSeq[MultiHash]()
|
||||
|
||||
for cid in cids:
|
||||
let res = cid.mhash.mapFailure
|
||||
if res.isErr:
|
||||
return failure(res.error)
|
||||
else:
|
||||
leaves.add(res.value)
|
||||
|
||||
MerkleTree.init(leaves)
|
||||
|
||||
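These `init` overloads are dropped in favour of driving `MerkleTreeBuilder` directly. A minimal sketch of the replacement flow, assuming only the builder API visible in this diff (`MerkleTreeBuilder.init`, `addLeaf`, `build`); the helper name is hypothetical:

  # hypothetical helper mirroring the removed MerkleTree.init(cids)
  proc treeFromCids(cids: openArray[Cid]): ?!MerkleTree =
    if cids.len == 0:
      return failure("At least one cid is required")
    let mcodec = (? cids[0].mhash.mapFailure).mcodec
    var builder = ? MerkleTreeBuilder.init(mcodec)
    for cid in cids:
      ? builder.addLeaf(? cid.mhash.mapFailure)
    builder.build()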
###########################################################
# MerkleProof
###########################################################

@ -116,15 +116,20 @@ proc fetchBatched*(

  trace "Fetching blocks in batches of", size = batchSize

  let iter = Iter.fromSlice(0..<manifest.blocksCount)
    .map((i: int) => node.blockStore.getBlock(BlockAddress.init(manifest.treeCid, i)))
  without iter =? await node.blockStore.getBlocks(manifest.treeCid, manifest.blocksCount, manifest.treeRoot), err:
    return failure(err)

  for batchNum in 0..<batchCount:

    let blocks = collect:
      for i in 0..<batchSize:
        if not iter.finished:
          iter.next()

    # let
    #   indexRange = (batchNum * batchSize)..<(min((batchNum + 1) * batchSize, manifest.blocksCount))
    #   blocks = indexRange.mapIt(node.blockStore.getBlock(manifest.treeCid, it))
    try:
      await allFuturesThrowing(allFinished(blocks))
      if not onBatch.isNil:
@ -192,7 +197,8 @@ proc store*(
    dataCodec = multiCodec("raw")
    chunker = LPStreamChunker.new(stream, chunkSize = blockSize)

  var cids: seq[Cid]
  without var treeBuilder =? MerkleTreeBuilder.init(hcodec), err:
    return failure(err)

  try:
    while (
@ -210,7 +216,8 @@ proc store*(
      without blk =? bt.Block.new(cid, chunk, verify = false):
        return failure("Unable to init block from chunk!")

      cids.add(cid)
      if err =? treeBuilder.addLeaf(mhash).errorOption:
        return failure(err)

      if err =? (await self.blockStore.putBlock(blk)).errorOption:
        trace "Unable to store block", cid = blk.cid, err = err.msg
@ -223,21 +230,18 @@ proc store*(
  finally:
    await stream.close()

  without tree =? MerkleTree.init(cids), err:
    return failure(err)

  without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
  without tree =? treeBuilder.build(), err:
    return failure(err)

  for index, cid in cids:
    without proof =? tree.getProof(index), err:
      return failure(err)
    if err =? (await self.blockStore.putBlockCidAndProof(treeCid, index, cid, proof)).errorOption:
      # TODO add log here
      return failure(err)
  without treeBlk =? bt.Block.new(tree.encode()), err:
    return failure(err)

  if err =? (await self.blockStore.putBlock(treeBlk)).errorOption:
    return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg)

  let manifest = Manifest.new(
    treeCid = treeCid,
    treeCid = treeBlk.cid,
    treeRoot = tree.root,
    blockSize = blockSize,
    datasetSize = NBytes(chunker.offset),
    version = CIDv1,
@ -259,13 +263,12 @@ proc store*(
    return failure("Unable to store manifest " & $manifestBlk.cid)

  info "Stored data", manifestCid = manifestBlk.cid,
                      treeCid = treeCid,
                      treeCid = treeBlk.cid,
                      blocks = manifest.blocksCount,
                      datasetSize = manifest.datasetSize

  # Announce manifest
  await self.discovery.provide(manifestBlk.cid)
  await self.discovery.provide(treeCid)

  return manifestBlk.cid.success

@ -4,6 +4,6 @@ import ./stores/networkstore
import ./stores/repostore
import ./stores/maintenance
import ./stores/keyutils
import ./stores/treehelper
import ./stores/treereader

export cachestore, blockstore, networkstore, repostore, maintenance, keyutils, treehelper
export cachestore, blockstore, networkstore, repostore, maintenance, keyutils, treereader

@ -55,6 +55,30 @@ method getBlockAndProof*(self: BlockStore, treeCid: Cid, index: Natural): Future

  raiseAssert("getBlockAndProof not implemented!")

method getTree*(self: BlockStore, treeCid: Cid): Future[?!MerkleTree] {.base.} =
  ## Get a merkle tree by Cid
  ##

  raiseAssert("Not implemented!")

method getBlock*(self: BlockStore, treeCid: Cid, index: Natural, merkleRoot: MultiHash): Future[?!Block] {.base.} =
  ## Get a block by Cid of a merkle tree and an index of a leaf in a tree, validate inclusion using merkle root
  ##

  raiseAssert("Not implemented!")

method getBlockAndProof*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.base.} =
  ## Get a block and associated inclusion proof by Cid of a merkle tree and an index of a leaf in a tree
  ##

  raiseAssert("Not implemented!")

method getBlocks*(self: BlockStore, treeCid: Cid, leavesCount: Natural, merkleRoot: MultiHash): Future[?!AsyncIter[?!Block]] {.base.} =
  ## Get all blocks in range [0..<leavesCount] by Cid of a merkle tree, validate inclusion using merkle root
  ##

  raiseAssert("Not implemented!")
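Note that `getBlocks` hands back an `AsyncIter` of `?!Block` results rather than a materialized seq, so large datasets can be streamed without buffering every block. A minimal consumption sketch, assuming a concrete store implementation and a manifest providing `treeCid`, `blocksCount` and `treeRoot`:

  without iter =? await store.getBlocks(manifest.treeCid, manifest.blocksCount, manifest.treeRoot), err:
    return failure(err)

  while not iter.finished:
    without blk =? await iter.next(), err:
      trace "Skipping unreadable block", err = err.msg
      continue
    # handle blk here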
method putBlock*(
  self: BlockStore,
  blk: Block,
@ -100,6 +124,12 @@ method delBlock*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!void]

  raiseAssert("delBlock not implemented!")

method delBlock*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!void] {.base.} =
  ## Delete a block from the blockstore
  ##

  raiseAssert("Not implemented!")

method hasBlock*(self: BlockStore, cid: Cid): Future[?!bool] {.base.} =
  ## Check if the block exists in the blockstore
  ##
@ -112,6 +142,12 @@ method hasBlock*(self: BlockStore, tree: Cid, index: Natural): Future[?!bool] {.

  raiseAssert("hasBlock not implemented!")

method hasBlock*(self: BlockStore, tree: Cid, index: Natural): Future[?!bool] {.base.} =
  ## Check if the block exists in the blockstore
  ##

  raiseAssert("Not implemented!")

method listBlocks*(
  self: BlockStore,
  blockType = BlockType.Manifest): Future[?!AsyncIter[?Cid]] {.base.} =

@ -21,13 +21,13 @@ import pkg/questionable
import pkg/questionable/results

import ./blockstore
import ./treereader
import ../units
import ../chunker
import ../errors
import ../manifest
import ../merkletree
import ../utils
import ../clock

export blockstore

@ -36,6 +36,7 @@ logScope:

type
  CacheStore* = ref object of BlockStore
    treeReader: TreeReader
    currentSize*: NBytes
    size*: NBytes
    cache: LruCache[Cid, Block]
@ -65,34 +66,17 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =
    trace "Error requesting block from cache", cid, error = exc.msg
    return failure exc

proc getCidAndProof(self: CacheStore, treeCid: Cid, index: Natural): ?!(Cid, MerkleProof) =
  if cidAndProof =? self.cidAndProofCache.getOption((treeCid, index)):
    success(cidAndProof)
  else:
    failure(newException(BlockNotFoundError, "Block not in cache: " & $BlockAddress.init(treeCid, index)))
method getTree*(self: CacheStore, treeCid: Cid): Future[?!MerkleTree] =
  self.treeReader.getTree(treeCid)

method getBlock*(self: CacheStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
  without cidAndProof =? self.getCidAndProof(treeCid, index), err:
    return failure(err)
method getBlock*(self: CacheStore, treeCid: Cid, index: Natural, merkleRoot: MultiHash): Future[?!Block] =
  self.treeReader.getBlock(treeCid, index)

  await self.getBlock(cidAndProof[0])
method getBlocks*(self: CacheStore, treeCid: Cid, leavesCount: Natural, merkleRoot: MultiHash): Future[?!AsyncIter[?!Block]] =
  self.treeReader.getBlocks(treeCid, leavesCount)

method getBlockAndProof*(self: CacheStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.async.} =
  without cidAndProof =? self.getCidAndProof(treeCid, index), err:
    return failure(err)

  let (cid, proof) = cidAndProof

  without blk =? await self.getBlock(cid), err:
    return failure(err)

  success((blk, proof))

method getBlock*(self: CacheStore, address: BlockAddress): Future[?!Block] =
  if address.leaf:
    self.getBlock(address.treeCid, address.index)
  else:
    self.getBlock(address.cid)
method getBlockAndProof*(self: CacheStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] =
  self.treeReader.getBlockAndProof(treeCid, index)

method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} =
  ## Check if the block exists in the blockstore
@ -106,14 +90,13 @@ method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} =
  return (cid in self.cache).success

method hasBlock*(self: CacheStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} =
  without cidAndProof =? self.getCidAndProof(treeCid, index), err:
    if err of BlockNotFoundError:
      return success(false)
    else:
      return failure(err)

  await self.hasBlock(cidAndProof[0])
  ## Check if the block exists in the blockstore
  ##

  without cid =? await self.treeReader.getBlockCid(treeCid, index), err:
    return failure(err)

  await self.hasBlock(cid)

func cids(self: CacheStore): (iterator: Cid {.gcsafe.}) =
  return iterator(): Cid =
@ -246,12 +229,10 @@ method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
  return success()

method delBlock*(self: CacheStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} =
  let maybeRemoved = self.cidAndProofCache.del((treeCid, index))
  without cid =? await self.treeReader.getBlockCid(treeCid, index), err:
    return failure(err)

  if removed =? maybeRemoved:
    return await self.delBlock(removed[0])

  return success()
  return await self.delBlock(cid)

method close*(self: CacheStore): Future[void] {.async.} =
  ## Close the blockstore, a no-op for this implementation
@ -273,17 +254,24 @@ proc new*(
  if cacheSize < chunkSize:
    raise newException(ValueError, "cacheSize cannot be less than chunkSize")

  var treeReader = TreeReader.new()

  let
    currentSize = 0'nb
    size = int(cacheSize div chunkSize)
    cache = newLruCache[Cid, Block](size)
    cidAndProofCache = newLruCache[(Cid, Natural), (Cid, MerkleProof)](size)
    store = CacheStore(
      treeReader: treeReader,
      cache: cache,
      cidAndProofCache: cidAndProofCache,
      currentSize: currentSize,
      size: cacheSize)

  proc getBlockFromStore(cid: Cid): Future[?!Block] = store.getBlock(cid)

  treeReader.getBlockFromStore = getBlockFromStore

  for blk in blocks:
    discard store.putBlockSync(blk)

@ -20,12 +20,10 @@ import pkg/libp2p
import ../blocktype
import ../utils/asyncheapqueue
import ../utils/asynciter
import ../clock

import ./blockstore
import ../blockexchange
import ../merkletree
import ../blocktype

export blockstore, blockexchange, asyncheapqueue

@ -54,17 +52,34 @@ method getBlock*(self: NetworkStore, address: BlockAddress): Future[?!Block] {.a

  return success blk

method getBlock*(self: NetworkStore, cid: Cid): Future[?!Block] =
  ## Get a block from the blockstore
  ##
method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural, merkleRoot: MultiHash): Future[?!Block] {.async.} =
  without localBlock =? await self.localStore.getBlock(treeCid, index, merkleRoot), err:
    if err of BlockNotFoundError:
      trace "Requesting block from the network engine", treeCid, index
      try:
        let networkBlock = await self.engine.requestBlock(treeCid, index, merkleRoot)
        return success(networkBlock)
      except CatchableError as err:
        return failure(err)
    else:
      failure(err)
  return success(localBlock)

  self.getBlock(BlockAddress.init(cid))
method getBlocks*(self: NetworkStore, treeCid: Cid, leavesCount: Natural, merkleRoot: MultiHash): Future[?!AsyncIter[?!Block]] {.async.} =
  without localIter =? await self.localStore.getBlocks(treeCid, leavesCount, merkleRoot), err:
    if err of BlockNotFoundError:
      trace "Requesting blocks from the network engine", treeCid, leavesCount
      without var networkIter =? self.engine.requestBlocks(treeCid, leavesCount, merkleRoot), err:
        failure(err)

method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!Block] =
  ## Get a block from the blockstore
  ##
      let iter = networkIter
        .prefetch(BlockPrefetchAmount)
        .map(proc (fut: Future[Block]): Future[?!Block] {.async.} = catch: (await fut))

  self.getBlock(BlockAddress.init(treeCid, index))
      return success(iter)
    else:
      return failure(err)
  return success(localIter)
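`NetworkStore` now resolves leaf blocks local-first and falls back to the exchange engine only on `BlockNotFoundError`, so a caller never has to care where a block came from. A minimal caller sketch under that assumption (`networkStore` and `manifest` are hypothetical, pre-wired values):

  for i in 0..<manifest.blocksCount:
    without blk =? await networkStore.getBlock(manifest.treeCid, i, manifest.treeRoot), err:
      trace "Block unavailable locally and on the network", index = i, err = err.msg
      continue
    # blk was served from localStore, or fetched and validated via the engine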
method putBlock*(
  self: NetworkStore,

@ -24,6 +24,7 @@ import pkg/stew/endians2

import ./blockstore
import ./keyutils
import ./treereader
import ../blocktype
import ../clock
import ../systemclock
@ -58,11 +59,12 @@ type
    quotaReservedBytes*: uint # bytes reserved by the repo
    blockTtl*: Duration
    started*: bool
    treeReader*: TreeReader

  BlockExpiration* = object
    cid*: Cid
    expiration*: SecondsSince1970

proc updateMetrics(self: RepoStore) =
  codex_repostore_blocks.set(self.totalBlocks.int64)
  codex_repostore_bytes_used.set(self.quotaUsedBytes.int64)
@ -159,32 +161,17 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
  trace "Got block for cid", cid
  return Block.new(cid, data, verify = true)

method getTree*(self: RepoStore, treeCid: Cid): Future[?!MerkleTree] =
  self.treeReader.getTree(treeCid)

method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.async.} =
  without cidAndProof =? await self.getCidAndProof(treeCid, index), err:
    return failure(err)
method getBlock*(self: RepoStore, treeCid: Cid, index: Natural, merkleRoot: MultiHash): Future[?!Block] =
  self.treeReader.getBlock(treeCid, index)

  let (cid, proof) = cidAndProof
method getBlocks*(self: RepoStore, treeCid: Cid, leavesCount: Natural, merkleRoot: MultiHash): Future[?!AsyncIter[?!Block]] =
  self.treeReader.getBlocks(treeCid, leavesCount)

  without blk =? await self.getBlock(cid), err:
    return failure(err)

  success((blk, proof))

method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
  without cidAndProof =? await self.getCidAndProof(treeCid, index), err:
    return failure(err)

  await self.getBlock(cidAndProof[0])

method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
  ## Get a block from the blockstore
  ##

  if address.leaf:
    self.getBlock(address.treeCid, address.index)
  else:
    self.getBlock(address.cid)
method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] =
  self.treeReader.getBlockAndProof(treeCid, index)

proc getBlockExpirationEntry(
  self: RepoStore,
@ -371,21 +358,10 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
  return success()

method delBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!void] {.async.} =
  without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
  without cid =? await self.treeReader.getBlockCid(treeCid, index), err:
    return failure(err)

  without value =? await self.metaDs.get(key), err:
    if err of DatastoreKeyNotFound:
      return success()
    else:
      return failure(err)

  without cidAndProof =? (Cid, MerkleProof).decode(value), err:
    return failure(err)

  self.delBlock(cidAndProof[0])

  await self.metaDs.delete(key)
  await self.delBlock(cid)

method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
  ## Check if the block exists in the blockstore
@ -405,13 +381,10 @@ method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
  return await self.repoDs.has(key)

method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} =
  without cidAndProof =? await self.getCidAndProof(treeCid, index), err:
    if err of BlockNotFoundError:
      return success(false)
    else:
      return failure(err)

  await self.hasBlock(cidAndProof[0])
  without cid =? await self.treeReader.getBlockCid(treeCid, index), err:
    return failure(err)

  await self.hasBlock(cid)

method listBlocks*(
  self: RepoStore,
@ -458,7 +431,7 @@ method getBlockExpirations*(
  self: RepoStore,
  maxNumber: int,
  offset: int): Future[?!AsyncIter[?BlockExpiration]] {.async, base.} =
  ## Get block expirations from the given RepoStore
  ##

  without query =? createBlockExpirationQuery(maxNumber, offset), err:
@ -613,18 +586,25 @@ func new*(
  T: type RepoStore,
  repoDs: Datastore,
  metaDs: Datastore,
  treeReader: TreeReader = TreeReader.new(),
  clock: Clock = SystemClock.new(),
  postFixLen = 2,
  quotaMaxBytes = DefaultQuotaBytes,
  blockTtl = DefaultBlockTtl
  blockTtl = DefaultBlockTtl,
  treeCacheCapacity = DefaultTreeCacheCapacity
): RepoStore =
  ## Create new instance of a RepoStore
  ##
  RepoStore(
  let store = RepoStore(
    repoDs: repoDs,
    metaDs: metaDs,
    treeReader: treeReader,
    clock: clock,
    postFixLen: postFixLen,
    quotaMaxBytes: quotaMaxBytes,
    blockTtl: blockTtl
  )
    blockTtl: blockTtl)

  proc getBlockFromStore(cid: Cid): Future[?!Block] = store.getBlock(cid)

  treeReader.getBlockFromStore = getBlockFromStore
  store

120 codex/stores/treereader.nim Normal file
@ -0,0 +1,120 @@
import pkg/upraises

import pkg/chronos
import pkg/chronos/futures
import pkg/chronicles
import pkg/libp2p/[cid, multicodec, multihash]
import pkg/lrucache
import pkg/questionable
import pkg/questionable/results

import ../blocktype
import ../merkletree
import ../utils

const DefaultTreeCacheCapacity* = 10 # Max number of trees stored in memory

type
  GetBlock = proc (cid: Cid): Future[?!Block] {.upraises: [], gcsafe, closure.}
  DelBlock = proc (cid: Cid): Future[?!void] {.upraises: [], gcsafe, closure.}
  TreeReader* = ref object of RootObj
    getBlockFromStore*: GetBlock
    treeCache*: LruCache[Cid, MerkleTree]

method getTree*(self: TreeReader, cid: Cid): Future[?!MerkleTree] {.async.} =
  if tree =? self.treeCache.getOption(cid):
    return success(tree)
  else:
    without treeBlk =? await self.getBlockFromStore(cid), err:
      return failure(err)

    without tree =? MerkleTree.decode(treeBlk.data), err:
      return failure("Error decoding a merkle tree with cid " & $cid & ". Nested error is: " & err.msg)
    self.treeCache[cid] = tree

    trace "Got merkle tree for cid", cid
    return success(tree)

method getBlockCidAndProof*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!(Cid, MerkleProof)] {.async.} =
  without tree =? await self.getTree(treeCid), err:
    return failure(err)

  without proof =? tree.getProof(index), err:
    return failure(err)

  without leaf =? tree.getLeaf(index), err:
    return failure(err)

  without leafCid =? Cid.init(treeCid.cidver, treeCid.mcodec, leaf).mapFailure, err:
    return failure(err)

  return success((leafCid, proof))

method getBlockCid*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!Cid] {.async.} =
  without tree =? await self.getTree(treeCid), err:
    return failure(err)

  without leaf =? tree.getLeaf(index), err:
    return failure(err)

  without leafCid =? Cid.init(treeCid.cidver, treeCid.mcodec, leaf).mapFailure, err:
    return failure(err)

  return success(leafCid)

method getBlock*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
  without leafCid =? await self.getBlockCid(treeCid, index), err:
    return failure(err)

  without blk =? await self.getBlockFromStore(leafCid), err:
    return failure(err)

  return success(blk)

method getBlockAndProof*(self: TreeReader, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.async.} =
  without (leafCid, proof) =? await self.getBlockCidAndProof(treeCid, index), err:
    return failure(err)

  without blk =? await self.getBlockFromStore(leafCid), err:
    return failure(err)

  return success((blk, proof))

method getBlocks*(self: TreeReader, treeCid: Cid, leavesCount: Natural): Future[?!AsyncIter[?!Block]] {.async.} =
  without tree =? await self.getTree(treeCid), err:
    return failure(err)

  var iter = AsyncIter[?!Block]()

  proc checkLen(index: Natural): void =
    if index >= leavesCount:
      iter.finish

  checkLen(0)

  var index = 0
  proc next(): Future[?!Block] {.async.} =
    if not iter.finished:
      without leaf =? tree.getLeaf(index), err:
        inc index
        checkLen(index)
        return failure(err)

      inc index
      checkLen(index)

      without leafCid =? Cid.init(treeCid.cidver, treeCid.mcodec, leaf).mapFailure, err:
        return failure(err)

      without blk =? await self.getBlockFromStore(leafCid), err:
        return failure(err)

      return success(blk)
    else:
      return failure("No more elements for tree with cid " & $treeCid)

  iter.next = next
  return success(iter)

proc new*(T: type TreeReader, treeCacheCap = DefaultTreeCacheCapacity): TreeReader =
  TreeReader(treeCache: newLruCache[Cid, MerkleTree](treeCacheCap))
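Note that `TreeReader` holds no reference to a store; the owning store injects a `getBlockFromStore` closure instead, which keeps the dependency one-way and lets the same reader logic back both `CacheStore` and `RepoStore`. A minimal wiring sketch, mirroring what the `new` procs above do (`someStore` and `treeCid` are hypothetical):

  let reader = TreeReader.new(treeCacheCap = 4)
  reader.getBlockFromStore = proc (cid: Cid): Future[?!Block] = someStore.getBlock(cid)

  # resolve leaf 3 of a dataset together with its inclusion proof
  without (blk, proof) =? await reader.getBlockAndProof(treeCid, 3), err:
    return failure(err)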
@ -1,5 +1,6 @@
import ./streams/seekablestream
import ./streams/storestream
import ./streams/seekablestorestream
import ./streams/asyncstreamwrapper

export seekablestream, storestream, asyncstreamwrapper
export seekablestream, storestream, seekablestorestream, asyncstreamwrapper

123 codex/streams/seekablestorestream.nim Normal file
@ -0,0 +1,123 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
##  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
##  * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.

import std/options

import pkg/upraises

push: {.upraises: [].}

import pkg/chronos
import pkg/chronicles
import pkg/stew/ptrops

import ../stores
import ../manifest
import ../blocktype
import ../utils

import ./seekablestream

export stores, blocktype, manifest, chronos

logScope:
  topics = "codex storestream"

const
  SeekableStoreStreamTrackerName* = "SeekableStoreStream"

type
  # Make SeekableStream from a sequence of blocks stored in Manifest
  # (only original file data - see StoreStream.size)
  SeekableStoreStream* = ref object of SeekableStream
    store*: BlockStore    # Store where to lookup block contents
    manifest*: Manifest   # List of block CIDs
    pad*: bool            # Pad last block to manifest.blockSize?

method initStream*(s: SeekableStoreStream) =
  if s.objName.len == 0:
    s.objName = SeekableStoreStreamTrackerName

  procCall SeekableStream(s).initStream()

proc new*(
  T: type SeekableStoreStream,
  store: BlockStore,
  manifest: Manifest,
  pad = true
): SeekableStoreStream =
  ## Create a new SeekableStoreStream instance for a given store and manifest
  ##
  result = SeekableStoreStream(
    store: store,
    manifest: manifest,
    pad: pad,
    offset: 0)

  result.initStream()

method `size`*(self: SeekableStoreStream): int =
  bytes(self.manifest, self.pad).int

proc `size=`*(self: SeekableStoreStream, size: int)
  {.error: "Setting the size is forbidden".} =
  discard

method atEof*(self: SeekableStoreStream): bool =
  self.offset >= self.size

method readOnce*(
  self: SeekableStoreStream,
  pbytes: pointer,
  nbytes: int
): Future[int] {.async.} =
  ## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`.
  ## Return how many bytes were actually read before EOF was encountered.
  ## Raise exception if we are already at EOF.
  ##

  trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount
  if self.atEof:
    raise newLPStreamEOFError()

  # The loop iterates over blocks in the SeekableStoreStream,
  # reading them and copying their data into outbuf
  var read = 0  # Bytes read so far, and thus write offset in the outbuf
  while read < nbytes and not self.atEof:
    # Compute from the current stream position `self.offset` the block num/offset to read
    # Compute how many bytes to read from this block
    let
      blockNum    = self.offset div self.manifest.blockSize.int
      blockOffset = self.offset mod self.manifest.blockSize.int
      readBytes   = min([self.size - self.offset,
                         nbytes - read,
                         self.manifest.blockSize.int - blockOffset])

    # Read contents of block `blockNum`
    without blk =? await self.store.getBlock(self.manifest.treeCid, blockNum, self.manifest.treeRoot), error:
      raise newLPStreamReadError(error)

    trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset

    # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
    if blk.isEmpty:
      zeroMem(pbytes.offset(read), readBytes)
    else:
      copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes)

    # Update current positions in the stream and outbuf
    self.offset += readBytes
    read += readBytes

  return read

method closeImpl*(self: SeekableStoreStream) {.async.} =
  trace "Closing SeekableStoreStream"
  self.offset = self.size  # set Eof
  await procCall LPStream(self).closeImpl()
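`SeekableStoreStream` fetches exactly one block per loop iteration straight from the store, so random seeks stay cheap at the cost of no read-ahead (the iterator-based `StoreStream` below makes the opposite trade). A minimal read loop, assuming a populated store and a matching manifest:

  let stream = SeekableStoreStream.new(store, manifest)
  var buf = newSeq[byte](4096)
  while not stream.atEof:
    let n = await stream.readOnce(addr buf[0], buf.len)
    # consume buf[0..<n] here
  await stream.close()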
@ -33,18 +33,20 @@ const
  StoreStreamTrackerName* = "StoreStream"

type
  # Make SeekableStream from a sequence of blocks stored in Manifest
  # (only original file data - see StoreStream.size)
  StoreStream* = ref object of SeekableStream
  StoreStream* = ref object of LPStream
    store*: BlockStore    # Store where to lookup block contents
    manifest*: Manifest   # List of block CIDs
    pad*: bool            # Pad last block to manifest.blockSize?
    iter*: AsyncIter[?!Block]
    lastBlock: Block
    lastIndex: int
    offset: int

method initStream*(s: StoreStream) =
  if s.objName.len == 0:
    s.objName = StoreStreamTrackerName

  procCall SeekableStream(s).initStream()
  procCall LPStream(s).initStream()

proc new*(
  T: type StoreStream,
@ -58,6 +60,7 @@ proc new*(
    store: store,
    manifest: manifest,
    pad: pad,
    lastIndex: -1,
    offset: 0)

  result.initStream()
@ -85,32 +88,34 @@ method readOnce*(
  trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount
  if self.atEof:
    raise newLPStreamEOFError()

  # Initialize a block iterator
  if self.lastIndex < 0:
    without iter =? await self.store.getBlocks(self.manifest.treeCid, self.manifest.blocksCount, self.manifest.treeRoot), err:
      raise newLPStreamReadError(err)
    self.iter = iter

  # The loop iterates over blocks in the StoreStream,
  # reading them and copying their data into outbuf
  var read = 0  # Bytes read so far, and thus write offset in the outbuf
  while read < nbytes and not self.atEof:
    # Compute from the current stream position `self.offset` the block num/offset to read
    if self.offset >= (self.lastIndex + 1) * self.manifest.blockSize.int:
      if not self.iter.finished:
        without lastBlock =? await self.iter.next(), err:
          raise newLPStreamReadError(err)
        self.lastBlock = lastBlock
        inc self.lastIndex
      else:
        raise newLPStreamReadError(newException(CodexError, "Block iterator finished prematurely"))
    # Compute how many bytes to read from this block
    let
      blockNum    = self.offset div self.manifest.blockSize.int
      blockOffset = self.offset mod self.manifest.blockSize.int
      readBytes   = min([self.size - self.offset,
                         nbytes - read,
                         self.manifest.blockSize.int - blockOffset])
      address     = BlockAddress(leaf: true, treeCid: self.manifest.treeCid, index: blockNum)

    # Read contents of block `blockNum`
    without blk =? await self.store.getBlock(address), error:
      raise newLPStreamReadError(error)

    trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset

    # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
    if blk.isEmpty:
    if self.lastBlock.isEmpty:
      zeroMem(pbytes.offset(read), readBytes)
    else:
      copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes)
      copyMem(pbytes.offset(read), self.lastBlock.data[blockOffset].addr, readBytes)

    # Update current positions in the stream and outbuf
    self.offset += readBytes

@ -35,6 +35,13 @@ proc orElse*[A](a, b: Option[A]): Option[A] =
  else:
    b

template `..<`*(a, b: untyped): untyped =
  ## A shortcut for `a .. pred(b)`.
  ## ```
  ##   for i in 5 ..< 9:
  ##     echo i # => 5; 6; 7; 8
  ## ```
  a .. (when b is BackwardsIndex: succ(b) else: pred(b))

when not declared(parseDuration): # Odd code formatting to minimize diff v. mainLine
  const Whitespace = {' ', '\t', '\v', '\r', '\l', '\f'}

@ -1,16 +1,13 @@
import std/sugar

import pkg/questionable
import pkg/chronos
import pkg/upraises

type
  Function*[T, U] = proc(fut: T): U {.upraises: [CatchableError], gcsafe, closure.}
  IsFinished* = proc(): bool {.upraises: [], gcsafe, closure.}
  GenNext*[T] = proc(): T {.upraises: [CatchableError], gcsafe, closure.}
  MapItem*[T, U] = proc(fut: T): U {.upraises: [CatchableError], gcsafe, closure.}
  NextItem*[T] = proc(): T {.upraises: [CatchableError], gcsafe, closure.}
  Iter*[T] = ref object
    finished: bool
    next*: GenNext[T]
    finished*: bool
    next*: NextItem[T]
  AsyncIter*[T] = Iter[Future[T]]

proc finish*[T](self: Iter[T]): void =
@ -23,126 +20,66 @@ iterator items*[T](self: Iter[T]): T =
  while not self.finished:
    yield self.next()

iterator pairs*[T](self: Iter[T]): tuple[key: int, val: T] {.inline.} =
  var i = 0
  while not self.finished:
    yield (i, self.next())
    inc(i)
proc map*[T, U](wrappedIter: Iter[T], mapItem: MapItem[T, U]): Iter[U] =
  var iter = Iter[U]()

proc map*[T, U](fut: Future[T], fn: Function[T, U]): Future[U] {.async.} =
  let t = await fut
  fn(t)
  proc checkFinish(): void =
    if wrappedIter.finished:
      iter.finish

  checkFinish()

  proc next(): U {.upraises: [CatchableError].} =
    if not iter.finished:
      let fut = wrappedIter.next()
      checkFinish()
      return mapItem(fut)
    else:
      raise newException(CatchableError, "Iterator finished, but next element was requested")

  iter.next = next
  return iter

proc prefetch*[T](wrappedIter: Iter[T], n: Positive): Iter[T] =

  var ringBuf = newSeq[T](n)
  var wrappedLen = int.high

proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOnErr: bool = true): Iter[T] =
  var iter = Iter[T]()

  proc tryFetch(i: int): void =
    if not wrappedIter.finished:
      let res = wrappedIter.next()
      ringBuf[i mod n] = res
      if wrappedIter.finished:
        wrappedLen = min(i + 1, wrappedLen)
    else:
      if i == 0:
        wrappedLen = 0

  proc checkLen(i: int): void =
    if i >= wrappedLen:
      iter.finish

  # initialize buf with n prefetched values
  for i in 0..<n:
    tryFetch(i)

  checkLen(0)

  var i = 0
  proc next(): T {.upraises: [CatchableError].} =
    if not iter.finished:
      var item: T
      try:
        item = genNext()
      except CatchableError as err:
        if finishOnErr or isFinished():
          iter.finish
        raise err

      if isFinished():
        iter.finish
      return item
      let fut = ringBuf[i mod n]
      # prefetch a value
      tryFetch(i + n)
      inc i
      checkLen(i)

      return fut
    else:
      raise newException(CatchableError, "Iterator is finished but next item was requested")

      if isFinished():
        iter.finish
      raise newException(CatchableError, "Iterator finished, but next element was requested")

  iter.next = next
  return iter

proc fromItems*[T](_: type Iter, items: seq[T]): Iter[T] =
  ## Create new iterator from items
  ##

  Iter.fromSlice(0..<items.len)
    .map((i: int) => items[i])

proc fromSlice*[U, V: Ordinal](_: type Iter, slice: HSlice[U, V]): Iter[U] =
  ## Creates new iterator from slice
  ##

  Iter.fromRange(slice.a.int, slice.b.int, 1)

proc fromRange*[U, V, S: Ordinal](_: type Iter, a: U, b: V, step: S = 1): Iter[U] =
  ## Creates new iterator in range a..b with specified step (default 1)
  ##

  var i = a

  proc genNext(): U =
    let u = i
    inc(i, step)
    u

  proc isFinished(): bool =
    (step > 0 and i > b) or
      (step < 0 and i < b)

  Iter.new(genNext, isFinished)

proc map*[T, U](iter: Iter[T], fn: Function[T, U]): Iter[U] =
  Iter.new(
    genNext    = () => fn(iter.next()),
    isFinished = () => iter.finished
  )

proc filter*[T](iter: Iter[T], predicate: Function[T, bool]): Iter[T] =
  var nextT: Option[T]

  proc tryFetch(): void =
    nextT = T.none
    while not iter.finished:
      let t = iter.next()
      if predicate(t):
        nextT = some(t)
        break

  proc genNext(): T =
    let t = nextT.unsafeGet
    tryFetch()
    return t

  proc isFinished(): bool =
    nextT.isNone

  tryFetch()
  Iter.new(genNext, isFinished)

proc prefetch*[T](iter: Iter[T], n: Positive): Iter[T] =
  var ringBuf = newSeq[T](n)
  var iterLen = int.high
  var i = 0

  proc tryFetch(j: int): void =
    if not iter.finished:
      let item = iter.next()
      ringBuf[j mod n] = item
      if iter.finished:
        iterLen = min(j + 1, iterLen)
    else:
      if j == 0:
        iterLen = 0

  proc genNext(): T =
    let item = ringBuf[i mod n]
    tryFetch(i + n)
    inc i
    return item

  proc isFinished(): bool =
    i >= iterLen

  # initialize ringBuf with n prefetched values
  for j in 0..<n:
    tryFetch(j)

  Iter.new(genNext, isFinished)
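Taken together, the rewritten module turns `Iter` into a small combinator kit: `fromRange`/`fromSlice`/`fromItems` build sources, and `map`/`filter`/`prefetch` wrap them. A quick composition sketch using only the procs defined above:

  # squares of even numbers in [0, 10), fetched two items ahead
  let it = Iter.fromRange(0, 9)
    .filter((i: int) => i mod 2 == 0)
    .map((i: int) => i * i)
    .prefetch(2)

  for x in it:
    echo x  # => 0; 4; 16; 36; 64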
@ -242,13 +242,8 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
    .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
      advertised[cid] = switch[3].peerInfo.signedPeerRecord

  discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address))
  await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address)))

  discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address))
  await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address)))

  discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address))
  await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address)))

  MockDiscovery(blockexc[0].engine.discovery.discovery)
@ -289,13 +284,8 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
    .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
      advertised[cid] = switch[3].peerInfo.signedPeerRecord

  discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address))
  await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address)))

  discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address))
  await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address)))

  discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address))
  await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address)))

  MockDiscovery(blockexc[0].engine.discovery.discovery)

@ -44,16 +44,28 @@ proc makeManifestAndTree*(blocks: seq[Block]): ?!(Manifest, MerkleTree) =
    return failure("Blocks list was empty")

  let
    mcodec = (? blocks[0].cid.mhash.mapFailure).mcodec
    datasetSize = blocks.mapIt(it.data.len).foldl(a + b)
    blockSize = blocks.mapIt(it.data.len).foldl(max(a, b))
    tree = ? MerkleTree.init(blocks.mapIt(it.cid))
    treeCid = ? tree.rootCid

  var builder = ? MerkleTreeBuilder.init(mcodec)

  for b in blocks:
    let mhash = ? b.cid.mhash.mapFailure
    if mhash.mcodec != mcodec:
      return failure("Blocks are not using the same codec")
    ? builder.addLeaf(mhash)

  let
    tree = ? builder.build()
    treeBlk = ? Block.new(tree.encode())
    manifest = Manifest.new(
      treeCid = treeCid,
      treeCid = treeBlk.cid,
      treeRoot = tree.root,
      blockSize = NBytes(blockSize),
      datasetSize = NBytes(datasetSize),
      version = CIDv1,
      hcodec = tree.mcodec
      hcodec = mcodec
    )

  return success((manifest, tree))
@ -77,28 +89,30 @@ proc makeWantList*(
    full: full)

proc storeDataGetManifest*(store: BlockStore, chunker: Chunker): Future[Manifest] {.async.} =
  var cids = newSeq[Cid]()
  var builder = MerkleTreeBuilder.init().tryGet()

  while (
    let chunk = await chunker.getBytes();
    chunk.len > 0):

    let blk = Block.new(chunk).tryGet()
    cids.add(blk.cid)
    # builder.addDataBlock(blk.data).tryGet()
    let mhash = blk.cid.mhash.mapFailure.tryGet()
    builder.addLeaf(mhash).tryGet()
    (await store.putBlock(blk)).tryGet()

  let
    tree = MerkleTree.init(cids).tryGet()
    treeCid = tree.rootCid.tryGet()
    manifest = Manifest.new(
      treeCid = treeCid,
      blockSize = NBytes(chunker.chunkSize),
      datasetSize = NBytes(chunker.offset),
    )
  let
    tree = builder.build().tryGet()
    treeBlk = Block.new(tree.encode()).tryGet()

  for i in 0..<tree.leavesCount:
    let proof = tree.getProof(i).tryGet()
    (await store.putBlockCidAndProof(treeCid, i, cids[i], proof)).tryGet()
  let manifest = Manifest.new(
    treeCid = treeBlk.cid,
    treeRoot = tree.root,
    blockSize = NBytes(chunker.chunkSize),
    datasetSize = NBytes(chunker.offset),
  )

  (await store.putBlock(treeBlk)).tryGet()

  return manifest

@ -116,7 +130,7 @@ proc corruptBlocks*(

    pos.add(i)
    var
      blk = (await store.getBlock(manifest.treeCid, i)).tryGet()
      blk = (await store.getBlock(manifest.treeCid, i, manifest.treeRoot)).tryGet()
      bytePos: seq[int]

    doAssert bytes < blk.data.len

@ -40,6 +40,7 @@ method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): F
  self.getBeOffset = offset

  var iter = AsyncIter[?BlockExpiration]()
  iter.finished = false

  self.iteratorIndex = offset
  var numberLeft = maxNumber

@ -56,7 +56,7 @@ checksuite "merkletree":
  let tree = builder.build().tryGet()

  check:
    tree.leaves.toSeq == expectedLeaves[0..0]
    tree.leaves == expectedLeaves[0..0]
    tree.root == expectedLeaves[0]
    tree.len == 1

@ -69,7 +69,7 @@ checksuite "merkletree":
  let expectedRoot = combine(expectedLeaves[0], expectedLeaves[1])

  check:
    tree.leaves.toSeq == expectedLeaves[0..1]
    tree.leaves == expectedLeaves[0..1]
    tree.len == 3
    tree.root == expectedRoot

@ -87,7 +87,7 @@ checksuite "merkletree":
  )

  check:
    tree.leaves.toSeq == expectedLeaves[0..2]
    tree.leaves == expectedLeaves[0..2]
    tree.len == 6
    tree.root == expectedRoot

@ -126,7 +126,7 @@ checksuite "merkletree":
  )

  check:
    tree.leaves.toSeq == expectedLeaves[0..8]
    tree.leaves == expectedLeaves[0..8]
    tree.len == 20
    tree.root == expectedRoot

@ -29,7 +29,7 @@ asyncchecksuite "Erasure encode/decode":
    metaDs = SQLiteDatastore.new(Memory).tryGet()
    rng = Rng.instance()
    chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize)
    store = RepoStore.new(repoDs, metaDs)
    store = CacheStore.new(cacheSize = (dataSetSize * 4), chunkSize = BlockSize)
    erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
    manifest = await storeDataGetManifest(store, chunker)

@ -61,19 +61,18 @@ asyncchecksuite "Erasure encode/decode":
    for _ in 0..<encoded.ecM:
      dropped.add(column)
      (await store.delBlock(encoded.treeCid, column)).tryGet()
      (await store.delBlock(manifest.treeCid, column)).tryGet()
      column.inc(encoded.steps - 1)

    var
      decoded = (await erasure.decode(encoded)).tryGet()

    check:
      decoded.treeCid == manifest.treeCid
      decoded.treeCid == encoded.originalTreeCid
      decoded.cid.tryGet() == manifest.cid.tryGet()
      decoded.cid.tryGet() == encoded.originalCid
      decoded.blocksCount == encoded.originalBlocksCount

    for d in dropped:
      let present = await store.hasBlock(manifest.treeCid, d)
      let present = await store.hasBlock(encoded.treeCid, d)
      check present.tryGet()

  test "Should not tolerate losing more than M data blocks in a single random column":
@ -90,7 +89,6 @@ asyncchecksuite "Erasure encode/decode":
    for _ in 0..<encoded.ecM + 1:
      dropped.add(column)
      (await store.delBlock(encoded.treeCid, column)).tryGet()
      (await store.delBlock(manifest.treeCid, column)).tryGet()
      column.inc(encoded.steps)

    var
@ -100,7 +98,7 @@ asyncchecksuite "Erasure encode/decode":
      decoded = (await erasure.decode(encoded)).tryGet()

    for d in dropped:
      let present = await store.hasBlock(manifest.treeCid, d)
      let present = await store.hasBlock(encoded.treeCid, d)
      check not present.tryGet()

  test "Should tolerate losing M data blocks in M random columns":
@ -124,7 +122,6 @@ asyncchecksuite "Erasure encode/decode":

    for idx in blocks:
      (await store.delBlock(encoded.treeCid, idx)).tryGet()
      (await store.delBlock(manifest.treeCid, idx)).tryGet()
      discard

    discard (await erasure.decode(encoded)).tryGet()
@ -152,7 +149,7 @@ asyncchecksuite "Erasure encode/decode":
    var idx: int
    while true:
      idx = rng.sample(blockIdx, blocks)
      let blk = (await store.getBlock(encoded.treeCid, idx)).tryGet()
      let blk = (await store.getBlock(encoded.treeCid, idx, encoded.treeRoot)).tryGet()
      if not blk.isEmpty:
        break

@ -161,7 +158,6 @@ asyncchecksuite "Erasure encode/decode":

    for idx in blocks:
      (await store.delBlock(encoded.treeCid, idx)).tryGet()
      (await store.delBlock(manifest.treeCid, idx)).tryGet()
      discard

    var
@ -179,7 +175,6 @@ asyncchecksuite "Erasure encode/decode":

    for b in 0..<encoded.steps * encoded.ecM:
      (await store.delBlock(encoded.treeCid, b)).tryGet()
      (await store.delBlock(manifest.treeCid, b)).tryGet()

    discard (await erasure.decode(encoded)).tryGet()

@ -196,7 +191,6 @@ asyncchecksuite "Erasure encode/decode":

    for b in (encoded.blocksCount - encoded.steps * encoded.ecM)..<encoded.blocksCount:
      (await store.delBlock(encoded.treeCid, b)).tryGet()
      (await store.delBlock(manifest.treeCid, b)).tryGet()

    discard (await erasure.decode(encoded)).tryGet()

@ -17,6 +17,7 @@ checksuite "Manifest":
  var
    manifest = Manifest.new(
      treeCid = Cid.example,
      treeRoot = MultiHash.example,
      blockSize = 1.MiBs,
      datasetSize = 100.MiBs)

@ -102,7 +102,8 @@ asyncchecksuite "Test Node":
    fetched = (await node.fetchManifest(manifestBlock.cid)).tryGet()

  check:
    fetched == manifest
    fetched.cid == manifest.cid
    # fetched.blocks == manifest.blocks

  test "Block Batching":
    let