diff --git a/library/codex_thread_requests/requests/node_upload_request.nim b/library/codex_thread_requests/requests/node_upload_request.nim index 457d11bc..e7b2a64a 100644 --- a/library/codex_thread_requests/requests/node_upload_request.nim +++ b/library/codex_thread_requests/requests/node_upload_request.nim @@ -1,15 +1,36 @@ -## This file contains the lifecycle request type that will be handled. {.push raises: [].} +## This file contains the upload request. +## A session is created for each upload, allowing uploads to be resumed, +## paused (using chunks) and cancelled. +## +## There are two ways to upload a file: +## 1. Via chunks: the filepath parameter is the data filename. Steps are: +## - INIT: creates a new upload session and returns its ID. +## - CHUNK: sends a chunk of data to the upload session. +## - FINALIZE: finalizes the upload and returns the CID of the uploaded file. +## - CANCEL: cancels the upload session. +## +## 2. Directly from a file path: the filepath has to be absolute. Steps are: +## - INIT: creates a new upload session and returns its ID. +## - FILE: starts the upload and returns the CID of the uploaded file +## when the upload is done. +## - CANCEL: cancels the upload session. 
+ import std/[options, os, mimetypes, streams] import chronos import chronicles -import libp2p +import questionable +import questionable/results +import faststreams/inputs +import libp2p/stream/[bufferstream, lpstream] import ../../alloc -import ../../../codex/streams -import ../../../codex/node +import ../../../codex/units +import ../../../codex/codextypes from ../../../codex/codex import CodexServer, node +from ../../../codex/node import store +from libp2p import Cid type NodeUploadMsgType* = enum INIT @@ -36,7 +57,6 @@ type stream: BufferStream fut: Future[?!Cid] filepath: string - chunkSize: int var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession] var nexUploadSessionCount {.threadvar.}: UploadSessionCount @@ -64,18 +84,29 @@ proc destroyShared(self: ptr NodeUploadRequest) = deallocShared(self[].sessionId) deallocShared(self) -## Init upload create a new upload session and returns its ID. -## The session can be used to send chunks of data -## and to pause and resume the upload. -## filepath can be the absolute path to a file to upload directly, -## or it can be the filename when the file will be uploaded via chunks. -## The mimetype is deduced from the filename extension. proc init( codex: ptr CodexServer, filepath: cstring = "", chunkSize: csize_t = 0, onProgress: OnProgressHandler, ): Future[Result[string, string]] {.async: (raises: []).} = + ## Init a new upload session and return its ID. + ## The session contains the future corresponding to the + ## `node.store` call. + ## The filepath can be: + ## - the filename when uploading via chunks + ## - the absolute path to a file when uploading directly. + ## The mimetype is deduced from the filename extension. + ## + ## By default, the chunkSize matches the block size used to store the file. + ## + ## An onProgress handler can be provided to get upload progress. + ## The handler is called with the size of the block stored in the node + ## when a new block is put in the node. 
+ ## After the `node.store` future is completed, whether successfully or not, + ## the onProgress handler is called with -1 to signal the end of the upload. + ## This allows the cGo states to be cleaned up. + var filenameOpt, mimetypeOpt = string.none if isAbsolute($filepath): @@ -120,15 +151,17 @@ proc init( fut.addCallback(cb) - uploadSessions[sessionId] = UploadSession( - stream: stream, fut: fut, filepath: $filepath, chunkSize: blockSize.int - ) + uploadSessions[sessionId] = + UploadSession(stream: stream, fut: fut, filepath: $filepath) return ok(sessionId) proc chunk( codex: ptr CodexServer, sessionId: cstring, chunk: seq[byte] ): Future[Result[string, string]] {.async: (raises: []).} = + ## Upload a chunk of data to the session identified by sessionId. + ## The chunk is pushed to the BufferStream of the session. + if not uploadSessions.contains($sessionId): return err("Invalid session ID") @@ -147,6 +180,14 @@ proc chunk( proc finalize( codex: ptr CodexServer, sessionId: cstring ): Future[Result[string, string]] {.async: (raises: []).} = + ## Finalize the upload session identified by sessionId. + ## This pushes EOF to the BufferStream and waits for the `node.store` future + ## to complete. It returns the CID of the uploaded file. + ## + ## In the finally block, the cleanup section removes the session + ## from the table and cancels the future if it is not complete (in + ## case of errors). 
+ if not uploadSessions.contains($sessionId): return err("Invalid session ID") @@ -154,27 +195,34 @@ proc finalize( try: session = uploadSessions[$sessionId] await session.stream.pushEof() + + let res = await session.fut + if res.isErr: + return err("Upload failed: " & res.error().msg) + + return ok($res.get()) except KeyError as e: return err("Invalid session ID") except LPStreamError as e: return err("Stream error: " & $e.msg) except CancelledError as e: return err("Operation cancelled") - - try: - let res = await session.fut - if res.isErr: - return err("Upload failed: " & res.error().msg) - - return ok($res.get()) except CatchableError as e: return err("Upload failed: " & $e.msg) finally: - uploadSessions.del($sessionId) + if uploadSessions.contains($sessionId): + uploadSessions.del($sessionId) + + if session.fut != nil and not session.fut.finished(): + session.fut.cancelSoon() proc cancel( codex: ptr CodexServer, sessionId: cstring ): Future[Result[string, string]] {.async: (raises: []).} = + ## Cancel the upload session identified by sessionId. + ## This cancels the `node.store` future and removes the session + ## from the table. + if not uploadSessions.contains($sessionId): return err("Invalid session ID") @@ -188,29 +236,48 @@ proc cancel( return ok("") +proc streamFile( + filepath: string, stream: BufferStream +): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = + ## Streams a file from the given filepath using faststreams. + ## fsMultiSync cannot be used with chronos because of this warning: + ## Warning: chronos backend uses nested calls to `waitFor` which + ## is not supported by chronos - it is not recommended to use it until + ## this has been resolved. + ## + ## Ideally, once this is resolved, we should use fsMultiSync or find a way to use async + ## file I/O with chronos, see https://github.com/status-im/nim-chronos/issues/501. 
+ + try: + let inputStreamHandle = filePath.fileInput() + let inputStream = inputStreamHandle.implicitDeref + + while inputStream.readable: + let byt = inputStream.read + await stream.pushData(@[byt]) + return ok() + except IOError, OSError, LPStreamError: + let e = getCurrentException() + return err("Stream error: " & $e.msg) + proc file( codex: ptr CodexServer, sessionId: cstring ): Future[Result[string, string]] {.raises: [], async: (raises: []).} = + ## Starts the file upload for the session identified by sessionId. + ## Will call finalize when done and return the CID of the uploaded file. + ## In the finally block, the cleanup section removes the session + ## from the table and cancels the future if it is not complete (in + ## case of errors). if not uploadSessions.contains($sessionId): return err("Invalid session ID") var session: UploadSession - ## Here we certainly need to spawn a new thread to avoid blocking - ## the worker thread while reading the file. try: session = uploadSessions[$sessionId] - var buffer = newSeq[byte](session.chunkSize) - let fs = openFileStream(session.filepath) - defer: - fs.close() - - while true: - let bytesRead = fs.readData(addr buffer[0], buffer.len) - - if bytesRead == 0: - break - await session.stream.pushData(buffer[0 ..< bytesRead]) + let res = await streamFile(session.filepath, session.stream) + if res.isErr: + return err("Failed to stream file: " & res.error) return await codex.finalize(sessionId) except KeyError as e: @@ -223,8 +290,11 @@ proc file( except CatchableError as e: return err("Upload failed: " & $e.msg) finally: - session.fut.cancelSoon() - uploadSessions.del($sessionId) + if uploadSessions.contains($sessionId): + uploadSessions.del($sessionId) + + if session.fut != nil and not session.fut.finished(): + session.fut.cancelSoon() proc process*( self: ptr NodeUploadRequest, codex: ptr CodexServer