From 919d15ff1fa9d9e56d2b9d2daf43e07fdc1490f4 Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Thu, 26 Oct 2023 13:23:23 +0200 Subject: [PATCH] Tmp, rework of merkleization --- codex/blockexchange/engine/engine.nim | 82 +----- codex/blockexchange/engine/pendingblocks.nim | 284 +++++-------------- codex/namespaces.nim | 2 + codex/node.nim | 25 +- codex/stores/blockstore.nim | 27 +- codex/stores/keyutils.nim | 6 + codex/stores/networkstore.nim | 43 +-- codex/stores/repostore.nim | 66 ++++- codex/streams/seekablestorestream.nim | 123 -------- codex/streams/storestream.nim | 77 +++-- 10 files changed, 213 insertions(+), 522 deletions(-) delete mode 100644 codex/streams/seekablestorestream.nim diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index d72151e8..a74bca2c 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -175,60 +175,11 @@ proc monitorBlockHandle(b: BlockExcEngine, handle: Future[Block], address: Block b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) await b.network.switch.disconnect(peerId) -proc requestBlock*( - b: BlockExcEngine, - cid: Cid, - timeout = DefaultBlockTimeout): Future[Block] {.async.} = - trace "Begin block request", cid, peers = b.peers.len - - if b.pendingBlocks.isInFlight(cid): - trace "Request handle already pending", cid - return await b.pendingBlocks.getWantHandle(cid, timeout) - - let - blk = b.pendingBlocks.getWantHandle(cid, timeout) - address = BlockAddress(leaf: false, cid: cid) - - trace "Selecting peers who have", address - var - peers = b.peers.selectCheapest(address) - - without blockPeer =? b.findCheapestPeerForBlock(peers): - trace "No peers to request blocks from. Queue discovery...", cid - b.discovery.queueFindBlocksReq(@[cid]) - return await blk - - asyncSpawn b.monitorBlockHandle(blk, address, blockPeer.id) - b.pendingBlocks.setInFlight(cid, true) - await b.sendWantBlock(address, blockPeer) - - codexBlockExchangeWantBlockListsSent.inc() - - if (peers.len - 1) == 0: - trace "No peers to send want list to", cid - b.discovery.queueFindBlocksReq(@[cid]) - return await blk - - await b.sendWantHave(address, blockPeer, toSeq(b.peers)) - - codexBlockExchangeWantHaveListsSent.inc() - - return await blk - proc requestBlock( b: BlockExcEngine, - treeReq: TreeReq, - index: Natural, + address: BlockAddress timeout = DefaultBlockTimeout ): Future[Block] {.async.} = - let address = BlockAddress(leaf: true, treeCid: treeReq.treeCid, index: index) - - let handleOrCid = treeReq.getWantHandleOrCid(index, timeout) - if handleOrCid.resolved: - without blk =? await b.localStore.getBlock(handleOrCid.cid), err: - return await b.requestBlock(handleOrCid.cid, timeout) - return blk - let blockFuture = handleOrCid.handle if treeReq.isInFlight(index): @@ -236,7 +187,7 @@ proc requestBlock( let peers = b.peers.selectCheapest(address) if peers.len == 0: - b.discovery.queueFindBlocksReq(@[treeReq.treeCid]) + b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) let maybePeer = if peers.len > 0: @@ -248,7 +199,7 @@ proc requestBlock( if peer =? maybePeer: asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id) - treeReq.trySetInFlight(index) + treeReq.setInFlight(address) await b.sendWantBlock(address, peer) codexBlockExchangeWantBlockListsSent.inc() await b.sendWantHave(address, peer, toSeq(b.peers)) @@ -256,32 +207,6 @@ proc requestBlock( return await blockFuture -proc requestBlock*( - b: BlockExcEngine, - treeCid: Cid, - index: Natural, - merkleRoot: MultiHash, - timeout = DefaultBlockTimeout -): Future[Block] = - without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, Natural.none, merkleRoot), err: - raise err - - return b.requestBlock(treeReq, index, timeout) - -proc requestBlocks*( - b: BlockExcEngine, - treeCid: Cid, - leavesCount: Natural, - merkleRoot: MultiHash, - timeout = DefaultBlockTimeout -): ?!AsyncIter[Block] = - without treeReq =? b.pendingBlocks.getOrPutTreeReq(treeCid, leavesCount.some, merkleRoot), err: - return failure(err) - - return Iter.fromSlice(0.. b.requestBlock(treeReq, index, timeout) - ).success - proc blockPresenceHandler*( b: BlockExcEngine, peer: PeerId, @@ -383,6 +308,7 @@ proc blocksDeliveryHandler*( for bd in blocksDelivery: if err =? (await b.localStore.putBlock(bd.blk)).errorOption: trace "Unable to store block", cid = bd.blk.cid, err = err.msg + # TODO store proof if bd.leaf else: storedBlocks.add(bd) diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 81d7f079..8e07d4bb 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -14,19 +14,12 @@ import pkg/upraises push: {.upraises: [].} -import ../../blocktype import pkg/chronicles -import pkg/questionable -import pkg/questionable/options -import pkg/questionable/results import pkg/chronos import pkg/libp2p import pkg/metrics -import ../protobuf/blockexc - -import ../../merkletree -import ../../utils +import ../../blocktype logScope: topics = "codex pendingblocks" @@ -43,121 +36,15 @@ type inFlight*: bool startTime*: int64 - LeafReq* = object - case delivered*: bool - of false: - handle*: Future[Block] - inFlight*: bool - of true: - leaf: MultiHash - blkCid*: Cid - - TreeReq* = ref object - leaves*: Table[Natural, LeafReq] - deliveredCount*: Natural - leavesCount*: ?Natural - treeRoot*: MultiHash - treeCid*: Cid - - TreeHandler* = proc(tree: MerkleTree): Future[void] {.raises:[], gcsafe.} - PendingBlocksManager* = ref object of RootObj - blocks*: Table[Cid, BlockReq] # pending Block requests - trees*: Table[Cid, TreeReq] - onTree*: TreeHandler + blocks*: Table[BlockAddress, BlockReq] # pending Block requests proc updatePendingBlockGauge(p: PendingBlocksManager) = codexBlockExchangePendingBlockRequests.set(p.blocks.len.int64) -type - BlockHandleOrCid = object - case resolved*: bool - of true: - cid*: Cid - else: - handle*: Future[Block] - -proc buildTree(treeReq: TreeReq): ?!MerkleTree = - trace "Building a merkle tree from leaves", treeCid = treeReq.treeCid, leavesCount = treeReq.leavesCount - - without leavesCount =? treeReq.leavesCount: - return failure("Leaves count is none, cannot build a tree") - - var builder = ? MerkleTreeBuilder.init(treeReq.treeRoot.mcodec) - for i in 0..= self.size - -method readOnce*( - self: SeekableStoreStream, - pbytes: pointer, - nbytes: int -): Future[int] {.async.} = - ## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`. - ## Return how many bytes were actually read before EOF was encountered. - ## Raise exception if we are already at EOF. - ## - - trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount - if self.atEof: - raise newLPStreamEOFError() - - # The loop iterates over blocks in the SeekableStoreStream, - # reading them and copying their data into outbuf - var read = 0 # Bytes read so far, and thus write offset in the outbuf - while read < nbytes and not self.atEof: - # Compute from the current stream position `self.offset` the block num/offset to read - # Compute how many bytes to read from this block - let - blockNum = self.offset div self.manifest.blockSize.int - blockOffset = self.offset mod self.manifest.blockSize.int - readBytes = min([self.size - self.offset, - nbytes - read, - self.manifest.blockSize.int - blockOffset]) - - # Read contents of block `blockNum` - without blk =? await self.store.getBlock(self.manifest.treeCid, blockNum, self.manifest.treeRoot), error: - raise newLPStreamReadError(error) - - trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset - - # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf - if blk.isEmpty: - zeroMem(pbytes.offset(read), readBytes) - else: - copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes) - - # Update current positions in the stream and outbuf - self.offset += readBytes - read += readBytes - - return read - -method closeImpl*(self: SeekableStoreStream) {.async.} = - trace "Closing SeekableStoreStream" - self.offset = self.size # set Eof - await procCall LPStream(self).closeImpl() diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index 2978f59f..92678670 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -1,5 +1,5 @@ -## Nim-Dagger -## Copyright (c) 2022 Status Research & Development GmbH +## Nim-Codex +## Copyright (c) 2023 Status Research & Development GmbH ## Licensed under either of ## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) ## * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -30,92 +30,87 @@ logScope: topics = "codex storestream" const - StoreStreamTrackerName* = "StoreStream" + SeekableStoreStreamTrackerName* = "SeekableStoreStream" type - StoreStream* = ref object of LPStream + # Make SeekableStream from a sequence of blocks stored in Manifest + # (only original file data - see StoreStream.size) + SeekableStoreStream* = ref object of SeekableStream store*: BlockStore # Store where to lookup block contents manifest*: Manifest # List of block CIDs pad*: bool # Pad last block to manifest.blockSize? - iter: AsyncIter[?!Block] - lastBlock: Block - lastIndex: int - offset: int -method initStream*(s: StoreStream) = +method initStream*(s: SeekableStoreStream) = if s.objName.len == 0: - s.objName = StoreStreamTrackerName + s.objName = SeekableStoreStreamTrackerName - procCall LPStream(s).initStream() + procCall SeekableStream(s).initStream() proc new*( - T: type StoreStream, + T: type SeekableStoreStream, store: BlockStore, manifest: Manifest, pad = true -): StoreStream = - ## Create a new StoreStream instance for a given store and manifest - ## - result = StoreStream( +): SeekableStoreStream = + ## Create a new SeekableStoreStream instance for a given store and manifest + ## + result = SeekableStoreStream( store: store, manifest: manifest, pad: pad, - lastIndex: -1, offset: 0) result.initStream() -method `size`*(self: StoreStream): int = +method `size`*(self: SeekableStoreStream): int = bytes(self.manifest, self.pad).int -proc `size=`*(self: StoreStream, size: int) +proc `size=`*(self: SeekableStoreStream, size: int) {.error: "Setting the size is forbidden".} = discard -method atEof*(self: StoreStream): bool = +method atEof*(self: SeekableStoreStream): bool = self.offset >= self.size method readOnce*( - self: StoreStream, + self: SeekableStoreStream, pbytes: pointer, nbytes: int ): Future[int] {.async.} = - ## Read `nbytes` from current position in the StoreStream into output buffer pointed by `pbytes`. + ## Read `nbytes` from current position in the SeekableStoreStream into output buffer pointed by `pbytes`. ## Return how many bytes were actually read before EOF was encountered. ## Raise exception if we are already at EOF. - ## + ## trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount if self.atEof: raise newLPStreamEOFError() - - # Initialize a block iterator - if self.lastIndex < 0: - without iter =? await self.store.getBlocks(self.manifest.treeCid, self.manifest.blocksCount, self.manifest.treeRoot), err: - raise newLPStreamReadError(err) - self.iter = iter + # The loop iterates over blocks in the SeekableStoreStream, + # reading them and copying their data into outbuf var read = 0 # Bytes read so far, and thus write offset in the outbuf while read < nbytes and not self.atEof: - if self.offset >= (self.lastIndex + 1) * self.manifest.blockSize.int: - if not self.iter.finished: - without lastBlock =? await self.iter.next(), err: - raise newLPStreamReadError(err) - self.lastBlock = lastBlock - inc self.lastIndex - else: - raise newLPStreamReadError(newException(CodexError, "Block iterator finished prematurely")) + # Compute from the current stream position `self.offset` the block num/offset to read # Compute how many bytes to read from this block let + blockNum = self.offset div self.manifest.blockSize.int blockOffset = self.offset mod self.manifest.blockSize.int readBytes = min([self.size - self.offset, nbytes - read, self.manifest.blockSize.int - blockOffset]) + address = BlockAddress(leaf: true, treeCid: self.manifest.treeCid, index: blockNum) + + # Read contents of block `blockNum` + without blk =? await self.store.getBlock(address), error: + raise newLPStreamReadError(error) + + trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset + # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf - if self.lastBlock.isEmpty: + if blk.isEmpty: zeroMem(pbytes.offset(read), readBytes) else: - copyMem(pbytes.offset(read), self.lastBlock.data[blockOffset].addr, readBytes) + copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes) # Update current positions in the stream and outbuf self.offset += readBytes @@ -123,7 +118,7 @@ method readOnce*( return read -method closeImpl*(self: StoreStream) {.async.} = - trace "Closing StoreStream" +method closeImpl*(self: SeekableStoreStream) {.async.} = + trace "Closing SeekableStoreStream" self.offset = self.size # set Eof await procCall LPStream(self).closeImpl()