Use faststream, provide better cleanup and comments

This commit is contained in:
Arnaud 2025-09-23 14:22:45 +02:00
parent 1228b59cca
commit 82fea5ee4c
No known key found for this signature in database
GPG Key ID: B8FBC178F10CA7AE

View File

@ -1,15 +1,36 @@
## This file contains the lifecycle request type that will be handled.
{.push raises: [].}
## This file contains the upload request.
## A session is created for each upload allowing to resume,
## pause (using chunks) and cancels uploads.
##
## There are two ways to upload a file:
## 1. Via chunks: the filepath parameter is the data filename. Steps are:
## - INIT: creates a new upload session and returns its ID.
## - CHUNK: sends a chunk of data to the upload session.
## - FINALIZE: finalizes the upload and returns the CID of the uploaded file.
## - CANCEL: cancels the upload session.
##
## 2. Directly from a file path: the filepath has to be absolute.
## - INIT: creates a new upload session and returns its ID
## - FILE: starts the upload and returns the CID of the uploaded file
## when the upload is done.
## - CANCEL: cancels the upload session.
import std/[options, os, mimetypes, streams]
import chronos
import chronicles
import libp2p
import questionable
import questionable/results
import faststreams/inputs
import libp2p/stream/[bufferstream, lpstream]
import ../../alloc
import ../../../codex/streams
import ../../../codex/node
import ../../../codex/units
import ../../../codex/codextypes
from ../../../codex/codex import CodexServer, node
from ../../../codex/node import store
from libp2p import Cid
type NodeUploadMsgType* = enum
INIT
@ -36,7 +57,6 @@ type
stream: BufferStream
fut: Future[?!Cid]
filepath: string
chunkSize: int
var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession]
var nexUploadSessionCount {.threadvar.}: UploadSessionCount
@ -64,18 +84,29 @@ proc destroyShared(self: ptr NodeUploadRequest) =
deallocShared(self[].sessionId)
deallocShared(self)
## Init upload create a new upload session and returns its ID.
## The session can be used to send chunks of data
## and to pause and resume the upload.
## filepath can be the absolute path to a file to upload directly,
## or it can be the filename when the file will be uploaded via chunks.
## The mimetype is deduced from the filename extension.
proc init(
codex: ptr CodexServer,
filepath: cstring = "",
chunkSize: csize_t = 0,
onProgress: OnProgressHandler,
): Future[Result[string, string]] {.async: (raises: []).} =
## Init a new session upload and return its ID.
## The session contains the future corresponding to the
## `node.store` call.
## The filepath can be:
## - the filename when uploading via chunks
## - the absolute path to a file when uploading directly.
## The mimetype is deduced from the filename extension.
##
## The chunkSize matches by default the block size used to store the file.
##
## An onProgress handler can be provided to get upload progress.
## The handler is called with the size of the block stored in the node
## when a new block is put in the node.
## After the `node.store` future is completed, whether successfully or not,
## the onProgress handler is called with -1 to signal the end of the upload.
## This allows to clean up the cGo states.
var filenameOpt, mimetypeOpt = string.none
if isAbsolute($filepath):
@ -120,15 +151,17 @@ proc init(
fut.addCallback(cb)
uploadSessions[sessionId] = UploadSession(
stream: stream, fut: fut, filepath: $filepath, chunkSize: blockSize.int
)
uploadSessions[sessionId] =
UploadSession(stream: stream, fut: fut, filepath: $filepath)
return ok(sessionId)
proc chunk(
codex: ptr CodexServer, sessionId: cstring, chunk: seq[byte]
): Future[Result[string, string]] {.async: (raises: []).} =
## Upload a chunk of data to the session identified by sessionId.
## The chunk is pushed to the BufferStream of the session.
if not uploadSessions.contains($sessionId):
return err("Invalid session ID")
@ -147,6 +180,14 @@ proc chunk(
proc finalize(
codex: ptr CodexServer, sessionId: cstring
): Future[Result[string, string]] {.async: (raises: []).} =
## Finalize the upload session identified by sessionId.
## This closes the BufferStream and waits for the `node.store` future
## to complete. It returns the CID of the uploaded file.
##
## In the finally block, the cleanup section removes the session
## from the table and cancels the future if it is not complete (in
## case of errors).
if not uploadSessions.contains($sessionId):
return err("Invalid session ID")
@ -154,27 +195,34 @@ proc finalize(
try:
session = uploadSessions[$sessionId]
await session.stream.pushEof()
let res = await session.fut
if res.isErr:
return err("Upload failed: " & res.error().msg)
return ok($res.get())
except KeyError as e:
return err("Invalid session ID")
except LPStreamError as e:
return err("Stream error: " & $e.msg)
except CancelledError as e:
return err("Operation cancelled")
try:
let res = await session.fut
if res.isErr:
return err("Upload failed: " & res.error().msg)
return ok($res.get())
except CatchableError as e:
return err("Upload failed: " & $e.msg)
finally:
uploadSessions.del($sessionId)
if uploadSessions.contains($sessionId):
uploadSessions.del($sessionId)
if session.fut != nil and not session.fut.finished():
session.fut.cancelSoon()
proc cancel(
codex: ptr CodexServer, sessionId: cstring
): Future[Result[string, string]] {.async: (raises: []).} =
## Cancel the upload session identified by sessionId.
## This cancels the `node.store` future and removes the session
## from the table.
if not uploadSessions.contains($sessionId):
return err("Invalid session ID")
@ -188,29 +236,48 @@ proc cancel(
return ok("")
proc streamFile(
filepath: string, stream: BufferStream
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
## Streams a file from the given filepath using faststream.
## fsMultiSync cannot be used with chronos because of this warning:
## Warning: chronos backend uses nested calls to `waitFor` which
## is not supported by chronos - it is not recommended to use it until
## this has been resolved.
##
## Ideally when it is solved, we should use fsMultiSync or find a way to use async
## file I/O with chronos, see https://github.com/status-im/nim-chronos/issues/501.
try:
let inputStreamHandle = filePath.fileInput()
let inputStream = inputStreamHandle.implicitDeref
while inputStream.readable:
let byt = inputStream.read
await stream.pushData(@[byt])
return ok()
except IOError, OSError, LPStreamError:
let e = getCurrentException()
return err("Stream error: " & $e.msg)
proc file(
codex: ptr CodexServer, sessionId: cstring
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
## Starts the file upload for the session identified by sessionId.
## Will call finalize when done and return the CID of the uploaded file.
## In the finally block, the cleanup section removes the session
## from the table and cancels the future if it is not complete (in
## case of errors).
if not uploadSessions.contains($sessionId):
return err("Invalid session ID")
var session: UploadSession
## Here we certainly need to spawn a new thread to avoid blocking
## the worker thread while reading the file.
try:
session = uploadSessions[$sessionId]
var buffer = newSeq[byte](session.chunkSize)
let fs = openFileStream(session.filepath)
defer:
fs.close()
while true:
let bytesRead = fs.readData(addr buffer[0], buffer.len)
if bytesRead == 0:
break
await session.stream.pushData(buffer[0 ..< bytesRead])
let res = await streamFile(session.filepath, session.stream)
if res.isErr:
return err("Failed to stream file: " & res.error)
return await codex.finalize(sessionId)
except KeyError as e:
@ -223,8 +290,11 @@ proc file(
except CatchableError as e:
return err("Upload failed: " & $e.msg)
finally:
session.fut.cancelSoon()
uploadSessions.del($sessionId)
if uploadSessions.contains($sessionId):
uploadSessions.del($sessionId)
if session.fut != nil and not session.fut.finished():
session.fut.cancelSoon()
proc process*(
self: ptr NodeUploadRequest, codex: ptr CodexServer