From f3618cb007b5d914a7a59c8a7a8db8269975cef1 Mon Sep 17 00:00:00 2001 From: Chrysostomos Nanakos Date: Thu, 19 Jun 2025 12:01:22 +0300 Subject: [PATCH] fix: Disable erasure early exit during streaming to prevent cancellation conflicts (#1275) When streamEntireDataset() runs background erasure decoding while client streams blocks, erasure early exit was cancelling downloads that the streaming client still needed, causing CancelledError failures. Add allowEarlyExit parameter to erasure.decode() and disable it during streaming. This makes the background erasure job act as a prefetching helper rather than competing with client reads for the same blocks. The early exit optimization is less valuable during streaming since the client typically needs most blocks anyway, and cooperative prefetching provides better user experience than download cancellation conflicts. --- codex/erasure/erasure.nim | 24 +++++++++++++++--------- codex/node.nim | 5 ++++- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index a75837b7..d31256cb 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -211,6 +211,7 @@ proc prepareDecodingData( parityData: ref seq[seq[byte]], cids: ref seq[Cid], emptyBlock: seq[byte], + allowEarlyExit = true, ): Future[?!(Natural, Natural)] {.async.} = ## Prepare data for decoding ## `encoded` - the encoded manifest @@ -228,6 +229,13 @@ proc prepareDecodingData( indices = toSeq(strategy.getIndices(step)) (pendingBlocksIter, pendingBlockFutures) = self.getPendingBlocks(encoded, indices) + defer: + if allowEarlyExit: + pendingBlockFutures.apply( + proc(fut: auto) = + fut.cancel() + ) + var dataPieces = 0 parityPieces = 0 @@ -266,11 +274,6 @@ proc prepareDecodingData( resolved.inc - pendingBlockFutures.apply( - proc(fut: auto) = - fut.cancel() - ) - return success (dataPieces.Natural, parityPieces.Natural) proc init*( @@ -561,7 +564,7 @@ proc asyncDecode*( success() proc decodeInternal( - self: Erasure, encoded: Manifest + self: Erasure, encoded: Manifest, allowEarlyExit = true ): Future[?!(ref seq[Cid], seq[Natural])] {.async.} = logScope: steps = encoded.steps @@ -594,7 +597,7 @@ proc decodeInternal( without (dataPieces, _) =? ( await self.prepareDecodingData( - encoded, step, data, parityData, cids, emptyBlock + encoded, step, data, parityData, cids, emptyBlock, allowEarlyExit ) ), err: trace "Unable to prepare data", error = err.msg @@ -644,7 +647,9 @@ proc decodeInternal( return (cids, recoveredIndices).success -proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = +proc decode*( + self: Erasure, encoded: Manifest, allowEarlyExit = true +): Future[?!Manifest] {.async.} = ## Decode a protected manifest into it's original ## manifest ## @@ -652,7 +657,8 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = ## be recovered ## - without (cids, recoveredIndices) =? (await self.decodeInternal(encoded)), err: + without (cids, recoveredIndices) =? + (await self.decodeInternal(encoded, allowEarlyExit)), err: return failure(err) without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err: diff --git a/codex/node.nim b/codex/node.nim index e010b085..c98f7ecf 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -296,7 +296,10 @@ proc streamEntireDataset( let erasure = Erasure.new( self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool ) - without _ =? (await erasure.decode(manifest)), error: + # when streaming, erasure acts as a prefetching helper rather than + # trying to exit early. The early exit optimization is less valuable + # when the client needs most blocks anyway. + without _ =? (await erasure.decode(manifest, allowEarlyExit = false)), error: error "Unable to erasure decode manifest", manifestCid, exc = error.msg except CatchableError as exc: trace "Error erasure decoding manifest", manifestCid, exc = exc.msg