From ad7619e1ff6a4c1e992460ef0435beccf2a4f30e Mon Sep 17 00:00:00 2001 From: Arnaud Date: Fri, 17 Oct 2025 13:13:19 +0200 Subject: [PATCH] Add cancel support for file stream download --- .../requests/node_download_request.nim | 75 +++++++++++-------- 1 file changed, 43 insertions(+), 32 deletions(-) diff --git a/library/codex_thread_requests/requests/node_download_request.nim b/library/codex_thread_requests/requests/node_download_request.nim index 76baae0b..7ffc1361 100644 --- a/library/codex_thread_requests/requests/node_download_request.nim +++ b/library/codex_thread_requests/requests/node_download_request.nim @@ -10,10 +10,10 @@ ## - CHUNK: downloads the next chunk of the file ## - CANCEL: cancels the download session ## 2. Via stream. +## - INIT: initializes the download session ## - STREAM: downloads the file in a streaming manner, calling ## the onChunk handler for each chunk and / or writing to a file if filepath is set. -## Cancel is supported in this mode because the worker will be busy -## downloading the file so it cannot pickup another request to cancel the download. +## - CANCEL: cancels the download session import std/[options, streams] import chronos @@ -105,11 +105,11 @@ proc init( return err("Failed to init the download: " & res.error.msg) stream = res.get() except CancelledError: - downloadSessions.del($cCid) + downloadSessions.del($cid) return err("Failed to init the download: download cancelled.") let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int - downloadSessions[$cCid] = DownloadSession(stream: stream, chunkSize: blockSize) + downloadSessions[$cid] = DownloadSession(stream: stream, chunkSize: blockSize) return ok("") @@ -161,23 +161,13 @@ proc chunk( proc streamData( codex: ptr CodexServer, - cid: Cid, - local: bool, + stream: LPStream, onChunk: OnChunkHandler, chunkSize: csize_t, filepath: cstring, -): Future[Result[string, string]] {.async: (raises: [CancelledError]).} = - let node = codex[].node - - let res = await node.retrieve(cid, local = local) - if res.isErr(): - return err("Failed to retrieve CID: " & res.error.msg) - - let stream = res.get() - - if stream.atEof: - return err("Failed to retrieve CID: empty stream.") - +): Future[Result[string, string]] {. + async: (raises: [CancelledError, LPStreamError, IOError]) +.} = let blockSize = if chunkSize.int > 0: chunkSize.int else: DefaultBlockSize.int var buf = newSeq[byte](blockSize) var read = 0 @@ -189,6 +179,10 @@ proc streamData( outputStream = filedest.fileOutput() while not stream.atEof: + ## Yield immediately to the event loop + ## It gives a chance to cancel request to be processed + await sleepAsync(0) + let read = await stream.readOnce(addr buf[0], buf.len) buf.setLen(read) @@ -202,13 +196,9 @@ proc streamData( if outputStream != nil: outputStream.close() - except LPStreamError as e: - return err("Failed to stream file: " & $e.msg) - except IOError as e: - return err("Failed to write to file: " & $e.msg) finally: - await stream.close() - downloadSessions.del($cid) + if outputStream != nil: + outputStream.close() return ok("") @@ -223,20 +213,37 @@ proc stream( ## Stream the file identified by cid, calling the onChunk handler for each chunk ## and / or writing to a file if filepath is set. ## - ## If local is true, the file will be retrived from the local store. - - let node = codex[].node + ## If local is true, the file will be retrieved from the local store. let cid = Cid.init($cCid) if cid.isErr: return err("Failed to download locally: cannot parse cid: " & $cCid) + if not downloadSessions.contains($cid): + return err("Failed to download chunk: no session for cid " & $cid) + + var session: DownloadSession try: - let res = await codex.streamData(cid.get(), local, onChunk, chunkSize, filepath) + session = downloadSessions[$cid] + except KeyError: + return err("Failed to download chunk: no session for cid " & $cid) + + let node = codex[].node + + try: + let res = await codex.streamData(session.stream, onChunk, chunkSize, filepath) if res.isErr: return err($res.error) + except LPStreamError as e: + return err("Failed to stream file: " & $e.msg) + except IOError as e: + return err("Failed to write to file: " & $e.msg) except CancelledError: return err("Failed to download locally: download cancelled.") + finally: + if session.stream != nil: + await session.stream.close() + downloadSessions.del($cid) return ok("") @@ -247,14 +254,18 @@ proc cancel( ## This operation is not supported when using the stream mode, ## because the worker will be busy downloading the file. - if not downloadSessions.contains($cCid): - return err("Failed to download chunk: no session for cid " & $cCid) + let cid = Cid.init($cCid) + if cid.isErr: + return err("Failed to download locally: cannot parse cid: " & $cCid) + + if not downloadSessions.contains($cid): + return err("Failed to download chunk: no session for cid " & $cid) var session: DownloadSession try: - session = downloadSessions[$cCid] + session = downloadSessions[$cid] except KeyError: - return err("Failed to download chunk: no session for cid " & $cCid) + return err("Failed to download chunk: no session for cid " & $cid) let stream = session.stream await stream.close()