diff --git a/codex/node.nim b/codex/node.nim index 70cb19fb..4c783086 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -213,6 +213,28 @@ proc fetchBatched*( let iter = Iter[int].new(0 ..< manifest.blocksCount) self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch, fetchLocal) +proc fetchDatasetAsync*( + self: CodexNodeRef, manifest: Manifest, fetchLocal = true +): Future[void] {.async: (raises: []).} = + ## Asynchronously fetch a dataset in the background. + ## This task will be tracked and cleaned up on node shutdown. + ## + try: + if err =? ( + await self.fetchBatched( + manifest = manifest, batchSize = DefaultFetchBatch, fetchLocal = fetchLocal + ) + ).errorOption: + error "Unable to fetch blocks", err = err.msg + except CatchableError as exc: + error "Error fetching blocks", exc = exc.msg + +proc fetchDatasetAsyncTask*(self: CodexNodeRef, manifest: Manifest) = + ## Start fetching a dataset in the background. + ## The task will be tracked and cleaned up on node shutdown. + ## + self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false)) + proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async.} = ## Streams the contents of a single block. ## @@ -256,16 +278,8 @@ proc streamEntireDataset( self.trackedFutures.track(erasureJob()) - proc prefetch() {.async: (raises: []).} = - try: - if err =? ( - await self.fetchBatched(manifest, DefaultFetchBatch, fetchLocal = false) - ).errorOption: - error "Unable to fetch blocks", err = err.msg - except CatchableError as exc: - error "Error fetching blocks", exc = exc.msg - - self.trackedFutures.track(prefetch()) + self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false)) + # prefetch task should not fetch from local store # Retrieve all blocks of the dataset sequentially from the local store or network trace "Creating store stream for manifest", manifestCid diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 89dbe220..713e1c04 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -299,15 +299,8 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute error "Failed to fetch manifest", err = err.msg return RestApiResponse.error(Http404, err.msg, headers = headers) - proc fetchDatasetAsync(): Future[void] {.async.} = - try: - if err =? (await node.fetchBatched(manifest)).errorOption: - error "Unable to fetch dataset", cid = cid.get(), err = err.msg - except CatchableError as exc: - error "CatchableError when fetching dataset", cid = cid.get(), exc = exc.msg - discard - - asyncSpawn fetchDatasetAsync() + # Start fetching the dataset in the background + node.fetchDatasetAsyncTask(manifest) let json = %formatManifest(cid.get(), manifest) return RestApiResponse.response($json, contentType = "application/json")