diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index a74bca2c..ebc67f14 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -122,9 +122,9 @@ proc stop*(b: BlockExcEngine) {.async.} = proc sendWantHave( - b: BlockExcEngine, - address: BlockAddress, - selectedPeer: BlockExcPeerCtx, + b: BlockExcEngine, + address: BlockAddress, + selectedPeer: BlockExcPeerCtx, peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = trace "Sending wantHave request to peers", address for p in peers: @@ -137,8 +137,8 @@ proc sendWantHave( wantType = WantType.WantHave) # we only want to know if the peer has the block proc sendWantBlock( - b: BlockExcEngine, - address: BlockAddress, + b: BlockExcEngine, + address: BlockAddress, blockPeer: BlockExcPeerCtx): Future[void] {.async.} = trace "Sending wantBlock request to", peer = blockPeer.id, address await b.network.request.sendWantList( @@ -189,14 +189,14 @@ proc requestBlock( if peers.len == 0: b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) - let maybePeer = + let maybePeer = if peers.len > 0: peers[index mod peers.len].some elif b.peers.len > 0: toSeq(b.peers)[index mod b.peers.len].some else: BlockExcPeerCtx.none - + if peer =? maybePeer: asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id) treeReq.setInFlight(address) @@ -204,9 +204,91 @@ proc requestBlock( codexBlockExchangeWantBlockListsSent.inc() await b.sendWantHave(address, peer, toSeq(b.peers)) codexBlockExchangeWantHaveListsSent.inc() - + return await blockFuture +proc requestBlock( + b: BlockExcEngine, + treeReq: TreeReq, + index: Natural, + timeout = DefaultBlockTimeout +): Future[Block] {.async.} = + let address = BlockAddress(leaf: true, treeCid: treeReq.treeCid, index: index) + + let handleOrCid = treeReq.getWantHandleOrCid(index, timeout) + if handleOrCid.resolved: + without blk =? await b.localStore.getBlock(handleOrCid.cid), err: + return await b.requestBlock(handleOrCid.cid, timeout) + return blk + + let blockFuture = handleOrCid.handle + + if treeReq.isInFlight(index): + return await blockFuture + + let peers = b.peers.selectCheapest(address) + if peers.len == 0: + b.discovery.queueFindBlocksReq(@[treeReq.treeCid]) + + let maybePeer = + if peers.len > 0: + peers[index mod peers.len].some + elif b.peers.len > 0: + toSeq(b.peers)[index mod b.peers.len].some + else: + BlockExcPeerCtx.none + + if peer =? maybePeer: + asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id) + treeReq.trySetInFlight(index) + await b.sendWantBlock(address, peer) + codexBlockExchangeWantBlockListsSent.inc() + await b.sendWantHave(address, peer, toSeq(b.peers)) + codexBlockExchangeWantHaveListsSent.inc() + + return await blockFuture + +proc requestBlock*( + b: BlockExcEngine, + treeCid: Cid, + index: Natural, + merkleRoot: MultiHash, + timeout = DefaultBlockTimeout +): Future[Block] = + without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, Natural.none, merkleRoot), err: + raise err + + return b.requestBlock(treeReq, index, timeout) + +proc requestBlocks*( + b: BlockExcEngine, + treeCid: Cid, + leavesCount: Natural, + merkleRoot: MultiHash, + timeout = DefaultBlockTimeout +): ?!AsyncIter[Block] = + without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, leavesCount.some, merkleRoot), err: + return failure(err) + + var + iter = AsyncIter[Block]() + index = 0 + + proc next(): Future[Block] = + if index < leavesCount: + let fut = b.requestBlock(treeReq, index, timeout) + inc index + if index >= leavesCount: + iter.finished = true + return fut + else: + let fut = newFuture[Block]("engine.requestBlocks") + fut.fail(newException(CodexError, "No more elements for tree with cid " & $treeCid)) + return fut + + iter.next = next + return success(iter) + proc blockPresenceHandler*( b: BlockExcEngine, peer: PeerId, @@ -492,7 +574,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = trace "Handling lookup for entry", address = e.address if e.address.leaf: (await b.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map( - (blkAndProof: (Block, MerkleProof)) => + (blkAndProof: (Block, MerkleProof)) => BlockDelivery(address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some) ) else: diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 8e07d4bb..832634be 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -14,12 +14,19 @@ import pkg/upraises push: {.upraises: [].} +import ../../blocktype import pkg/chronicles +import pkg/questionable +import pkg/questionable/options +import pkg/questionable/results import pkg/chronos import pkg/libp2p import pkg/metrics -import ../../blocktype +import ../protobuf/blockexc + +import ../../merkletree +import ../../utils logScope: topics = "codex pendingblocks" @@ -36,12 +43,119 @@ type inFlight*: bool startTime*: int64 + LeafReq* = object + case delivered*: bool + of false: + handle*: Future[Block] + inFlight*: bool + of true: + leaf: MultiHash + blkCid*: Cid + + TreeReq* = ref object + leaves*: Table[Natural, LeafReq] + deliveredCount*: Natural + leavesCount*: ?Natural + treeRoot*: MultiHash + treeCid*: Cid + + TreeHandler* = proc(tree: MerkleTree): Future[void] {.gcsafe.} + PendingBlocksManager* = ref object of RootObj blocks*: Table[BlockAddress, BlockReq] # pending Block requests proc updatePendingBlockGauge(p: PendingBlocksManager) = codexBlockExchangePendingBlockRequests.set(p.blocks.len.int64) +type + BlockHandleOrCid = object + case resolved*: bool + of true: + cid*: Cid + else: + handle*: Future[Block] + +proc buildTree(treeReq: TreeReq): ?!MerkleTree = + trace "Building a merkle tree from leaves", treeCid = treeReq.treeCid, leavesCount = treeReq.leavesCount + + without leavesCount =? treeReq.leavesCount: + return failure("Leaves count is none, cannot build a tree") + + var builder = ? MerkleTreeBuilder.init(treeReq.treeRoot.mcodec) + for i in 0.. Block.new(cid = cid, data = @[])) proc emptyBlock*(cid: Cid): ?!Block = - cid.mhash.mapFailure.flatMap((mhash: MultiHash) => + cid.mhash.mapFailure.flatMap((mhash: MultiHash) => emptyBlock(cid.cidver, mhash.mcodec)) proc isEmpty*(cid: Cid): bool = - success(cid) == cid.mhash.mapFailure.flatMap((mhash: MultiHash) => + success(cid) == cid.mhash.mapFailure.flatMap((mhash: MultiHash) => emptyCid(cid.cidver, mhash.mcodec, cid.mcodec)) proc isEmpty*(blk: Block): bool = diff --git a/codex/codex.nim b/codex/codex.nim index 0b725bb7..62aba38b 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -231,6 +231,8 @@ proc new*( wallet = WalletRef.new(EthPrivateKey.random()) network = BlockExcNetwork.new(switch) + treeReader = TreeReader.new() + repoData = case config.repoKind of repoFS: Datastore(FSDatastore.new($config.dataDir, depth = 5) .expect("Should create repo file data store!")) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 10159bdc..07ece7aa 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -102,13 +102,13 @@ proc getPendingBlocks( proc isFinished(): bool = pendingBlocks.len == 0 - proc genNext(): Future[(?!bt.Block, int)] {.async.} = + proc genNext(): Future[(?!bt.Block, int)] {.async.} = let completedFut = await one(pendingBlocks) pendingBlocks.del(pendingBlocks.find(completedFut)) return await completedFut Iter.new(genNext, isFinished) - + proc prepareEncodingData( self: Erasure, manifest: Manifest, @@ -130,7 +130,7 @@ proc prepareEncodingData( without blk =? blkOrErr, err: warn "Failed retreiving a block", idx, treeCid = manifest.treeCid continue - + let pos = indexToPos(params.steps, idx, step) shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data) cids[idx] = blk.cid @@ -164,7 +164,7 @@ proc prepareDecodingData( ## `emptyBlock` - the empty block to be used for padding ## - let + let indicies = toSeq(countup(step, encoded.blocksCount - 1, encoded.steps)) pendingBlocksIter = self.getPendingBlocks(encoded, indicies) @@ -266,11 +266,16 @@ proc encodeData( trace "Unable to prepare data", error = err.msg return failure(err) - trace "Erasure coding data", data = data[].len, parity = parityData.len + trace "Encoding block", cid = blk.cid, pos = idx + shallowCopy(data[j], blk.data) + else: + trace "Padding with empty block", pos = idx + data[j] = newSeq[byte](manifest.blockSize.int) - if ( - let res = encoder.encode(data[], parityData); - res.isErr): + trace "Erasure coding data", data = data.len, parity = parityData.len + + let res = encoder.encode(data, parityData); + if res.isErr: trace "Unable to encode manifest!", error = $res.error return failure($res.error) @@ -354,12 +359,19 @@ proc decode*( var cids = seq[Cid].new() decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM) - emptyBlock = newSeq[byte](encoded.blockSize.int) - hasParity = false cids[].setLen(encoded.blocksCount) try: - for step in 0..= encoded.ecK) or (idxPendingBlocks.len == 0): + break without (dataPieces, parityPieces) =? (await self.prepareDecodingData(encoded, step, data, parityData, cids, emptyBlock)), err: @@ -380,14 +397,14 @@ proc decode*( return failure(err) if dataPieces >= encoded.ecK: - trace "Retrieved all the required data blocks" + trace "Retrieved all the required data blocks", data = dataPieces, parity = parityPieces continue - trace "Erasure decoding data" + trace "Erasure decoding data", data = dataPieces, parity = parityPieces if ( - let err = decoder.decode(data[], parityData[], recovered); + let err = decoder.decode(data, parityData, recovered); err.isErr): - trace "Unable to decode data!", err = $err.error + trace "Unable to decode manifest!", err = $err.error return failure($err.error) for i in 0..= self.leavesCount: return failure("Index " & $index & " out of range [0.." & $(self.leavesCount - 1) & "]" ) - + success(self.nodeBufferToMultiHash(index)) proc height*(self: MerkleTree): Natural = @@ -203,7 +203,7 @@ proc height*(self: MerkleTree): Natural = proc getProof*(self: MerkleTree, index: Natural): ?!MerkleProof = ## Extracts proof from a tree for a given index - ## + ## ## Given a tree built from data blocks A, B and C ## H5 ## / \ @@ -217,7 +217,7 @@ proc getProof*(self: MerkleTree, index: Natural): ?!MerkleProof = ## - 0,[H1, H4] for data block A ## - 1,[H0, H4] for data block B ## - 2,[0x00, H3] for data block C - ## + ## if index >= self.leavesCount: return failure("Index " & $index & " out of range [0.." & $(self.leavesCount - 1) & "]" ) @@ -237,7 +237,7 @@ proc getProof*(self: MerkleTree, index: Natural): ?!MerkleProof = var dummyValue = if level.index == 0: zero else: one if siblingIndex < level.offset + level.width: - proofNodesBuffer[level.index * self.digestSize..<(level.index + 1) * self.digestSize] = + proofNodesBuffer[level.index * self.digestSize..<(level.index + 1) * self.digestSize] = self.nodesBuffer[siblingIndex * self.digestSize..<(siblingIndex + 1) * self.digestSize] else: proofNodesBuffer[level.index * self.digestSize..<(level.index + 1) * self.digestSize] = dummyValue @@ -268,9 +268,9 @@ proc init*( if totalNodes * digestSize == nodesBuffer.len: success( MerkleTree( - mcodec: mcodec, - digestSize: digestSize, - leavesCount: leavesCount, + mcodec: mcodec, + digestSize: digestSize, + leavesCount: leavesCount, nodesBuffer: nodesBuffer ) ) @@ -295,13 +295,13 @@ proc init*( ): ?!MerkleTree = without leaf =? leaves.?[0]: return failure("At least one leaf is required") - + var builder = ? MerkleTreeBuilder.init(mcodec = leaf.mcodec) for l in leaves: if err =? builder.addLeaf(l).errorOption: return failure(err) - + builder.build() ########################################################### @@ -328,7 +328,7 @@ proc verifyLeaf*(self: MerkleProof, leaf: MultiHash, treeRoot: MultiHash): ?!boo else: concatBuf[0..^1] = self.nodesBuffer[offset..<(offset + self.digestSize)] & digestBuf ? digestFn(self.mcodec, digestBuf, 0, concatBuf) - + let computedRoot = ? MultiHash.init(self.mcodec, digestBuf).mapFailure success(computedRoot == treeRoot) @@ -352,8 +352,8 @@ proc `$`*(self: MerkleProof): string = ", nodes: " & $self.nodes func `==`*(a, b: MerkleProof): bool = - (a.index == b.index) and - (a.mcodec == b.mcodec) and + (a.index == b.index) and + (a.mcodec == b.mcodec) and (a.digestSize == b.digestSize) and (a.nodesBuffer == b.nodesBuffer) @@ -368,11 +368,11 @@ proc init*( let mcodec = nodes[0].mcodec digestSize = nodes[0].size - + var nodesBuffer = newSeq[byte](nodes.len * digestSize) for nodeIndex, node in nodes: nodesBuffer[nodeIndex * digestSize..<(nodeIndex + 1) * digestSize] = node.data.buffer[node.dpos..= self.size + +method readOnce*( + self: SeekableStoreStream, + pbytes: pointer, + nbytes: int +): Future[int] {.async.} = + ## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`. + ## Return how many bytes were actually read before EOF was encountered. + ## Raise exception if we are already at EOF. + ## + + trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount + if self.atEof: + raise newLPStreamEOFError() + + # The loop iterates over blocks in the SeekableStoreStream, + # reading them and copying their data into outbuf + var read = 0 # Bytes read so far, and thus write offset in the outbuf + while read < nbytes and not self.atEof: + # Compute from the current stream position `self.offset` the block num/offset to read + # Compute how many bytes to read from this block + let + blockNum = self.offset div self.manifest.blockSize.int + blockOffset = self.offset mod self.manifest.blockSize.int + readBytes = min([self.size - self.offset, + nbytes - read, + self.manifest.blockSize.int - blockOffset]) + + # Read contents of block `blockNum` + without blk =? await self.store.getBlock(self.manifest.treeCid, blockNum, self.manifest.treeRoot), error: + raise newLPStreamReadError(error) + + trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset + + # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf + if blk.isEmpty: + zeroMem(pbytes.offset(read), readBytes) + else: + copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes) + + # Update current positions in the stream and outbuf + self.offset += readBytes + read += readBytes + + return read + +method closeImpl*(self: SeekableStoreStream) {.async.} = + trace "Closing SeekableStoreStream" + self.offset = self.size # set Eof + await procCall LPStream(self).closeImpl() diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index 92678670..2f897804 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -39,12 +39,16 @@ type store*: BlockStore # Store where to lookup block contents manifest*: Manifest # List of block CIDs pad*: bool # Pad last block to manifest.blockSize? + iter*: AsyncIter[?!Block] + lastBlock: Block + lastIndex: int + offset: int method initStream*(s: SeekableStoreStream) = if s.objName.len == 0: s.objName = SeekableStoreStreamTrackerName - procCall SeekableStream(s).initStream() + procCall LPStream(s).initStream() proc new*( T: type SeekableStoreStream, @@ -53,11 +57,12 @@ proc new*( pad = true ): SeekableStoreStream = ## Create a new SeekableStoreStream instance for a given store and manifest - ## + ## result = SeekableStoreStream( store: store, manifest: manifest, pad: pad, + lastIndex: -1, offset: 0) result.initStream() @@ -80,20 +85,32 @@ method readOnce*( ## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`. ## Return how many bytes were actually read before EOF was encountered. ## Raise exception if we are already at EOF. - ## + ## trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount if self.atEof: raise newLPStreamEOFError() + # Initialize a block iterator + if self.lastIndex < 0: + without iter =? await self.store.getBlocks(self.manifest.treeCid, self.manifest.blocksCount, self.manifest.treeRoot), err: + raise newLPStreamReadError(err) + self.iter = iter + # The loop iterates over blocks in the SeekableStoreStream, # reading them and copying their data into outbuf var read = 0 # Bytes read so far, and thus write offset in the outbuf while read < nbytes and not self.atEof: - # Compute from the current stream position `self.offset` the block num/offset to read + if self.offset >= (self.lastIndex + 1) * self.manifest.blockSize.int: + if not self.iter.finished: + without lastBlock =? await self.iter.next(), err: + raise newLPStreamReadError(err) + self.lastBlock = lastBlock + inc self.lastIndex + else: + raise newLPStreamReadError(newException(CodexError, "Block iterator finished prematurely")) # Compute how many bytes to read from this block let - blockNum = self.offset div self.manifest.blockSize.int blockOffset = self.offset mod self.manifest.blockSize.int readBytes = min([self.size - self.offset, nbytes - read, @@ -107,10 +124,10 @@ method readOnce*( trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf - if blk.isEmpty: + if self.lastBlock.isEmpty: zeroMem(pbytes.offset(read), readBytes) else: - copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes) + copyMem(pbytes.offset(read), self.lastBlock.data[blockOffset].addr, readBytes) # Update current positions in the stream and outbuf self.offset += readBytes diff --git a/codex/utils.nim b/codex/utils.nim index 1180455e..20ff6af8 100644 --- a/codex/utils.nim +++ b/codex/utils.nim @@ -6,7 +6,7 @@ ## at your option. ## This file may not be copied, modified, or distributed except according to ## those terms. -## +## import std/parseutils import std/options @@ -30,12 +30,11 @@ func roundUp*[T](a, b : T): T = divUp(a,b) * b proc orElse*[A](a, b: Option[A]): Option[A] = - if (a.isSome()): - a - else: + if (a.isSome()): + a + else: b - when not declared(parseDuration): # Odd code formatting to minimize diff v. mainLine const Whitespace = {' ', '\t', '\v', '\r', '\l', '\f'} diff --git a/codex/utils/asynciter.nim b/codex/utils/asynciter.nim index 502644c6..9aa7683f 100644 --- a/codex/utils/asynciter.nim +++ b/codex/utils/asynciter.nim @@ -1,5 +1,4 @@ import std/sugar - import pkg/questionable import pkg/chronos import pkg/upraises @@ -54,14 +53,14 @@ proc new*[T](_: type Iter, genNext: GenNext[T], isFinished: IsFinished, finishOn if isFinished(): iter.finish - + iter.next = next return iter proc fromItems*[T](_: type Iter, items: openArray[T]): Iter[T] = ## Create new iterator from items ## - + Iter.fromSlice(0.. items[i]) @@ -81,7 +80,7 @@ proc fromRange*[U, V, S: Ordinal](_: type Iter, a: U, b: V, step: S = 1): Iter[U let u = i inc(i, step) u - + proc isFinished(): bool = (step > 0 and i > b) or (step < 0 and i < b) @@ -119,7 +118,6 @@ proc prefetch*[T](iter: Iter[T], n: Positive): Iter[T] = var ringBuf = newSeq[T](n) var iterLen = int.high var i = 0 - proc tryFetch(j: int): void = if not iter.finished: let item = iter.next() diff --git a/tests/codex/helpers.nim b/tests/codex/helpers.nim index 0f2090f2..581f9375 100644 --- a/tests/codex/helpers.nim +++ b/tests/codex/helpers.nim @@ -43,7 +43,7 @@ proc makeManifestAndTree*(blocks: seq[Block]): ?!(Manifest, MerkleTree) = if blocks.len == 0: return failure("Blocks list was empty") - let + let datasetSize = blocks.mapIt(it.data.len).foldl(a + b) blockSize = blocks.mapIt(it.data.len).foldl(max(a, b)) tree = ? MerkleTree.init(blocks.mapIt(it.cid)) diff --git a/vendor/codex-contracts-eth b/vendor/codex-contracts-eth index 14e453ac..1854dfba 160000 --- a/vendor/codex-contracts-eth +++ b/vendor/codex-contracts-eth @@ -1 +1 @@ -Subproject commit 14e453ac3150e6c9ca277e605d5df9389ac7eea7 +Subproject commit 1854dfba9991a25532de5f6a53cf50e66afb3c8b