Add cancel support for file stream download

This commit is contained in:
Arnaud 2025-10-17 13:13:19 +02:00 committed by Eric
parent a343eb6564
commit d698d6df65
No known key found for this signature in database

View File

@ -10,10 +10,10 @@
## - CHUNK: downloads the next chunk of the file
## - CANCEL: cancels the download session
## 2. Via stream.
## - INIT: initializes the download session
## - STREAM: downloads the file in a streaming manner, calling
## the onChunk handler for each chunk and / or writing to a file if filepath is set.
## Cancel is supported in this mode because the worker will be busy
## downloading the file so it cannot pickup another request to cancel the download.
## - CANCEL: cancels the download session
import std/[options, streams]
import chronos
@ -105,11 +105,11 @@ proc init(
return err("Failed to init the download: " & res.error.msg)
stream = res.get()
except CancelledError:
downloadSessions.del($cCid)
downloadSessions.del($cid)
return err("Failed to init the download: download cancelled.")
let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int
downloadSessions[$cCid] = DownloadSession(stream: stream, chunkSize: blockSize)
downloadSessions[$cid] = DownloadSession(stream: stream, chunkSize: blockSize)
return ok("")
@ -161,23 +161,13 @@ proc chunk(
proc streamData(
codex: ptr CodexServer,
cid: Cid,
local: bool,
stream: LPStream,
onChunk: OnChunkHandler,
chunkSize: csize_t,
filepath: cstring,
): Future[Result[string, string]] {.async: (raises: [CancelledError]).} =
let node = codex[].node
let res = await node.retrieve(cid, local = local)
if res.isErr():
return err("Failed to retrieve CID: " & res.error.msg)
let stream = res.get()
if stream.atEof:
return err("Failed to retrieve CID: empty stream.")
): Future[Result[string, string]] {.
async: (raises: [CancelledError, LPStreamError, IOError])
.} =
let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int
var buf = newSeq[byte](blockSize)
var read = 0
@ -189,6 +179,10 @@ proc streamData(
outputStream = filedest.fileOutput()
while not stream.atEof:
## Yield immediately to the event loop
## It gives a chance to cancel request to be processed
await sleepAsync(0)
let read = await stream.readOnce(addr buf[0], buf.len)
buf.setLen(read)
@ -202,13 +196,9 @@ proc streamData(
if outputStream != nil:
outputStream.close()
except LPStreamError as e:
return err("Failed to stream file: " & $e.msg)
except IOError as e:
return err("Failed to write to file: " & $e.msg)
finally:
await stream.close()
downloadSessions.del($cid)
if outputStream != nil:
outputStream.close()
return ok("")
@ -223,20 +213,37 @@ proc stream(
## Stream the file identified by cid, calling the onChunk handler for each chunk
## and / or writing to a file if filepath is set.
##
## If local is true, the file will be retrived from the local store.
let node = codex[].node
## If local is true, the file will be retrieved from the local store.
let cid = Cid.init($cCid)
if cid.isErr:
return err("Failed to download locally: cannot parse cid: " & $cCid)
if not downloadSessions.contains($cid):
return err("Failed to download chunk: no session for cid " & $cid)
var session: DownloadSession
try:
let res = await codex.streamData(cid.get(), local, onChunk, chunkSize, filepath)
session = downloadSessions[$cid]
except KeyError:
return err("Failed to download chunk: no session for cid " & $cid)
let node = codex[].node
try:
let res = await codex.streamData(session.stream, onChunk, chunkSize, filepath)
if res.isErr:
return err($res.error)
except LPStreamError as e:
return err("Failed to stream file: " & $e.msg)
except IOError as e:
return err("Failed to write to file: " & $e.msg)
except CancelledError:
return err("Failed to download locally: download cancelled.")
finally:
if session.stream != nil:
await session.stream.close()
downloadSessions.del($cid)
return ok("")
@ -247,14 +254,18 @@ proc cancel(
## This operation is not supported when using the stream mode,
## because the worker will be busy downloading the file.
if not downloadSessions.contains($cCid):
return err("Failed to download chunk: no session for cid " & $cCid)
let cid = Cid.init($cCid)
if cid.isErr:
return err("Failed to download locally: cannot parse cid: " & $cCid)
if not downloadSessions.contains($cid):
return err("Failed to download chunk: no session for cid " & $cid)
var session: DownloadSession
try:
session = downloadSessions[$cCid]
session = downloadSessions[$cid]
except KeyError:
return err("Failed to download chunk: no session for cid " & $cCid)
return err("Failed to download chunk: no session for cid " & $cid)
let stream = session.stream
await stream.close()