monitor background tasks on streaming dataset (#1164)

This commit is contained in:
Dmitriy Ryajov 2025-03-21 11:23:07 -06:00 committed by GitHub
parent 3a312596bf
commit 110147d8ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -271,6 +271,8 @@ proc streamEntireDataset(
##
trace "Retrieving blocks from manifest", manifestCid
var jobs: seq[Future[void]]
let stream = LPStream(StoreStream.new(self.networkStore, manifest, pad = false))
if manifest.protected:
# Retrieve, decode and save to the local store all EС groups
proc erasureJob(): Future[void] {.async: (raises: []).} =
@ -284,14 +286,25 @@ proc streamEntireDataset(
except CatchableError as exc:
trace "Error erasure decoding manifest", manifestCid, exc = exc.msg
self.trackedFutures.track(erasureJob())
jobs.add(erasureJob())
self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false))
# prefetch task should not fetch from local store
jobs.add(self.fetchDatasetAsync(manifest))
# Monitor stream completion and cancel background jobs when done
proc monitorStream() {.async: (raises: []).} =
try:
await stream.join()
except CatchableError as exc:
warn "Stream failed", exc = exc.msg
finally:
await noCancel allFutures(jobs.mapIt(it.cancelAndWait))
self.trackedFutures.track(monitorStream())
# Retrieve all blocks of the dataset sequentially from the local store or network
trace "Creating store stream for manifest", manifestCid
LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success
stream.success
proc retrieve*(
self: CodexNodeRef, cid: Cid, local: bool = true