From 2c3849f1198db5323ac29ab26805e348c5884d84 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Mon, 29 Sep 2025 15:23:11 +0200 Subject: [PATCH] Refactore upload for better memory management with thread and avoid segfault --- .../codex_thread_request.nim | 7 +- .../requests/node_upload_request.nim | 179 +++++++----------- library/libcodex.h | 25 ++- 3 files changed, 98 insertions(+), 113 deletions(-) diff --git a/library/codex_thread_requests/codex_thread_request.nim b/library/codex_thread_requests/codex_thread_request.nim index 1701a669..e717554b 100644 --- a/library/codex_thread_requests/codex_thread_request.nim +++ b/library/codex_thread_requests/codex_thread_request.nim @@ -54,7 +54,7 @@ proc createShared*( # We can improve this by dispatching the callbacks to a thread pool or # moving to a MP channel. # See: https://github.com/codex-storage/nim-codex/pull/1322#discussion_r2340708316 -proc handleRes[T: string | void]( +proc handleRes[T: string | void | seq[byte]]( res: Result[T, string], request: ptr CodexThreadRequest ) = ## Handles the Result responses, which can either be Result[string, string] or @@ -95,7 +95,10 @@ proc process*( of P2P: cast[ptr NodeP2PRequest](request[].reqContent).process(codex) of UPLOAD: - cast[ptr NodeUploadRequest](request[].reqContent).process(codex) + let onUploadProgress = proc(bytes: int) = + request[].callback(RET_PROGRESS, nil, cast[csize_t](bytes), request[].userData) + + cast[ptr NodeUploadRequest](request[].reqContent).process(codex, onUploadProgress) handleRes(await retFut, request) diff --git a/library/codex_thread_requests/requests/node_upload_request.nim b/library/codex_thread_requests/requests/node_upload_request.nim index b83d6aeb..7dc70d4e 100644 --- a/library/codex_thread_requests/requests/node_upload_request.nim +++ b/library/codex_thread_requests/requests/node_upload_request.nim @@ -10,15 +10,14 @@ ## - CHUNK: sends a chunk of data to the upload session. ## - FINALIZE: finalizes the upload and returns the CID of the uploaded file. ## - CANCEL: cancels the upload session. -## - SUBSCRIBE: subscribes to progress updates for the upload session. ## ## 2. Directly from a file path: the filepath has to be absolute. ## - INIT: creates a new upload session and returns its ID ## - FILE: starts the upload and returns the CID of the uploaded file -## when the upload is done. -## - CANCEL: cancels the upload session. +## Cancel is not supported in this mode because the worker will be busy +## uploading the file so it cannot pickup another request to cancel the upload. -import std/[options, os, mimetypes, streams] +import std/[options, os, mimetypes] import chronos import chronicles import questionable @@ -31,7 +30,7 @@ import ../../../codex/codextypes from ../../../codex/codex import CodexServer, node from ../../../codex/node import store -from libp2p import Cid +from libp2p import Cid, `$` logScope: topics = "codexlib codexlibupload" @@ -42,10 +41,8 @@ type NodeUploadMsgType* = enum FINALIZE CANCEL FILE - SUBSCRIBE -type OnProgressHandler = - proc(bytes: int): Future[void] {.gcsafe, async: (raises: [CancelledError]).} +type OnProgressHandler = proc(bytes: int): void {.gcsafe, raises: [].} type NodeUploadRequest* = object operation: NodeUploadMsgType @@ -53,7 +50,6 @@ type NodeUploadRequest* = object filepath: cstring chunk: seq[byte] chunkSize: csize_t - onProgress: OnProgressHandler type UploadSessionId* = string @@ -74,7 +70,6 @@ proc createShared*( sessionId: cstring = "", filepath: cstring = "", chunk: seq[byte] = @[], - onProgress: OnProgressHandler = nil, chunkSize: csize_t = 0, ): ptr type T = var ret = createShared(T) @@ -83,7 +78,7 @@ proc createShared*( ret[].filepath = filepath.alloc() ret[].chunk = chunk ret[].chunkSize = chunkSize - ret[].onProgress = onProgress + return ret proc destroyShared(self: ptr NodeUploadRequest) = @@ -104,11 +99,11 @@ proc init( ## ## The chunkSize matches by default the block size used to store the file. ## - ## When a session contains an onProgress handler, it is called - ## with the number of bytes received each time a block is stored thanks to - ## `onBlockStore` callback. - ## After the `node.store` future is done, the onProgress handler - ## is called one last time with 0 bytes to signal the end of the upload. + ## A callback `onBlockStore` is provided to `node.store` to + ## report the progress of the upload. This callback will check + ## that an `onProgress` handler is set in the session + ## and call it with the number of bytes stored each time a block + ## is stored. var filenameOpt, mimetypeOpt = string.none @@ -141,31 +136,19 @@ proc init( let lpStream = LPStream(stream) let node = codex[].node - let onBlockStore = proc( - chunk: seq[byte] - ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} = + let onBlockStored = proc(chunk: seq[byte]): void {.gcsafe, raises: [].} = try: - let session = uploadSessions[$sessionId] - if session.onProgress != nil: - await session.onProgress(chunk.len) + if uploadSessions.contains($sessionId): + let session = uploadSessions[$sessionId] + if session.onProgress != nil: + session.onProgress(chunk.len) except KeyError: error "Failed to push progress update, session is not found: ", sessionId = $sessionId let blockSize = if chunkSize.NBytes > 0.NBytes: chunkSize.NBytes else: DefaultBlockSize - let fut = node.store(lpStream, filenameOpt, mimetypeOpt, blockSize, onBlockStore) - - proc cb(_: pointer) {.raises: [].} = - try: - let session = uploadSessions[$sessionId] - if session.onProgress != nil: - discard session.onProgress(0) - except KeyError: - error "Failed to push the progress final state, session is not found.", - sessionId = $sessionId - - fut.addCallback(cb) + let fut = node.store(lpStream, filenameOpt, mimetypeOpt, blockSize, onBlockStored) uploadSessions[sessionId] = UploadSession( stream: stream, fut: fut, filepath: $filepath, chunkSize: blockSize.int @@ -178,19 +161,47 @@ proc chunk( ): Future[Result[string, string]] {.async: (raises: []).} = ## Upload a chunk of data to the session identified by sessionId. ## The chunk is pushed to the BufferStream of the session. + ## If the chunk size is equal or greater than the session chunkSize, + ## the `onProgress` callback is temporarily set to receive the progress + ## from `onBlockStored` callback. This provide a way to report progress + ## precisely when a block is stored. + ## If the chunk size is smaller than the session chunkSize, + ## the `onProgress` callback is not set because the LPStream will + ## wait until enough data is received to form a block before storing it. + ## The wrapper may then report the progress because the data is in the stream + ## but not yet stored. if not uploadSessions.contains($sessionId): return err("Failed to upload the chunk, the session is not found: " & $sessionId) + var fut = newFuture[void]() + try: let session = uploadSessions[$sessionId] - await session.stream.pushData(chunk) - except KeyError as e: + + if chunk.len >= session.chunkSize: + uploadSessions[$sessionId].onProgress = proc( + bytes: int + ): void {.gcsafe, raises: [].} = + fut.complete() + await session.stream.pushData(chunk) + else: + fut = session.stream.pushData(chunk) + + await fut + + uploadSessions[$sessionId].onProgress = nil + except KeyError: return err("Failed to upload the chunk, the session is not found: " & $sessionId) except LPError as e: return err("Failed to upload the chunk, stream error: " & $e.msg) - except CancelledError as e: + except CancelledError: return err("Failed to upload the chunk, operation cancelled.") + except CatchableError as e: + return err("Failed to upload the chunk: " & $e.msg) + finally: + if not fut.finished(): + fut.cancelSoon() return ok("") @@ -200,10 +211,6 @@ proc finalize( ## Finalize the upload session identified by sessionId. ## This closes the BufferStream and waits for the `node.store` future ## to complete. It returns the CID of the uploaded file. - ## - ## In the finally block, the cleanup section removes the session - ## from the table and cancels the future if it is not complete (in - ## case of errors). if not uploadSessions.contains($sessionId): return @@ -219,12 +226,12 @@ proc finalize( return err("Failed to finalize the upload session: " & res.error().msg) return ok($res.get()) - except KeyError as e: + except KeyError: return err("Failed to finalize the upload session, invalid session ID: " & $sessionId) except LPStreamError as e: return err("Failed to finalize the upload session, stream error: " & $e.msg) - except CancelledError as e: + except CancelledError: return err("Failed to finalize the upload session, operation cancelled") except CatchableError as e: return err("Failed to finalize the upload session: " & $e.msg) @@ -241,6 +248,9 @@ proc cancel( ## Cancel the upload session identified by sessionId. ## This cancels the `node.store` future and removes the session ## from the table. + ## This operation is not supported when uploading file because + ## the worker will be busy uploading the file so it cannot pickup + ## another request to cancel the upload. if not uploadSessions.contains($sessionId): return err("Failed to cancel the upload session, session not found: " & $sessionId) @@ -248,7 +258,7 @@ proc cancel( try: let session = uploadSessions[$sessionId] session.fut.cancelSoon() - except KeyError as e: + except KeyError: return err("Failed to cancel the upload session, invalid session ID: " & $sessionId) uploadSessions.del($sessionId) @@ -286,15 +296,12 @@ proc streamFile( proc file( codex: ptr CodexServer, sessionId: cstring, onProgress: OnProgressHandler -): Future[Result[string, string]] {.raises: [], async: (raises: []).} = +): Future[Result[string, string]] {.async: (raises: []).} = ## Starts the file upload for the session identified by sessionId. ## Will call finalize when done and return the CID of the uploaded file. - ## In the finally block, the cleanup section removes the session - ## from the table and cancels the future if it is not complete (in - ## case of errors). ## - ## If `onProgress` is provided, it is called with the number of bytes - ## received each time a block is stored thanks to `onBlockStore` callback. + ## The onProgress callback is called with the number of bytes + ## to report the progress of the upload. if not uploadSessions.contains($sessionId): return err("Failed to upload the file, invalid session ID: " & $sessionId) @@ -302,20 +309,20 @@ proc file( var session: UploadSession try: + uploadSessions[$sessionId].onProgress = onProgress session = uploadSessions[$sessionId] - if onProgress != nil: - uploadSessions[$sessionId].onProgress = onProgress + let res = await streamFile(session.filepath, session.stream, session.chunkSize) if res.isErr: return err("Failed to upload the file: " & res.error) return await codex.finalize(sessionId) - except KeyError as e: + except KeyError: return err("Failed to upload the file, the session is not found: " & $sessionId) except LPStreamError, IOError: let e = getCurrentException() return err("Failed to upload the file: " & $e.msg) - except CancelledError as e: + except CancelledError: return err("Failed to upload the file, the operation is cancelled.") except CatchableError as e: return err("Failed to upload the file: " & $e.msg) @@ -326,48 +333,10 @@ proc file( if session.fut != nil and not session.fut.finished(): session.fut.cancelSoon() -proc subscribe( - codex: ptr CodexServer, sessionId: cstring, onProgress: OnProgressHandler -): Future[Result[string, string]] {.raises: [], async: (raises: []).} = - ## Subscribes to progress updates for the upload session identified by sessionId. - ## The onProgress handler is called with the number of bytes received - ## each time a block is stored thanks to `onBlockStore` callback. - - if not uploadSessions.contains($sessionId): - return err( - "Failed to subscribe to the upload session, invalid session ID: " & $sessionId - ) - - let fut = newFuture[void]() - - proc onBlockReceived(bytes: int): Future[void] {.async: (raises: [CancelledError]).} = - try: - let session = uploadSessions[$sessionId] - await onProgress(bytes) - - if bytes == 0: - fut.complete() - except KeyError: - fut.cancelSoon() - error "Failed to push progress update, session is not found: ", - sessionId = $sessionId - - try: - uploadSessions[$sessionId].onProgress = onBlockReceived - except KeyError: - return err( - "Failed to subscribe to the upload session, session is not found: " & $sessionId - ) - - try: - await fut - except CatchableError as e: - return err("Failed to subscribe to the upload session: " & $e.msg) - - return ok("") - proc process*( - self: ptr NodeUploadRequest, codex: ptr CodexServer + self: ptr NodeUploadRequest, + codex: ptr CodexServer, + onUploadProgress: OnProgressHandler = nil, ): Future[Result[string, string]] {.async: (raises: []).} = defer: destroyShared(self) @@ -376,38 +345,30 @@ proc process*( of NodeUploadMsgType.INIT: let res = (await init(codex, self.filepath, self.chunkSize)) if res.isErr: - error "INIT failed", error = res.error + error "Failed to INIT.", error = res.error return err($res.error) return res of NodeUploadMsgType.CHUNK: let res = (await chunk(codex, self.sessionId, self.chunk)) if res.isErr: - error "CHUNK failed", error = res.error + error "Failed to CHUNK.", error = res.error return err($res.error) return res of NodeUploadMsgType.FINALIZE: let res = (await finalize(codex, self.sessionId)) if res.isErr: - error "FINALIZE failed", error = res.error + error "Failed to FINALIZE.", error = res.error return err($res.error) return res of NodeUploadMsgType.CANCEL: let res = (await cancel(codex, self.sessionId)) if res.isErr: - error "CANCEL failed", error = res.error + error "Failed to CANCEL.", error = res.error return err($res.error) return res of NodeUploadMsgType.FILE: - let res = (await file(codex, self.sessionId, self.onProgress)) + let res = (await file(codex, self.sessionId, onUploadProgress)) if res.isErr: - error "FILE failed", error = res.error + error "Failed to FILE.", error = res.error return err($res.error) return res - of NodeUploadMsgType.SUBSCRIBE: - let res = (await subscribe(codex, self.sessionId, self.onProgress)) - if res.isErr: - error "SUBSCRIBE failed", error = res.error - return err($res.error) - return res - - return ok("") diff --git a/library/libcodex.h b/library/libcodex.h index 8523e86a..3386c52b 100644 --- a/library/libcodex.h +++ b/library/libcodex.h @@ -115,9 +115,30 @@ int codex_upload_file( CodexCallback callback, void* userData); -int codex_upload_subscribe( +int codex_download_local( void* ctx, - const char* sessionId, + const char* cid, + size_t chunkSize, + CodexCallback callback, + void* userData); + +int codex_download_init( + void* ctx, + const char* cid, + size_t chunkSize, + bool local, + CodexCallback callback, + void* userData); + +int codex_download_chunk( + void* ctx, + const char* cid, + CodexCallback callback, + void* userData); + +int codex_download_cancel( + void* ctx, + const char* cid, CodexCallback callback, void* userData);