mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-02 13:33:10 +00:00
fix: resolve shared block request cancellation conflicts (#1284)
This commit is contained in:
parent
4d44154a40
commit
baff902137
@ -230,6 +230,12 @@ proc requestBlock*(
|
||||
): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} =
|
||||
self.requestBlock(BlockAddress.init(cid))
|
||||
|
||||
proc completeBlock*(self: BlockExcEngine, address: BlockAddress, blk: Block) =
|
||||
if address in self.pendingBlocks.blocks:
|
||||
self.pendingBlocks.completeWantHandle(address, blk)
|
||||
else:
|
||||
warn "Attempted to complete non-pending block", address
|
||||
|
||||
proc blockPresenceHandler*(
|
||||
self: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence]
|
||||
) {.async: (raises: []).} =
|
||||
|
||||
@ -90,6 +90,19 @@ proc getWantHandle*(
|
||||
): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} =
|
||||
self.getWantHandle(BlockAddress.init(cid), inFlight)
|
||||
|
||||
proc completeWantHandle*(
|
||||
self: PendingBlocksManager, address: BlockAddress, blk: Block
|
||||
) {.raises: [].} =
|
||||
## Complete a pending want handle
|
||||
self.blocks.withValue(address, blockReq):
|
||||
if not blockReq[].handle.finished:
|
||||
trace "Completing want handle from provided block", address
|
||||
blockReq[].handle.complete(blk)
|
||||
else:
|
||||
trace "Want handle already completed", address
|
||||
do:
|
||||
trace "No pending want handle found for address", address
|
||||
|
||||
proc resolve*(
|
||||
self: PendingBlocksManager, blocksDelivery: seq[BlockDelivery]
|
||||
) {.gcsafe, raises: [].} =
|
||||
|
||||
@ -122,12 +122,10 @@ func indexToPos(steps, idx, step: int): int {.inline.} =
|
||||
|
||||
proc getPendingBlocks(
|
||||
self: Erasure, manifest: Manifest, indices: seq[int]
|
||||
): (AsyncIter[(?!bt.Block, int)], seq[Future[?!bt.Block]]) =
|
||||
): AsyncIter[(?!bt.Block, int)] =
|
||||
## Get pending blocks iterator
|
||||
##
|
||||
var
|
||||
pendingBlockFutures: seq[Future[?!bt.Block]] = @[]
|
||||
pendingBlocks: seq[Future[(?!bt.Block, int)]] = @[]
|
||||
var pendingBlocks: seq[Future[(?!bt.Block, int)]] = @[]
|
||||
|
||||
proc attachIndex(
|
||||
fut: Future[?!bt.Block], i: int
|
||||
@ -138,7 +136,6 @@ proc getPendingBlocks(
|
||||
for blockIndex in indices:
|
||||
# request blocks from the store
|
||||
let fut = self.store.getBlock(BlockAddress.init(manifest.treeCid, blockIndex))
|
||||
pendingBlockFutures.add(fut)
|
||||
pendingBlocks.add(attachIndex(fut, blockIndex))
|
||||
|
||||
proc isFinished(): bool =
|
||||
@ -157,7 +154,7 @@ proc getPendingBlocks(
|
||||
$index,
|
||||
)
|
||||
|
||||
(AsyncIter[(?!bt.Block, int)].new(genNext, isFinished), pendingBlockFutures)
|
||||
AsyncIter[(?!bt.Block, int)].new(genNext, isFinished)
|
||||
|
||||
proc prepareEncodingData(
|
||||
self: Erasure,
|
||||
@ -176,7 +173,7 @@ proc prepareEncodingData(
|
||||
firstIndex = 0, lastIndex = params.rounded - 1, iterations = params.steps
|
||||
)
|
||||
indices = toSeq(strategy.getIndices(step))
|
||||
(pendingBlocksIter, _) =
|
||||
pendingBlocksIter =
|
||||
self.getPendingBlocks(manifest, indices.filterIt(it < manifest.blocksCount))
|
||||
|
||||
var resolved = 0
|
||||
@ -226,7 +223,7 @@ proc prepareDecodingData(
|
||||
firstIndex = 0, lastIndex = encoded.blocksCount - 1, iterations = encoded.steps
|
||||
)
|
||||
indices = toSeq(strategy.getIndices(step))
|
||||
(pendingBlocksIter, pendingBlockFutures) = self.getPendingBlocks(encoded, indices)
|
||||
pendingBlocksIter = self.getPendingBlocks(encoded, indices)
|
||||
|
||||
var
|
||||
dataPieces = 0
|
||||
@ -266,11 +263,6 @@ proc prepareDecodingData(
|
||||
|
||||
resolved.inc
|
||||
|
||||
pendingBlockFutures.apply(
|
||||
proc(fut: auto) =
|
||||
fut.cancel()
|
||||
)
|
||||
|
||||
return success (dataPieces.Natural, parityPieces.Natural)
|
||||
|
||||
proc init*(
|
||||
@ -631,6 +623,8 @@ proc decodeInternal(
|
||||
warn "Unable to store block!", cid = blk.cid, msg = error.msg
|
||||
return failure("Unable to store block!")
|
||||
|
||||
self.store.completeBlock(BlockAddress.init(encoded.treeCid, idx), blk)
|
||||
|
||||
cids[idx] = blk.cid
|
||||
recoveredIndices.add(idx)
|
||||
except CancelledError as exc:
|
||||
|
||||
@ -65,6 +65,11 @@ method getBlock*(
|
||||
|
||||
raiseAssert("getBlock by addr not implemented!")
|
||||
|
||||
method completeBlock*(
|
||||
self: BlockStore, address: BlockAddress, blk: Block
|
||||
) {.base, gcsafe.} =
|
||||
discard
|
||||
|
||||
method getBlockAndProof*(
|
||||
self: BlockStore, treeCid: Cid, index: Natural
|
||||
): Future[?!(Block, CodexProof)] {.base, async: (raises: [CancelledError]), gcsafe.} =
|
||||
|
||||
@ -259,6 +259,9 @@ method delBlock*(
|
||||
|
||||
return success()
|
||||
|
||||
method completeBlock*(self: CacheStore, address: BlockAddress, blk: Block) {.gcsafe.} =
|
||||
discard
|
||||
|
||||
method close*(self: CacheStore): Future[void] {.async: (raises: []).} =
|
||||
## Close the blockstore, a no-op for this implementation
|
||||
##
|
||||
|
||||
@ -63,6 +63,9 @@ method getBlock*(
|
||||
|
||||
self.getBlock(BlockAddress.init(treeCid, index))
|
||||
|
||||
method completeBlock*(self: NetworkStore, address: BlockAddress, blk: Block) =
|
||||
self.engine.completeBlock(address, blk)
|
||||
|
||||
method putBlock*(
|
||||
self: NetworkStore, blk: Block, ttl = Duration.none
|
||||
): Future[?!void] {.async: (raises: [CancelledError]).} =
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user