diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 36d00cf0..0d04fd7f 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -230,6 +230,12 @@ proc requestBlock*( ): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} = self.requestBlock(BlockAddress.init(cid)) +proc completeBlock*(self: BlockExcEngine, address: BlockAddress, blk: Block) = + if address in self.pendingBlocks.blocks: + self.pendingBlocks.completeWantHandle(address, blk) + else: + warn "Attempted to complete non-pending block", address + proc blockPresenceHandler*( self: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence] ) {.async: (raises: []).} = diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index f169f744..80c88527 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -90,6 +90,19 @@ proc getWantHandle*( ): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} = self.getWantHandle(BlockAddress.init(cid), inFlight) +proc completeWantHandle*( + self: PendingBlocksManager, address: BlockAddress, blk: Block +) {.raises: [].} = + ## Complete a pending want handle + self.blocks.withValue(address, blockReq): + if not blockReq[].handle.finished: + trace "Completing want handle from provided block", address + blockReq[].handle.complete(blk) + else: + trace "Want handle already completed", address + do: + trace "No pending want handle found for address", address + proc resolve*( self: PendingBlocksManager, blocksDelivery: seq[BlockDelivery] ) {.gcsafe, raises: [].} = diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index a75837b7..95516500 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -122,12 +122,10 @@ func indexToPos(steps, idx, step: int): int {.inline.} = proc getPendingBlocks( self: Erasure, manifest: Manifest, indices: seq[int] -): (AsyncIter[(?!bt.Block, int)], seq[Future[?!bt.Block]]) = +): AsyncIter[(?!bt.Block, int)] = ## Get pending blocks iterator ## - var - pendingBlockFutures: seq[Future[?!bt.Block]] = @[] - pendingBlocks: seq[Future[(?!bt.Block, int)]] = @[] + var pendingBlocks: seq[Future[(?!bt.Block, int)]] = @[] proc attachIndex( fut: Future[?!bt.Block], i: int @@ -138,7 +136,6 @@ proc getPendingBlocks( for blockIndex in indices: # request blocks from the store let fut = self.store.getBlock(BlockAddress.init(manifest.treeCid, blockIndex)) - pendingBlockFutures.add(fut) pendingBlocks.add(attachIndex(fut, blockIndex)) proc isFinished(): bool = @@ -157,7 +154,7 @@ proc getPendingBlocks( $index, ) - (AsyncIter[(?!bt.Block, int)].new(genNext, isFinished), pendingBlockFutures) + AsyncIter[(?!bt.Block, int)].new(genNext, isFinished) proc prepareEncodingData( self: Erasure, @@ -176,7 +173,7 @@ proc prepareEncodingData( firstIndex = 0, lastIndex = params.rounded - 1, iterations = params.steps ) indices = toSeq(strategy.getIndices(step)) - (pendingBlocksIter, _) = + pendingBlocksIter = self.getPendingBlocks(manifest, indices.filterIt(it < manifest.blocksCount)) var resolved = 0 @@ -226,7 +223,7 @@ proc prepareDecodingData( firstIndex = 0, lastIndex = encoded.blocksCount - 1, iterations = encoded.steps ) indices = toSeq(strategy.getIndices(step)) - (pendingBlocksIter, pendingBlockFutures) = self.getPendingBlocks(encoded, indices) + pendingBlocksIter = self.getPendingBlocks(encoded, indices) var dataPieces = 0 @@ -266,11 +263,6 @@ proc prepareDecodingData( resolved.inc - pendingBlockFutures.apply( - proc(fut: auto) = - fut.cancel() - ) - return success (dataPieces.Natural, parityPieces.Natural) proc init*( @@ -631,6 +623,8 @@ proc decodeInternal( warn "Unable to store block!", cid = blk.cid, msg = error.msg return failure("Unable to store block!") + self.store.completeBlock(BlockAddress.init(encoded.treeCid, idx), blk) + cids[idx] = blk.cid recoveredIndices.add(idx) except CancelledError as exc: diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index bbe0bef8..e436577c 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -65,6 +65,11 @@ method getBlock*( raiseAssert("getBlock by addr not implemented!") +method completeBlock*( + self: BlockStore, address: BlockAddress, blk: Block +) {.base, gcsafe.} = + discard + method getBlockAndProof*( self: BlockStore, treeCid: Cid, index: Natural ): Future[?!(Block, CodexProof)] {.base, async: (raises: [CancelledError]), gcsafe.} = diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index 54bed1b8..ff3fd6df 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -259,6 +259,9 @@ method delBlock*( return success() +method completeBlock*(self: CacheStore, address: BlockAddress, blk: Block) {.gcsafe.} = + discard + method close*(self: CacheStore): Future[void] {.async: (raises: []).} = ## Close the blockstore, a no-op for this implementation ## diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 64410ce0..06b96b77 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -63,6 +63,9 @@ method getBlock*( self.getBlock(BlockAddress.init(treeCid, index)) +method completeBlock*(self: NetworkStore, address: BlockAddress, blk: Block) = + self.engine.completeBlock(address, blk) + method putBlock*( self: NetworkStore, blk: Block, ttl = Duration.none ): Future[?!void] {.async: (raises: [CancelledError]).} =