mirror of
https://github.com/codex-storage/nim-codex.git
synced 2025-02-20 00:28:07 +00:00
Blockexchange uses merkle root and index to fetch blocks
This commit is contained in:
parent
0497d13f6f
commit
bbe59238a0
@ -122,9 +122,9 @@ proc stop*(b: BlockExcEngine) {.async.} =
|
||||
|
||||
|
||||
proc sendWantHave(
|
||||
b: BlockExcEngine,
|
||||
address: BlockAddress,
|
||||
selectedPeer: BlockExcPeerCtx,
|
||||
b: BlockExcEngine,
|
||||
address: BlockAddress,
|
||||
selectedPeer: BlockExcPeerCtx,
|
||||
peers: seq[BlockExcPeerCtx]): Future[void] {.async.} =
|
||||
trace "Sending wantHave request to peers", address
|
||||
for p in peers:
|
||||
@ -137,8 +137,8 @@ proc sendWantHave(
|
||||
wantType = WantType.WantHave) # we only want to know if the peer has the block
|
||||
|
||||
proc sendWantBlock(
|
||||
b: BlockExcEngine,
|
||||
address: BlockAddress,
|
||||
b: BlockExcEngine,
|
||||
address: BlockAddress,
|
||||
blockPeer: BlockExcPeerCtx): Future[void] {.async.} =
|
||||
trace "Sending wantBlock request to", peer = blockPeer.id, address
|
||||
await b.network.request.sendWantList(
|
||||
@ -189,14 +189,14 @@ proc requestBlock(
|
||||
if peers.len == 0:
|
||||
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
|
||||
|
||||
let maybePeer =
|
||||
let maybePeer =
|
||||
if peers.len > 0:
|
||||
peers[index mod peers.len].some
|
||||
elif b.peers.len > 0:
|
||||
toSeq(b.peers)[index mod b.peers.len].some
|
||||
else:
|
||||
BlockExcPeerCtx.none
|
||||
|
||||
|
||||
if peer =? maybePeer:
|
||||
asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id)
|
||||
treeReq.setInFlight(address)
|
||||
@ -204,9 +204,91 @@ proc requestBlock(
|
||||
codexBlockExchangeWantBlockListsSent.inc()
|
||||
await b.sendWantHave(address, peer, toSeq(b.peers))
|
||||
codexBlockExchangeWantHaveListsSent.inc()
|
||||
|
||||
|
||||
return await blockFuture
|
||||
|
||||
proc requestBlock(
|
||||
b: BlockExcEngine,
|
||||
treeReq: TreeReq,
|
||||
index: Natural,
|
||||
timeout = DefaultBlockTimeout
|
||||
): Future[Block] {.async.} =
|
||||
let address = BlockAddress(leaf: true, treeCid: treeReq.treeCid, index: index)
|
||||
|
||||
let handleOrCid = treeReq.getWantHandleOrCid(index, timeout)
|
||||
if handleOrCid.resolved:
|
||||
without blk =? await b.localStore.getBlock(handleOrCid.cid), err:
|
||||
return await b.requestBlock(handleOrCid.cid, timeout)
|
||||
return blk
|
||||
|
||||
let blockFuture = handleOrCid.handle
|
||||
|
||||
if treeReq.isInFlight(index):
|
||||
return await blockFuture
|
||||
|
||||
let peers = b.peers.selectCheapest(address)
|
||||
if peers.len == 0:
|
||||
b.discovery.queueFindBlocksReq(@[treeReq.treeCid])
|
||||
|
||||
let maybePeer =
|
||||
if peers.len > 0:
|
||||
peers[index mod peers.len].some
|
||||
elif b.peers.len > 0:
|
||||
toSeq(b.peers)[index mod b.peers.len].some
|
||||
else:
|
||||
BlockExcPeerCtx.none
|
||||
|
||||
if peer =? maybePeer:
|
||||
asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id)
|
||||
treeReq.trySetInFlight(index)
|
||||
await b.sendWantBlock(address, peer)
|
||||
codexBlockExchangeWantBlockListsSent.inc()
|
||||
await b.sendWantHave(address, peer, toSeq(b.peers))
|
||||
codexBlockExchangeWantHaveListsSent.inc()
|
||||
|
||||
return await blockFuture
|
||||
|
||||
proc requestBlock*(
|
||||
b: BlockExcEngine,
|
||||
treeCid: Cid,
|
||||
index: Natural,
|
||||
merkleRoot: MultiHash,
|
||||
timeout = DefaultBlockTimeout
|
||||
): Future[Block] =
|
||||
without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, Natural.none, merkleRoot), err:
|
||||
raise err
|
||||
|
||||
return b.requestBlock(treeReq, index, timeout)
|
||||
|
||||
proc requestBlocks*(
|
||||
b: BlockExcEngine,
|
||||
treeCid: Cid,
|
||||
leavesCount: Natural,
|
||||
merkleRoot: MultiHash,
|
||||
timeout = DefaultBlockTimeout
|
||||
): ?!AsyncIter[Block] =
|
||||
without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, leavesCount.some, merkleRoot), err:
|
||||
return failure(err)
|
||||
|
||||
var
|
||||
iter = AsyncIter[Block]()
|
||||
index = 0
|
||||
|
||||
proc next(): Future[Block] =
|
||||
if index < leavesCount:
|
||||
let fut = b.requestBlock(treeReq, index, timeout)
|
||||
inc index
|
||||
if index >= leavesCount:
|
||||
iter.finished = true
|
||||
return fut
|
||||
else:
|
||||
let fut = newFuture[Block]("engine.requestBlocks")
|
||||
fut.fail(newException(CodexError, "No more elements for tree with cid " & $treeCid))
|
||||
return fut
|
||||
|
||||
iter.next = next
|
||||
return success(iter)
|
||||
|
||||
proc blockPresenceHandler*(
|
||||
b: BlockExcEngine,
|
||||
peer: PeerId,
|
||||
@ -492,7 +574,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
|
||||
trace "Handling lookup for entry", address = e.address
|
||||
if e.address.leaf:
|
||||
(await b.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map(
|
||||
(blkAndProof: (Block, MerkleProof)) =>
|
||||
(blkAndProof: (Block, MerkleProof)) =>
|
||||
BlockDelivery(address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some)
|
||||
)
|
||||
else:
|
||||
|
@ -14,12 +14,19 @@ import pkg/upraises
|
||||
|
||||
push: {.upraises: [].}
|
||||
|
||||
import ../../blocktype
|
||||
import pkg/chronicles
|
||||
import pkg/questionable
|
||||
import pkg/questionable/options
|
||||
import pkg/questionable/results
|
||||
import pkg/chronos
|
||||
import pkg/libp2p
|
||||
import pkg/metrics
|
||||
|
||||
import ../../blocktype
|
||||
import ../protobuf/blockexc
|
||||
|
||||
import ../../merkletree
|
||||
import ../../utils
|
||||
|
||||
logScope:
|
||||
topics = "codex pendingblocks"
|
||||
@ -36,12 +43,119 @@ type
|
||||
inFlight*: bool
|
||||
startTime*: int64
|
||||
|
||||
LeafReq* = object
|
||||
case delivered*: bool
|
||||
of false:
|
||||
handle*: Future[Block]
|
||||
inFlight*: bool
|
||||
of true:
|
||||
leaf: MultiHash
|
||||
blkCid*: Cid
|
||||
|
||||
TreeReq* = ref object
|
||||
leaves*: Table[Natural, LeafReq]
|
||||
deliveredCount*: Natural
|
||||
leavesCount*: ?Natural
|
||||
treeRoot*: MultiHash
|
||||
treeCid*: Cid
|
||||
|
||||
TreeHandler* = proc(tree: MerkleTree): Future[void] {.gcsafe.}
|
||||
|
||||
PendingBlocksManager* = ref object of RootObj
|
||||
blocks*: Table[BlockAddress, BlockReq] # pending Block requests
|
||||
|
||||
proc updatePendingBlockGauge(p: PendingBlocksManager) =
|
||||
codexBlockExchangePendingBlockRequests.set(p.blocks.len.int64)
|
||||
|
||||
type
|
||||
BlockHandleOrCid = object
|
||||
case resolved*: bool
|
||||
of true:
|
||||
cid*: Cid
|
||||
else:
|
||||
handle*: Future[Block]
|
||||
|
||||
proc buildTree(treeReq: TreeReq): ?!MerkleTree =
|
||||
trace "Building a merkle tree from leaves", treeCid = treeReq.treeCid, leavesCount = treeReq.leavesCount
|
||||
|
||||
without leavesCount =? treeReq.leavesCount:
|
||||
return failure("Leaves count is none, cannot build a tree")
|
||||
|
||||
var builder = ? MerkleTreeBuilder.init(treeReq.treeRoot.mcodec)
|
||||
for i in 0..<leavesCount:
|
||||
treeReq.leaves.withValue(i, leafReq):
|
||||
if leafReq.delivered:
|
||||
? builder.addLeaf(leafReq.leaf)
|
||||
else:
|
||||
return failure("Expected all leaves to be delivered but leaf with index " & $i & " was not")
|
||||
do:
|
||||
return failure("Missing a leaf with index " & $i)
|
||||
|
||||
let tree = ? builder.build()
|
||||
|
||||
if tree.root != treeReq.treeRoot:
|
||||
return failure("Reconstructed tree root doesn't match the original tree root, tree cid is " & $treeReq.treeCid)
|
||||
|
||||
return success(tree)
|
||||
|
||||
proc checkIfAllDelivered(p: PendingBlocksManager, treeReq: TreeReq): void =
|
||||
if treeReq.deliveredCount.some == treeReq.leavesCount:
|
||||
without tree =? buildTree(treeReq), err:
|
||||
error "Error building a tree", msg = err.msg
|
||||
p.trees.del(treeReq.treeCid)
|
||||
return
|
||||
p.trees.del(treeReq.treeCid)
|
||||
try:
|
||||
asyncSpawn p.onTree(tree)
|
||||
except Exception as err:
|
||||
error "Exception when handling tree", msg = err.msg
|
||||
|
||||
proc getWantHandleOrCid*(
|
||||
treeReq: TreeReq,
|
||||
index: Natural,
|
||||
timeout = DefaultBlockTimeout
|
||||
): BlockHandleOrCid =
|
||||
treeReq.leaves.withValue(index, leafReq):
|
||||
if not leafReq.delivered:
|
||||
return BlockHandleOrCid(resolved: false, handle: leafReq.handle)
|
||||
else:
|
||||
return BlockHandleOrCid(resolved: true, cid: leafReq.blkCid)
|
||||
do:
|
||||
let leafReq = LeafReq(
|
||||
delivered: false,
|
||||
handle: newFuture[Block]("pendingBlocks.getWantHandleOrCid"),
|
||||
inFlight: false
|
||||
)
|
||||
treeReq.leaves[index] = leafReq
|
||||
return BlockHandleOrCid(resolved: false, handle: leafReq.handle)
|
||||
|
||||
proc getOrPutTreeReq*(
|
||||
p: PendingBlocksManager,
|
||||
treeCid: Cid,
|
||||
leavesCount = Natural.none, # has value when all leaves are expected to be delivered
|
||||
treeRoot: MultiHash
|
||||
): ?!TreeReq =
|
||||
p.trees.withValue(treeCid, treeReq):
|
||||
if treeReq.treeRoot != treeRoot:
|
||||
return failure("Unexpected root for tree with cid " & $treeCid)
|
||||
|
||||
if leavesCount == treeReq.leavesCount:
|
||||
return success(treeReq[])
|
||||
else:
|
||||
treeReq.leavesCount = treeReq.leavesCount.orElse(leavesCount)
|
||||
let res = success(treeReq[])
|
||||
p.checkIfAllDelivered(treeReq[])
|
||||
return res
|
||||
do:
|
||||
let treeReq = TreeReq(
|
||||
deliveredCount: 0,
|
||||
leavesCount: leavesCount,
|
||||
treeRoot: treeRoot,
|
||||
treeCid: treeCid
|
||||
)
|
||||
p.trees[treeCid] = treeReq
|
||||
return success(treeReq)
|
||||
|
||||
proc getWantHandle*(
|
||||
p: PendingBlocksManager,
|
||||
address: BlockAddress,
|
||||
@ -89,7 +203,7 @@ proc resolve*(
|
||||
without proof =? bd.proof:
|
||||
warn "Missing proof for a block", address = bd.address
|
||||
continue
|
||||
|
||||
|
||||
if proof.index != bd.address.index:
|
||||
warn "Proof index doesn't match leaf index", address = bd.address, proofIndex = proof.index
|
||||
continue
|
||||
@ -120,7 +234,7 @@ proc resolve*(
|
||||
retrievalDurationUs = (stopTime - startTime) div 1000
|
||||
|
||||
blockReq.handle.complete(blk)
|
||||
|
||||
|
||||
codexBlockExchangeRetrievalTimeUs.set(retrievalDurationUs)
|
||||
trace "Block retrieval time", retrievalDurationUs
|
||||
do:
|
||||
@ -170,6 +284,10 @@ iterator wantHandles*(p: PendingBlocksManager): Future[Block] =
|
||||
for v in p.blocks.values:
|
||||
yield v.handle
|
||||
|
||||
|
||||
proc wantListLen*(p: PendingBlocksManager): int =
|
||||
p.blocks.len + p.trees.len
|
||||
|
||||
func len*(p: PendingBlocksManager): int =
|
||||
p.blocks.len
|
||||
|
||||
|
@ -90,7 +90,7 @@ func new*(
|
||||
codec = multiCodec("raw")
|
||||
): ?!Block =
|
||||
## creates a new block for both storage and network IO
|
||||
##
|
||||
##
|
||||
|
||||
let
|
||||
hash = ? MultiHash.digest($mcodec, data).mapFailure
|
||||
@ -126,7 +126,7 @@ func new*(
|
||||
|
||||
proc emptyCid*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!Cid =
|
||||
## Returns cid representing empty content, given cid version, hash codec and data codec
|
||||
##
|
||||
##
|
||||
|
||||
const
|
||||
Sha256 = multiCodec("sha2-256")
|
||||
@ -155,11 +155,11 @@ proc emptyBlock*(version: CidVersion, hcodec: MultiCodec): ?!Block =
|
||||
.flatMap((cid: Cid) => Block.new(cid = cid, data = @[]))
|
||||
|
||||
proc emptyBlock*(cid: Cid): ?!Block =
|
||||
cid.mhash.mapFailure.flatMap((mhash: MultiHash) =>
|
||||
cid.mhash.mapFailure.flatMap((mhash: MultiHash) =>
|
||||
emptyBlock(cid.cidver, mhash.mcodec))
|
||||
|
||||
proc isEmpty*(cid: Cid): bool =
|
||||
success(cid) == cid.mhash.mapFailure.flatMap((mhash: MultiHash) =>
|
||||
success(cid) == cid.mhash.mapFailure.flatMap((mhash: MultiHash) =>
|
||||
emptyCid(cid.cidver, mhash.mcodec, cid.mcodec))
|
||||
|
||||
proc isEmpty*(blk: Block): bool =
|
||||
|
@ -231,6 +231,8 @@ proc new*(
|
||||
wallet = WalletRef.new(EthPrivateKey.random())
|
||||
network = BlockExcNetwork.new(switch)
|
||||
|
||||
treeReader = TreeReader.new()
|
||||
|
||||
repoData = case config.repoKind
|
||||
of repoFS: Datastore(FSDatastore.new($config.dataDir, depth = 5)
|
||||
.expect("Should create repo file data store!"))
|
||||
|
@ -102,13 +102,13 @@ proc getPendingBlocks(
|
||||
|
||||
proc isFinished(): bool = pendingBlocks.len == 0
|
||||
|
||||
proc genNext(): Future[(?!bt.Block, int)] {.async.} =
|
||||
proc genNext(): Future[(?!bt.Block, int)] {.async.} =
|
||||
let completedFut = await one(pendingBlocks)
|
||||
pendingBlocks.del(pendingBlocks.find(completedFut))
|
||||
return await completedFut
|
||||
|
||||
Iter.new(genNext, isFinished)
|
||||
|
||||
|
||||
proc prepareEncodingData(
|
||||
self: Erasure,
|
||||
manifest: Manifest,
|
||||
@ -130,7 +130,7 @@ proc prepareEncodingData(
|
||||
without blk =? blkOrErr, err:
|
||||
warn "Failed retreiving a block", idx, treeCid = manifest.treeCid
|
||||
continue
|
||||
|
||||
|
||||
let pos = indexToPos(params.steps, idx, step)
|
||||
shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data)
|
||||
cids[idx] = blk.cid
|
||||
@ -164,7 +164,7 @@ proc prepareDecodingData(
|
||||
## `emptyBlock` - the empty block to be used for padding
|
||||
##
|
||||
|
||||
let
|
||||
let
|
||||
indicies = toSeq(countup(step, encoded.blocksCount - 1, encoded.steps))
|
||||
pendingBlocksIter = self.getPendingBlocks(encoded, indicies)
|
||||
|
||||
@ -266,11 +266,16 @@ proc encodeData(
|
||||
trace "Unable to prepare data", error = err.msg
|
||||
return failure(err)
|
||||
|
||||
trace "Erasure coding data", data = data[].len, parity = parityData.len
|
||||
trace "Encoding block", cid = blk.cid, pos = idx
|
||||
shallowCopy(data[j], blk.data)
|
||||
else:
|
||||
trace "Padding with empty block", pos = idx
|
||||
data[j] = newSeq[byte](manifest.blockSize.int)
|
||||
|
||||
if (
|
||||
let res = encoder.encode(data[], parityData);
|
||||
res.isErr):
|
||||
trace "Erasure coding data", data = data.len, parity = parityData.len
|
||||
|
||||
let res = encoder.encode(data, parityData);
|
||||
if res.isErr:
|
||||
trace "Unable to encode manifest!", error = $res.error
|
||||
return failure($res.error)
|
||||
|
||||
@ -354,12 +359,19 @@ proc decode*(
|
||||
var
|
||||
cids = seq[Cid].new()
|
||||
decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM)
|
||||
emptyBlock = newSeq[byte](encoded.blockSize.int)
|
||||
hasParity = false
|
||||
|
||||
cids[].setLen(encoded.blocksCount)
|
||||
try:
|
||||
for step in 0..<encoded.steps:
|
||||
for i in 0..<encoded.steps:
|
||||
# TODO: Don't allocate a new seq every time, allocate once and zero out
|
||||
let
|
||||
# calculate block indexes to retrieve
|
||||
blockIdx = toSeq(countup(i, encoded.blocksCount - 1, encoded.steps))
|
||||
# request all blocks from the store
|
||||
pendingBlocks = blockIdx.mapIt(
|
||||
self.store.getBlock(encoded.treeCid, it, encoded.treeRoot) # Get the data blocks (first K)
|
||||
)
|
||||
|
||||
# TODO: this is a tight blocking loop so we sleep here to allow
|
||||
# other events to be processed, this should be addressed
|
||||
# by threading
|
||||
@ -369,10 +381,15 @@ proc decode*(
|
||||
data = seq[seq[byte]].new()
|
||||
parityData = seq[seq[byte]].new()
|
||||
recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int))
|
||||
idxPendingBlocks = pendingBlocks # copy futures to make using with `one` easier
|
||||
emptyBlock = newSeq[byte](encoded.blockSize.int)
|
||||
resolved = 0
|
||||
|
||||
data[].setLen(encoded.ecK) # set len to K
|
||||
parityData[].setLen(encoded.ecM) # set len to M
|
||||
while true:
|
||||
# Continue to receive blocks until we have just enough for decoding
|
||||
# or no more blocks can arrive
|
||||
if (resolved >= encoded.ecK) or (idxPendingBlocks.len == 0):
|
||||
break
|
||||
|
||||
without (dataPieces, parityPieces) =?
|
||||
(await self.prepareDecodingData(encoded, step, data, parityData, cids, emptyBlock)), err:
|
||||
@ -380,14 +397,14 @@ proc decode*(
|
||||
return failure(err)
|
||||
|
||||
if dataPieces >= encoded.ecK:
|
||||
trace "Retrieved all the required data blocks"
|
||||
trace "Retrieved all the required data blocks", data = dataPieces, parity = parityPieces
|
||||
continue
|
||||
|
||||
trace "Erasure decoding data"
|
||||
trace "Erasure decoding data", data = dataPieces, parity = parityPieces
|
||||
if (
|
||||
let err = decoder.decode(data[], parityData[], recovered);
|
||||
let err = decoder.decode(data, parityData, recovered);
|
||||
err.isErr):
|
||||
trace "Unable to decode data!", err = $err.error
|
||||
trace "Unable to decode manifest!", err = $err.error
|
||||
return failure($err.error)
|
||||
|
||||
for i in 0..<encoded.ecK:
|
||||
@ -420,7 +437,7 @@ proc decode*(
|
||||
|
||||
without treeBlk =? bt.Block.new(tree.encode()), err:
|
||||
return failure(err)
|
||||
|
||||
|
||||
if err =? (await self.store.putBlock(treeBlk)).errorOption:
|
||||
return failure("Unable to store merkle tree block " & $treeBlk.cid & ", nested err: " & err.msg)
|
||||
|
||||
|
@ -45,7 +45,7 @@ type
|
||||
###########################################################
|
||||
|
||||
func computeTreeHeight(leavesCount: int): int =
|
||||
if isPowerOfTwo(leavesCount):
|
||||
if isPowerOfTwo(leavesCount):
|
||||
fastLog2(leavesCount) + 1
|
||||
else:
|
||||
fastLog2(leavesCount) + 2
|
||||
@ -83,16 +83,16 @@ proc init*(
|
||||
|
||||
proc addDataBlock*(self: var MerkleTreeBuilder, dataBlock: openArray[byte]): ?!void =
|
||||
## Hashes the data block and adds the result of hashing to a buffer
|
||||
##
|
||||
##
|
||||
let oldLen = self.buffer.len
|
||||
self.buffer.setLen(oldLen + self.digestSize)
|
||||
digestFn(self.mcodec, self.buffer, oldLen, dataBlock)
|
||||
|
||||
proc addLeaf*(self: var MerkleTreeBuilder, leaf: MultiHash): ?!void =
|
||||
if leaf.mcodec != self.mcodec or leaf.size != self.digestSize:
|
||||
return failure("Expected mcodec to be " & $self.mcodec & " and digest size to be " &
|
||||
return failure("Expected mcodec to be " & $self.mcodec & " and digest size to be " &
|
||||
$self.digestSize & " but was " & $leaf.mcodec & " and " & $leaf.size)
|
||||
|
||||
|
||||
let oldLen = self.buffer.len
|
||||
self.buffer.setLen(oldLen + self.digestSize)
|
||||
self.buffer[oldLen..<oldLen + self.digestSize] = leaf.data.buffer[leaf.dpos..<leaf.dpos + self.digestSize]
|
||||
@ -100,7 +100,7 @@ proc addLeaf*(self: var MerkleTreeBuilder, leaf: MultiHash): ?!void =
|
||||
|
||||
proc build*(self: MerkleTreeBuilder): ?!MerkleTree =
|
||||
## Builds a tree from previously added data blocks
|
||||
##
|
||||
##
|
||||
## Tree built from data blocks A, B and C is
|
||||
## H5=H(H3 & H4)
|
||||
## / \
|
||||
@ -113,7 +113,7 @@ proc build*(self: MerkleTreeBuilder): ?!MerkleTree =
|
||||
## Memory layout is [H0, H1, H2, H3, H4, H5]
|
||||
##
|
||||
let
|
||||
mcodec = self.mcodec
|
||||
mcodec = self.mcodec
|
||||
digestSize = self.digestSize
|
||||
leavesCount = self.buffer.len div self.digestSize
|
||||
|
||||
@ -122,7 +122,7 @@ proc build*(self: MerkleTreeBuilder): ?!MerkleTree =
|
||||
|
||||
let levels = computeLevels(leavesCount)
|
||||
let totalNodes = levels[^1].offset + 1
|
||||
|
||||
|
||||
var tree = MerkleTree(mcodec: mcodec, digestSize: digestSize, leavesCount: leavesCount, nodesBuffer: newSeq[byte](totalNodes * digestSize))
|
||||
|
||||
# copy leaves
|
||||
@ -133,7 +133,7 @@ proc build*(self: MerkleTreeBuilder): ?!MerkleTree =
|
||||
var one = newSeq[byte](digestSize)
|
||||
one[^1] = 0x01
|
||||
|
||||
var
|
||||
var
|
||||
concatBuf = newSeq[byte](2 * digestSize)
|
||||
prevLevel = levels[0]
|
||||
for level in levels[1..^1]:
|
||||
@ -179,7 +179,7 @@ proc nodes*(self: (MerkleTree | MerkleProof)): seq[MultiHash] {.noSideEffect.} =
|
||||
proc mcodec*(self: (MerkleTree | MerkleProof)): MultiCodec =
|
||||
self.mcodec
|
||||
|
||||
proc digestSize*(self: (MerkleTree | MerkleProof)): Natural =
|
||||
proc digestSize*(self: (MerkleTree | MerkleProof)): Natural =
|
||||
self.digestSize
|
||||
|
||||
proc root*(self: MerkleTree): MultiHash =
|
||||
@ -195,7 +195,7 @@ proc leavesCount*(self: MerkleTree): Natural =
|
||||
proc getLeaf*(self: MerkleTree, index: Natural): ?!MultiHash =
|
||||
if index >= self.leavesCount:
|
||||
return failure("Index " & $index & " out of range [0.." & $(self.leavesCount - 1) & "]" )
|
||||
|
||||
|
||||
success(self.nodeBufferToMultiHash(index))
|
||||
|
||||
proc height*(self: MerkleTree): Natural =
|
||||
@ -203,7 +203,7 @@ proc height*(self: MerkleTree): Natural =
|
||||
|
||||
proc getProof*(self: MerkleTree, index: Natural): ?!MerkleProof =
|
||||
## Extracts proof from a tree for a given index
|
||||
##
|
||||
##
|
||||
## Given a tree built from data blocks A, B and C
|
||||
## H5
|
||||
## / \
|
||||
@ -217,7 +217,7 @@ proc getProof*(self: MerkleTree, index: Natural): ?!MerkleProof =
|
||||
## - 0,[H1, H4] for data block A
|
||||
## - 1,[H0, H4] for data block B
|
||||
## - 2,[0x00, H3] for data block C
|
||||
##
|
||||
##
|
||||
if index >= self.leavesCount:
|
||||
return failure("Index " & $index & " out of range [0.." & $(self.leavesCount - 1) & "]" )
|
||||
|
||||
@ -237,7 +237,7 @@ proc getProof*(self: MerkleTree, index: Natural): ?!MerkleProof =
|
||||
var dummyValue = if level.index == 0: zero else: one
|
||||
|
||||
if siblingIndex < level.offset + level.width:
|
||||
proofNodesBuffer[level.index * self.digestSize..<(level.index + 1) * self.digestSize] =
|
||||
proofNodesBuffer[level.index * self.digestSize..<(level.index + 1) * self.digestSize] =
|
||||
self.nodesBuffer[siblingIndex * self.digestSize..<(siblingIndex + 1) * self.digestSize]
|
||||
else:
|
||||
proofNodesBuffer[level.index * self.digestSize..<(level.index + 1) * self.digestSize] = dummyValue
|
||||
@ -268,9 +268,9 @@ proc init*(
|
||||
if totalNodes * digestSize == nodesBuffer.len:
|
||||
success(
|
||||
MerkleTree(
|
||||
mcodec: mcodec,
|
||||
digestSize: digestSize,
|
||||
leavesCount: leavesCount,
|
||||
mcodec: mcodec,
|
||||
digestSize: digestSize,
|
||||
leavesCount: leavesCount,
|
||||
nodesBuffer: nodesBuffer
|
||||
)
|
||||
)
|
||||
@ -295,13 +295,13 @@ proc init*(
|
||||
): ?!MerkleTree =
|
||||
without leaf =? leaves.?[0]:
|
||||
return failure("At least one leaf is required")
|
||||
|
||||
|
||||
var builder = ? MerkleTreeBuilder.init(mcodec = leaf.mcodec)
|
||||
|
||||
for l in leaves:
|
||||
if err =? builder.addLeaf(l).errorOption:
|
||||
return failure(err)
|
||||
|
||||
|
||||
builder.build()
|
||||
|
||||
###########################################################
|
||||
@ -328,7 +328,7 @@ proc verifyLeaf*(self: MerkleProof, leaf: MultiHash, treeRoot: MultiHash): ?!boo
|
||||
else:
|
||||
concatBuf[0..^1] = self.nodesBuffer[offset..<(offset + self.digestSize)] & digestBuf
|
||||
? digestFn(self.mcodec, digestBuf, 0, concatBuf)
|
||||
|
||||
|
||||
let computedRoot = ? MultiHash.init(self.mcodec, digestBuf).mapFailure
|
||||
|
||||
success(computedRoot == treeRoot)
|
||||
@ -352,8 +352,8 @@ proc `$`*(self: MerkleProof): string =
|
||||
", nodes: " & $self.nodes
|
||||
|
||||
func `==`*(a, b: MerkleProof): bool =
|
||||
(a.index == b.index) and
|
||||
(a.mcodec == b.mcodec) and
|
||||
(a.index == b.index) and
|
||||
(a.mcodec == b.mcodec) and
|
||||
(a.digestSize == b.digestSize) and
|
||||
(a.nodesBuffer == b.nodesBuffer)
|
||||
|
||||
@ -368,11 +368,11 @@ proc init*(
|
||||
let
|
||||
mcodec = nodes[0].mcodec
|
||||
digestSize = nodes[0].size
|
||||
|
||||
|
||||
var nodesBuffer = newSeq[byte](nodes.len * digestSize)
|
||||
for nodeIndex, node in nodes:
|
||||
nodesBuffer[nodeIndex * digestSize..<(nodeIndex + 1) * digestSize] = node.data.buffer[node.dpos..<node.dpos + digestSize]
|
||||
|
||||
|
||||
success(MerkleProof(mcodec: mcodec, digestSize: digestSize, index: index, nodesBuffer: nodesBuffer))
|
||||
|
||||
func init*(
|
||||
|
@ -109,7 +109,7 @@ proc fetchBatched*(
|
||||
onBatch: BatchProc = nil): Future[?!void] {.async, gcsafe.} =
|
||||
## Fetch manifest in batches of `batchSize`
|
||||
##
|
||||
|
||||
|
||||
let batchCount = divUp(manifest.blocksCount, batchSize)
|
||||
|
||||
trace "Fetching blocks in batches of", size = batchSize
|
||||
@ -207,7 +207,7 @@ proc store*(
|
||||
|
||||
without blk =? bt.Block.new(cid, chunk, verify = false):
|
||||
return failure("Unable to init block from chunk!")
|
||||
|
||||
|
||||
cids.add(cid)
|
||||
|
||||
if err =? (await self.blockStore.putBlock(blk)).errorOption:
|
||||
@ -227,7 +227,7 @@ proc store*(
|
||||
|
||||
without treeCid =? Cid.init(CIDv1, dataCodec, tree.root).mapFailure, err:
|
||||
return failure(err)
|
||||
|
||||
|
||||
for cid, index in cids:
|
||||
without proof =? tree.getProof(index), err:
|
||||
return failure(err)
|
||||
|
@ -38,8 +38,8 @@ method getBlock*(self: BlockStore, address: BlockAddress): Future[?!Block] {.bas
|
||||
|
||||
method getBlockAndProof*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.base.} =
|
||||
## Get a block and associated inclusion proof by Cid of a merkle tree and an index of a leaf in a tree
|
||||
##
|
||||
|
||||
##
|
||||
|
||||
raiseAssert("Not implemented!")
|
||||
|
||||
method putBlock*(
|
||||
|
@ -91,7 +91,7 @@ method hasBlock*(self: CacheStore, treeCid: Cid, index: Natural): Future[?!bool]
|
||||
|
||||
without cid =? await self.treeReader.getBlockCid(treeCid, index), err:
|
||||
return failure(err)
|
||||
|
||||
|
||||
await self.hasBlock(cid)
|
||||
|
||||
func cids(self: CacheStore): (iterator: Cid {.gcsafe.}) =
|
||||
@ -230,11 +230,14 @@ proc new*(
|
||||
if cacheSize < chunkSize:
|
||||
raise newException(ValueError, "cacheSize cannot be less than chunkSize")
|
||||
|
||||
var treeReader = TreeReader.new()
|
||||
|
||||
let
|
||||
currentSize = 0'nb
|
||||
size = int(cacheSize div chunkSize)
|
||||
cache = newLruCache[Cid, Block](size)
|
||||
store = CacheStore(
|
||||
treeReader: treeReader,
|
||||
cache: cache,
|
||||
currentSize: currentSize,
|
||||
size: cacheSize)
|
||||
|
@ -52,6 +52,35 @@ method getBlock*(self: BlockStore, address: BlockAddress): Future[?!Block] {.asy
|
||||
|
||||
return success blk
|
||||
|
||||
method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural, merkleRoot: MultiHash): Future[?!Block] {.async.} =
|
||||
without localBlock =? await self.localStore.getBlock(treeCid, index, merkleRoot), err:
|
||||
if err of BlockNotFoundError:
|
||||
trace "Requesting block from the network engine", treeCid, index
|
||||
try:
|
||||
let networkBlock = await self.engine.requestBlock(treeCid, index, merkleRoot)
|
||||
return success(networkBlock)
|
||||
except CatchableError as err:
|
||||
return failure(err)
|
||||
else:
|
||||
failure(err)
|
||||
return success(localBlock)
|
||||
|
||||
method getBlocks*(self: NetworkStore, treeCid: Cid, leavesCount: Natural, merkleRoot: MultiHash): Future[?!AsyncIter[?!Block]] {.async.} =
|
||||
without localIter =? await self.localStore.getBlocks(treeCid, leavesCount, merkleRoot), err:
|
||||
if err of BlockNotFoundError:
|
||||
trace "Requesting blocks from the network engine", treeCid, leavesCount
|
||||
without var networkIter =? self.engine.requestBlocks(treeCid, leavesCount, merkleRoot), err:
|
||||
failure(err)
|
||||
|
||||
let iter = networkIter
|
||||
.prefetch(BlockPrefetchAmount)
|
||||
.map(proc (fut: Future[Block]): Future[?!Block] {.async.} = catch: (await fut))
|
||||
|
||||
return success(iter)
|
||||
else:
|
||||
return failure(err)
|
||||
return success(localIter)
|
||||
|
||||
method putBlock*(
|
||||
self: NetworkStore,
|
||||
blk: Block,
|
||||
|
@ -64,7 +64,7 @@ type
|
||||
BlockExpiration* = object
|
||||
cid*: Cid
|
||||
expiration*: SecondsSince1970
|
||||
|
||||
|
||||
proc updateMetrics(self: RepoStore) =
|
||||
codexRepostoreBlocks.set(self.totalBlocks.int64)
|
||||
codexRepostoreBytesUsed.set(self.quotaUsedBytes.int64)
|
||||
@ -88,18 +88,18 @@ proc encode(cidAndProof: (Cid, MerkleProof)): seq[byte] =
|
||||
pb.buffer
|
||||
|
||||
proc decode(_: type (Cid, MerkleProof), data: seq[byte]): ?!(Cid, MerkleProof) =
|
||||
var
|
||||
var
|
||||
pbNode = initProtoBuffer(data)
|
||||
cidBuf: seq[byte]
|
||||
proofBuf: seq[byte]
|
||||
|
||||
discard pbNode.getField(1, cidBuf).mapFailure
|
||||
discard pbNode.getField(2, proofBuf).mapFailure
|
||||
|
||||
let
|
||||
|
||||
let
|
||||
cid = ? Cid.init(cidBuf).mapFailure
|
||||
proof = ? MerkleProof.decode(proofBuf)
|
||||
|
||||
|
||||
(cid, proof)
|
||||
|
||||
method putBlockCidAndProof*(
|
||||
@ -147,7 +147,7 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
|
||||
method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] =
|
||||
without (blk, _) =? await self.getBlockAndProof(treeCid, index), err:
|
||||
return failure(err)
|
||||
|
||||
|
||||
success(blk)
|
||||
|
||||
|
||||
@ -338,7 +338,7 @@ method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
|
||||
method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} =
|
||||
without cid =? await self.treeReader.getBlockCid(treeCid, index), err:
|
||||
return failure(err)
|
||||
|
||||
|
||||
await self.hasBlock(cid)
|
||||
|
||||
method listBlocks*(
|
||||
@ -541,6 +541,7 @@ proc new*(
|
||||
T: type RepoStore,
|
||||
repoDs: Datastore,
|
||||
metaDs: Datastore,
|
||||
treeReader: TreeReader = TreeReader.new(),
|
||||
clock: Clock = SystemClock.new(),
|
||||
postFixLen = 2,
|
||||
quotaMaxBytes = DefaultQuotaBytes,
|
||||
@ -552,6 +553,7 @@ proc new*(
|
||||
let store = RepoStore(
|
||||
repoDs: repoDs,
|
||||
metaDs: metaDs,
|
||||
treeReader: treeReader,
|
||||
clock: clock,
|
||||
postFixLen: postFixLen,
|
||||
quotaMaxBytes: quotaMaxBytes,
|
||||
|
@ -99,7 +99,7 @@ proc getBlocks*(self: TreeReader, treeCid: Cid, leavesCount: Natural): Future[?!
|
||||
|
||||
func new*(
|
||||
T: type TreeReader,
|
||||
getBlockFromStore: GetBlock,
|
||||
getBlockFromStore: GetBlock,
|
||||
treeCacheCap = DefaultTreeCacheCapacity
|
||||
): TreeReader {.noSideEffect.} =
|
||||
TreeReader(
|
||||
|
123
codex/streams/seekablestorestream.nim
Normal file
123
codex/streams/seekablestorestream.nim
Normal file
@ -0,0 +1,123 @@
|
||||
## Nim-Dagger
|
||||
## Copyright (c) 2022 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import std/options
|
||||
|
||||
import pkg/upraises
|
||||
|
||||
push: {.upraises: [].}
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/chronicles
|
||||
import pkg/stew/ptrops
|
||||
|
||||
import ../stores
|
||||
import ../manifest
|
||||
import ../blocktype
|
||||
import ../utils
|
||||
|
||||
import ./seekablestream
|
||||
|
||||
export stores, blocktype, manifest, chronos
|
||||
|
||||
logScope:
|
||||
topics = "codex storestream"
|
||||
|
||||
const
|
||||
SeekableStoreStreamTrackerName* = "SeekableStoreStream"
|
||||
|
||||
type
|
||||
# Make SeekableStream from a sequence of blocks stored in Manifest
|
||||
# (only original file data - see StoreStream.size)
|
||||
SeekableStoreStream* = ref object of SeekableStream
|
||||
store*: BlockStore # Store where to lookup block contents
|
||||
manifest*: Manifest # List of block CIDs
|
||||
pad*: bool # Pad last block to manifest.blockSize?
|
||||
|
||||
method initStream*(s: SeekableStoreStream) =
|
||||
if s.objName.len == 0:
|
||||
s.objName = SeekableStoreStreamTrackerName
|
||||
|
||||
procCall SeekableStream(s).initStream()
|
||||
|
||||
proc new*(
|
||||
T: type SeekableStoreStream,
|
||||
store: BlockStore,
|
||||
manifest: Manifest,
|
||||
pad = true
|
||||
): SeekableStoreStream =
|
||||
## Create a new SeekableStoreStream instance for a given store and manifest
|
||||
##
|
||||
result = SeekableStoreStream(
|
||||
store: store,
|
||||
manifest: manifest,
|
||||
pad: pad,
|
||||
offset: 0)
|
||||
|
||||
result.initStream()
|
||||
|
||||
method `size`*(self: SeekableStoreStream): int =
|
||||
bytes(self.manifest, self.pad).int
|
||||
|
||||
proc `size=`*(self: SeekableStoreStream, size: int)
|
||||
{.error: "Setting the size is forbidden".} =
|
||||
discard
|
||||
|
||||
method atEof*(self: SeekableStoreStream): bool =
|
||||
self.offset >= self.size
|
||||
|
||||
method readOnce*(
|
||||
self: SeekableStoreStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int
|
||||
): Future[int] {.async.} =
|
||||
## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`.
|
||||
## Return how many bytes were actually read before EOF was encountered.
|
||||
## Raise exception if we are already at EOF.
|
||||
##
|
||||
|
||||
trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount
|
||||
if self.atEof:
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
# The loop iterates over blocks in the SeekableStoreStream,
|
||||
# reading them and copying their data into outbuf
|
||||
var read = 0 # Bytes read so far, and thus write offset in the outbuf
|
||||
while read < nbytes and not self.atEof:
|
||||
# Compute from the current stream position `self.offset` the block num/offset to read
|
||||
# Compute how many bytes to read from this block
|
||||
let
|
||||
blockNum = self.offset div self.manifest.blockSize.int
|
||||
blockOffset = self.offset mod self.manifest.blockSize.int
|
||||
readBytes = min([self.size - self.offset,
|
||||
nbytes - read,
|
||||
self.manifest.blockSize.int - blockOffset])
|
||||
|
||||
# Read contents of block `blockNum`
|
||||
without blk =? await self.store.getBlock(self.manifest.treeCid, blockNum, self.manifest.treeRoot), error:
|
||||
raise newLPStreamReadError(error)
|
||||
|
||||
trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset
|
||||
|
||||
# Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
|
||||
if blk.isEmpty:
|
||||
zeroMem(pbytes.offset(read), readBytes)
|
||||
else:
|
||||
copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes)
|
||||
|
||||
# Update current positions in the stream and outbuf
|
||||
self.offset += readBytes
|
||||
read += readBytes
|
||||
|
||||
return read
|
||||
|
||||
method closeImpl*(self: SeekableStoreStream) {.async.} =
|
||||
trace "Closing SeekableStoreStream"
|
||||
self.offset = self.size # set Eof
|
||||
await procCall LPStream(self).closeImpl()
|
@ -39,12 +39,16 @@ type
|
||||
store*: BlockStore # Store where to lookup block contents
|
||||
manifest*: Manifest # List of block CIDs
|
||||
pad*: bool # Pad last block to manifest.blockSize?
|
||||
iter*: AsyncIter[?!Block]
|
||||
lastBlock: Block
|
||||
lastIndex: int
|
||||
offset: int
|
||||
|
||||
method initStream*(s: SeekableStoreStream) =
|
||||
if s.objName.len == 0:
|
||||
s.objName = SeekableStoreStreamTrackerName
|
||||
|
||||
procCall SeekableStream(s).initStream()
|
||||
procCall LPStream(s).initStream()
|
||||
|
||||
proc new*(
|
||||
T: type SeekableStoreStream,
|
||||
@ -53,11 +57,12 @@ proc new*(
|
||||
pad = true
|
||||
): SeekableStoreStream =
|
||||
## Create a new SeekableStoreStream instance for a given store and manifest
|
||||
##
|
||||
##
|
||||
result = SeekableStoreStream(
|
||||
store: store,
|
||||
manifest: manifest,
|
||||
pad: pad,
|
||||
lastIndex: -1,
|
||||
offset: 0)
|
||||
|
||||
result.initStream()
|
||||
@ -80,20 +85,32 @@ method readOnce*(
|
||||
## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`.
|
||||
## Return how many bytes were actually read before EOF was encountered.
|
||||
## Raise exception if we are already at EOF.
|
||||
##
|
||||
##
|
||||
|
||||
trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount
|
||||
if self.atEof:
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
# Initialize a block iterator
|
||||
if self.lastIndex < 0:
|
||||
without iter =? await self.store.getBlocks(self.manifest.treeCid, self.manifest.blocksCount, self.manifest.treeRoot), err:
|
||||
raise newLPStreamReadError(err)
|
||||
self.iter = iter
|
||||
|
||||
# The loop iterates over blocks in the SeekableStoreStream,
|
||||
# reading them and copying their data into outbuf
|
||||
var read = 0 # Bytes read so far, and thus write offset in the outbuf
|
||||
while read < nbytes and not self.atEof:
|
||||
# Compute from the current stream position `self.offset` the block num/offset to read
|
||||
if self.offset >= (self.lastIndex + 1) * self.manifest.blockSize.int:
|
||||
if not self.iter.finished:
|
||||
without lastBlock =? await self.iter.next(), err:
|
||||
raise newLPStreamReadError(err)
|
||||
self.lastBlock = lastBlock
|
||||
inc self.lastIndex
|
||||
else:
|
||||
raise newLPStreamReadError(newException(CodexError, "Block iterator finished prematurely"))
|
||||
# Compute how many bytes to read from this block
|
||||
let
|
||||
blockNum = self.offset div self.manifest.blockSize.int
|
||||
blockOffset = self.offset mod self.manifest.blockSize.int
|
||||
readBytes = min([self.size - self.offset,
|
||||
nbytes - read,
|
||||
@ -107,10 +124,10 @@ method readOnce*(
|
||||
trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset
|
||||
|
||||
# Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
|
||||
if blk.isEmpty:
|
||||
if self.lastBlock.isEmpty:
|
||||
zeroMem(pbytes.offset(read), readBytes)
|
||||
else:
|
||||
copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes)
|
||||
copyMem(pbytes.offset(read), self.lastBlock.data[blockOffset].addr, readBytes)
|
||||
|
||||
# Update current positions in the stream and outbuf
|
||||
self.offset += readBytes
|
||||
|
@ -6,7 +6,7 @@
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
##
|
||||
##
|
||||
|
||||
import std/parseutils
|
||||
import std/options
|
||||
@ -30,12 +30,11 @@ func roundUp*[T](a, b : T): T =
|
||||
divUp(a,b) * b
|
||||
|
||||
proc orElse*[A](a, b: Option[A]): Option[A] =
|
||||
if (a.isSome()):
|
||||
a
|
||||
else:
|
||||
if (a.isSome()):
|
||||
a
|
||||
else:
|
||||
b
|
||||
|
||||
|
||||
when not declared(parseDuration): # Odd code formatting to minimize diff v. mainLine
|
||||
const Whitespace = {' ', '\t', '\v', '\r', '\l', '\f'}
|
||||
|
||||
|
@ -1,5 +1,4 @@
|
||||
import std/sugar
|
||||
|
||||
import pkg/questionable
|
||||
import pkg/chronos
|
||||
import pkg/upraises
|
||||
@ -54,14 +53,14 @@ proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOn
|
||||
|
||||
if isFinished():
|
||||
iter.finish
|
||||
|
||||
|
||||
iter.next = next
|
||||
return iter
|
||||
|
||||
proc fromItems*[T](_: type Iter, items: openArray[T]): Iter[T] =
|
||||
## Create new iterator from items
|
||||
##
|
||||
|
||||
|
||||
Iter.fromSlice(0..<items.len)
|
||||
.map((i) => items[i])
|
||||
|
||||
@ -81,7 +80,7 @@ proc fromRange*[U, V, S: Ordinal](_: type Iter, a: U, b: V, step: S = 1): Iter[U
|
||||
let u = i
|
||||
inc(i, step)
|
||||
u
|
||||
|
||||
|
||||
proc isFinished(): bool =
|
||||
(step > 0 and i > b) or
|
||||
(step < 0 and i < b)
|
||||
@ -119,7 +118,6 @@ proc prefetch*[T](iter: Iter[T], n: Positive): Iter[T] =
|
||||
var ringBuf = newSeq[T](n)
|
||||
var iterLen = int.high
|
||||
var i = 0
|
||||
|
||||
proc tryFetch(j: int): void =
|
||||
if not iter.finished:
|
||||
let item = iter.next()
|
||||
|
@ -43,7 +43,7 @@ proc makeManifestAndTree*(blocks: seq[Block]): ?!(Manifest, MerkleTree) =
|
||||
if blocks.len == 0:
|
||||
return failure("Blocks list was empty")
|
||||
|
||||
let
|
||||
let
|
||||
datasetSize = blocks.mapIt(it.data.len).foldl(a + b)
|
||||
blockSize = blocks.mapIt(it.data.len).foldl(max(a, b))
|
||||
tree = ? MerkleTree.init(blocks.mapIt(it.cid))
|
||||
|
2
vendor/codex-contracts-eth
vendored
2
vendor/codex-contracts-eth
vendored
@ -1 +1 @@
|
||||
Subproject commit 14e453ac3150e6c9ca277e605d5df9389ac7eea7
|
||||
Subproject commit 1854dfba9991a25532de5f6a53cf50e66afb3c8b
|
Loading…
x
Reference in New Issue
Block a user