rework background downloads and prefetch

This commit is contained in:
Dmitriy Ryajov 2025-02-26 10:13:51 -06:00
parent 7aae901440
commit 1a2381c0f2
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
2 changed files with 26 additions and 19 deletions

View File

@ -213,6 +213,28 @@ proc fetchBatched*(
let iter = Iter[int].new(0 ..< manifest.blocksCount)
self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch, fetchLocal)
proc fetchDatasetAsync*(
self: CodexNodeRef, manifest: Manifest, fetchLocal = true
): Future[void] {.async: (raises: []).} =
## Asynchronously fetch a dataset in the background.
## This task will be tracked and cleaned up on node shutdown.
##
try:
if err =? (
await self.fetchBatched(
manifest = manifest, batchSize = DefaultFetchBatch, fetchLocal = fetchLocal
)
).errorOption:
error "Unable to fetch blocks", err = err.msg
except CatchableError as exc:
error "Error fetching blocks", exc = exc.msg
proc fetchDatasetAsyncTask*(self: CodexNodeRef, manifest: Manifest) =
## Start fetching a dataset in the background.
## The task will be tracked and cleaned up on node shutdown.
##
self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false))
proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async.} =
## Streams the contents of a single block.
##
@ -256,16 +278,8 @@ proc streamEntireDataset(
self.trackedFutures.track(erasureJob())
proc prefetch() {.async: (raises: []).} =
try:
if err =? (
await self.fetchBatched(manifest, DefaultFetchBatch, fetchLocal = false)
).errorOption:
error "Unable to fetch blocks", err = err.msg
except CatchableError as exc:
error "Error fetching blocks", exc = exc.msg
self.trackedFutures.track(prefetch())
self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false))
# prefetch task should not fetch from local store
# Retrieve all blocks of the dataset sequentially from the local store or network
trace "Creating store stream for manifest", manifestCid

View File

@ -299,15 +299,8 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
error "Failed to fetch manifest", err = err.msg
return RestApiResponse.error(Http404, err.msg, headers = headers)
proc fetchDatasetAsync(): Future[void] {.async.} =
try:
if err =? (await node.fetchBatched(manifest)).errorOption:
error "Unable to fetch dataset", cid = cid.get(), err = err.msg
except CatchableError as exc:
error "CatchableError when fetching dataset", cid = cid.get(), exc = exc.msg
discard
asyncSpawn fetchDatasetAsync()
# Start fetching the dataset in the background
node.fetchDatasetAsyncTask(manifest)
let json = %formatManifest(cid.get(), manifest)
return RestApiResponse.response($json, contentType = "application/json")