From 110147d8efbb1411fa4cb393125ada1b2e461be1 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Fri, 21 Mar 2025 11:23:07 -0600 Subject: [PATCH] monitor background tasks on streaming dataset (#1164) --- codex/node.nim | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/codex/node.nim b/codex/node.nim index 203e034a..9932deb6 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -271,6 +271,8 @@ proc streamEntireDataset( ## trace "Retrieving blocks from manifest", manifestCid + var jobs: seq[Future[void]] + let stream = LPStream(StoreStream.new(self.networkStore, manifest, pad = false)) if manifest.protected: # Retrieve, decode and save to the local store all EС groups proc erasureJob(): Future[void] {.async: (raises: []).} = @@ -284,14 +286,25 @@ proc streamEntireDataset( except CatchableError as exc: trace "Error erasure decoding manifest", manifestCid, exc = exc.msg - self.trackedFutures.track(erasureJob()) + jobs.add(erasureJob()) - self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false)) - # prefetch task should not fetch from local store + jobs.add(self.fetchDatasetAsync(manifest)) + + # Monitor stream completion and cancel background jobs when done + proc monitorStream() {.async: (raises: []).} = + try: + await stream.join() + except CatchableError as exc: + warn "Stream failed", exc = exc.msg + finally: + await noCancel allFutures(jobs.mapIt(it.cancelAndWait)) + + self.trackedFutures.track(monitorStream()) # Retrieve all blocks of the dataset sequentially from the local store or network trace "Creating store stream for manifest", manifestCid - LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success + + stream.success proc retrieve*( self: CodexNodeRef, cid: Cid, local: bool = true