diff --git a/codex/node.nim b/codex/node.nim index 203e034a..9932deb6 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -271,6 +271,8 @@ proc streamEntireDataset( ## trace "Retrieving blocks from manifest", manifestCid + var jobs: seq[Future[void]] + let stream = LPStream(StoreStream.new(self.networkStore, manifest, pad = false)) if manifest.protected: # Retrieve, decode and save to the local store all EС groups proc erasureJob(): Future[void] {.async: (raises: []).} = @@ -284,14 +286,25 @@ proc streamEntireDataset( except CatchableError as exc: trace "Error erasure decoding manifest", manifestCid, exc = exc.msg - self.trackedFutures.track(erasureJob()) + jobs.add(erasureJob()) - self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false)) - # prefetch task should not fetch from local store + jobs.add(self.fetchDatasetAsync(manifest)) + + # Monitor stream completion and cancel background jobs when done + proc monitorStream() {.async: (raises: []).} = + try: + await stream.join() + except CatchableError as exc: + warn "Stream failed", exc = exc.msg + finally: + await noCancel allFutures(jobs.mapIt(it.cancelAndWait)) + + self.trackedFutures.track(monitorStream()) # Retrieve all blocks of the dataset sequentially from the local store or network trace "Creating store stream for manifest", manifestCid - LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success + + stream.success proc retrieve*( self: CodexNodeRef, cid: Cid, local: bool = true