Refactore upload for better memory management with thread and avoid segfault

This commit is contained in:
Arnaud 2025-09-29 15:23:11 +02:00 committed by Eric
parent 349d0a326f
commit 2c3849f119
No known key found for this signature in database
3 changed files with 98 additions and 113 deletions

View File

@ -54,7 +54,7 @@ proc createShared*(
# We can improve this by dispatching the callbacks to a thread pool or
# moving to a MP channel.
# See: https://github.com/codex-storage/nim-codex/pull/1322#discussion_r2340708316
proc handleRes[T: string | void](
proc handleRes[T: string | void | seq[byte]](
res: Result[T, string], request: ptr CodexThreadRequest
) =
## Handles the Result responses, which can either be Result[string, string] or
@ -95,7 +95,10 @@ proc process*(
of P2P:
cast[ptr NodeP2PRequest](request[].reqContent).process(codex)
of UPLOAD:
cast[ptr NodeUploadRequest](request[].reqContent).process(codex)
let onUploadProgress = proc(bytes: int) =
request[].callback(RET_PROGRESS, nil, cast[csize_t](bytes), request[].userData)
cast[ptr NodeUploadRequest](request[].reqContent).process(codex, onUploadProgress)
handleRes(await retFut, request)

View File

@ -10,15 +10,14 @@
## - CHUNK: sends a chunk of data to the upload session.
## - FINALIZE: finalizes the upload and returns the CID of the uploaded file.
## - CANCEL: cancels the upload session.
## - SUBSCRIBE: subscribes to progress updates for the upload session.
##
## 2. Directly from a file path: the filepath has to be absolute.
## - INIT: creates a new upload session and returns its ID
## - FILE: starts the upload and returns the CID of the uploaded file
## when the upload is done.
## - CANCEL: cancels the upload session.
## Cancel is not supported in this mode because the worker will be busy
## uploading the file so it cannot pickup another request to cancel the upload.
import std/[options, os, mimetypes, streams]
import std/[options, os, mimetypes]
import chronos
import chronicles
import questionable
@ -31,7 +30,7 @@ import ../../../codex/codextypes
from ../../../codex/codex import CodexServer, node
from ../../../codex/node import store
from libp2p import Cid
from libp2p import Cid, `$`
logScope:
topics = "codexlib codexlibupload"
@ -42,10 +41,8 @@ type NodeUploadMsgType* = enum
FINALIZE
CANCEL
FILE
SUBSCRIBE
type OnProgressHandler =
proc(bytes: int): Future[void] {.gcsafe, async: (raises: [CancelledError]).}
type OnProgressHandler = proc(bytes: int): void {.gcsafe, raises: [].}
type NodeUploadRequest* = object
operation: NodeUploadMsgType
@ -53,7 +50,6 @@ type NodeUploadRequest* = object
filepath: cstring
chunk: seq[byte]
chunkSize: csize_t
onProgress: OnProgressHandler
type
UploadSessionId* = string
@ -74,7 +70,6 @@ proc createShared*(
sessionId: cstring = "",
filepath: cstring = "",
chunk: seq[byte] = @[],
onProgress: OnProgressHandler = nil,
chunkSize: csize_t = 0,
): ptr type T =
var ret = createShared(T)
@ -83,7 +78,7 @@ proc createShared*(
ret[].filepath = filepath.alloc()
ret[].chunk = chunk
ret[].chunkSize = chunkSize
ret[].onProgress = onProgress
return ret
proc destroyShared(self: ptr NodeUploadRequest) =
@ -104,11 +99,11 @@ proc init(
##
## The chunkSize matches by default the block size used to store the file.
##
## When a session contains an onProgress handler, it is called
## with the number of bytes received each time a block is stored thanks to
## `onBlockStore` callback.
## After the `node.store` future is done, the onProgress handler
## is called one last time with 0 bytes to signal the end of the upload.
## A callback `onBlockStore` is provided to `node.store` to
## report the progress of the upload. This callback will check
## that an `onProgress` handler is set in the session
## and call it with the number of bytes stored each time a block
## is stored.
var filenameOpt, mimetypeOpt = string.none
@ -141,31 +136,19 @@ proc init(
let lpStream = LPStream(stream)
let node = codex[].node
let onBlockStore = proc(
chunk: seq[byte]
): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
let onBlockStored = proc(chunk: seq[byte]): void {.gcsafe, raises: [].} =
try:
let session = uploadSessions[$sessionId]
if session.onProgress != nil:
await session.onProgress(chunk.len)
if uploadSessions.contains($sessionId):
let session = uploadSessions[$sessionId]
if session.onProgress != nil:
session.onProgress(chunk.len)
except KeyError:
error "Failed to push progress update, session is not found: ",
sessionId = $sessionId
let blockSize =
if chunkSize.NBytes > 0.NBytes: chunkSize.NBytes else: DefaultBlockSize
let fut = node.store(lpStream, filenameOpt, mimetypeOpt, blockSize, onBlockStore)
proc cb(_: pointer) {.raises: [].} =
try:
let session = uploadSessions[$sessionId]
if session.onProgress != nil:
discard session.onProgress(0)
except KeyError:
error "Failed to push the progress final state, session is not found.",
sessionId = $sessionId
fut.addCallback(cb)
let fut = node.store(lpStream, filenameOpt, mimetypeOpt, blockSize, onBlockStored)
uploadSessions[sessionId] = UploadSession(
stream: stream, fut: fut, filepath: $filepath, chunkSize: blockSize.int
@ -178,19 +161,47 @@ proc chunk(
): Future[Result[string, string]] {.async: (raises: []).} =
## Upload a chunk of data to the session identified by sessionId.
## The chunk is pushed to the BufferStream of the session.
## If the chunk size is equal or greater than the session chunkSize,
## the `onProgress` callback is temporarily set to receive the progress
## from `onBlockStored` callback. This provide a way to report progress
## precisely when a block is stored.
## If the chunk size is smaller than the session chunkSize,
## the `onProgress` callback is not set because the LPStream will
## wait until enough data is received to form a block before storing it.
## The wrapper may then report the progress because the data is in the stream
## but not yet stored.
if not uploadSessions.contains($sessionId):
return err("Failed to upload the chunk, the session is not found: " & $sessionId)
var fut = newFuture[void]()
try:
let session = uploadSessions[$sessionId]
await session.stream.pushData(chunk)
except KeyError as e:
if chunk.len >= session.chunkSize:
uploadSessions[$sessionId].onProgress = proc(
bytes: int
): void {.gcsafe, raises: [].} =
fut.complete()
await session.stream.pushData(chunk)
else:
fut = session.stream.pushData(chunk)
await fut
uploadSessions[$sessionId].onProgress = nil
except KeyError:
return err("Failed to upload the chunk, the session is not found: " & $sessionId)
except LPError as e:
return err("Failed to upload the chunk, stream error: " & $e.msg)
except CancelledError as e:
except CancelledError:
return err("Failed to upload the chunk, operation cancelled.")
except CatchableError as e:
return err("Failed to upload the chunk: " & $e.msg)
finally:
if not fut.finished():
fut.cancelSoon()
return ok("")
@ -200,10 +211,6 @@ proc finalize(
## Finalize the upload session identified by sessionId.
## This closes the BufferStream and waits for the `node.store` future
## to complete. It returns the CID of the uploaded file.
##
## In the finally block, the cleanup section removes the session
## from the table and cancels the future if it is not complete (in
## case of errors).
if not uploadSessions.contains($sessionId):
return
@ -219,12 +226,12 @@ proc finalize(
return err("Failed to finalize the upload session: " & res.error().msg)
return ok($res.get())
except KeyError as e:
except KeyError:
return
err("Failed to finalize the upload session, invalid session ID: " & $sessionId)
except LPStreamError as e:
return err("Failed to finalize the upload session, stream error: " & $e.msg)
except CancelledError as e:
except CancelledError:
return err("Failed to finalize the upload session, operation cancelled")
except CatchableError as e:
return err("Failed to finalize the upload session: " & $e.msg)
@ -241,6 +248,9 @@ proc cancel(
## Cancel the upload session identified by sessionId.
## This cancels the `node.store` future and removes the session
## from the table.
## This operation is not supported when uploading file because
## the worker will be busy uploading the file so it cannot pickup
## another request to cancel the upload.
if not uploadSessions.contains($sessionId):
return err("Failed to cancel the upload session, session not found: " & $sessionId)
@ -248,7 +258,7 @@ proc cancel(
try:
let session = uploadSessions[$sessionId]
session.fut.cancelSoon()
except KeyError as e:
except KeyError:
return err("Failed to cancel the upload session, invalid session ID: " & $sessionId)
uploadSessions.del($sessionId)
@ -286,15 +296,12 @@ proc streamFile(
proc file(
codex: ptr CodexServer, sessionId: cstring, onProgress: OnProgressHandler
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
): Future[Result[string, string]] {.async: (raises: []).} =
## Starts the file upload for the session identified by sessionId.
## Will call finalize when done and return the CID of the uploaded file.
## In the finally block, the cleanup section removes the session
## from the table and cancels the future if it is not complete (in
## case of errors).
##
## If `onProgress` is provided, it is called with the number of bytes
## received each time a block is stored thanks to `onBlockStore` callback.
## The onProgress callback is called with the number of bytes
## to report the progress of the upload.
if not uploadSessions.contains($sessionId):
return err("Failed to upload the file, invalid session ID: " & $sessionId)
@ -302,20 +309,20 @@ proc file(
var session: UploadSession
try:
uploadSessions[$sessionId].onProgress = onProgress
session = uploadSessions[$sessionId]
if onProgress != nil:
uploadSessions[$sessionId].onProgress = onProgress
let res = await streamFile(session.filepath, session.stream, session.chunkSize)
if res.isErr:
return err("Failed to upload the file: " & res.error)
return await codex.finalize(sessionId)
except KeyError as e:
except KeyError:
return err("Failed to upload the file, the session is not found: " & $sessionId)
except LPStreamError, IOError:
let e = getCurrentException()
return err("Failed to upload the file: " & $e.msg)
except CancelledError as e:
except CancelledError:
return err("Failed to upload the file, the operation is cancelled.")
except CatchableError as e:
return err("Failed to upload the file: " & $e.msg)
@ -326,48 +333,10 @@ proc file(
if session.fut != nil and not session.fut.finished():
session.fut.cancelSoon()
proc subscribe(
codex: ptr CodexServer, sessionId: cstring, onProgress: OnProgressHandler
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
## Subscribes to progress updates for the upload session identified by sessionId.
## The onProgress handler is called with the number of bytes received
## each time a block is stored thanks to `onBlockStore` callback.
if not uploadSessions.contains($sessionId):
return err(
"Failed to subscribe to the upload session, invalid session ID: " & $sessionId
)
let fut = newFuture[void]()
proc onBlockReceived(bytes: int): Future[void] {.async: (raises: [CancelledError]).} =
try:
let session = uploadSessions[$sessionId]
await onProgress(bytes)
if bytes == 0:
fut.complete()
except KeyError:
fut.cancelSoon()
error "Failed to push progress update, session is not found: ",
sessionId = $sessionId
try:
uploadSessions[$sessionId].onProgress = onBlockReceived
except KeyError:
return err(
"Failed to subscribe to the upload session, session is not found: " & $sessionId
)
try:
await fut
except CatchableError as e:
return err("Failed to subscribe to the upload session: " & $e.msg)
return ok("")
proc process*(
self: ptr NodeUploadRequest, codex: ptr CodexServer
self: ptr NodeUploadRequest,
codex: ptr CodexServer,
onUploadProgress: OnProgressHandler = nil,
): Future[Result[string, string]] {.async: (raises: []).} =
defer:
destroyShared(self)
@ -376,38 +345,30 @@ proc process*(
of NodeUploadMsgType.INIT:
let res = (await init(codex, self.filepath, self.chunkSize))
if res.isErr:
error "INIT failed", error = res.error
error "Failed to INIT.", error = res.error
return err($res.error)
return res
of NodeUploadMsgType.CHUNK:
let res = (await chunk(codex, self.sessionId, self.chunk))
if res.isErr:
error "CHUNK failed", error = res.error
error "Failed to CHUNK.", error = res.error
return err($res.error)
return res
of NodeUploadMsgType.FINALIZE:
let res = (await finalize(codex, self.sessionId))
if res.isErr:
error "FINALIZE failed", error = res.error
error "Failed to FINALIZE.", error = res.error
return err($res.error)
return res
of NodeUploadMsgType.CANCEL:
let res = (await cancel(codex, self.sessionId))
if res.isErr:
error "CANCEL failed", error = res.error
error "Failed to CANCEL.", error = res.error
return err($res.error)
return res
of NodeUploadMsgType.FILE:
let res = (await file(codex, self.sessionId, self.onProgress))
let res = (await file(codex, self.sessionId, onUploadProgress))
if res.isErr:
error "FILE failed", error = res.error
error "Failed to FILE.", error = res.error
return err($res.error)
return res
of NodeUploadMsgType.SUBSCRIBE:
let res = (await subscribe(codex, self.sessionId, self.onProgress))
if res.isErr:
error "SUBSCRIBE failed", error = res.error
return err($res.error)
return res
return ok("")

View File

@ -115,9 +115,30 @@ int codex_upload_file(
CodexCallback callback,
void* userData);
int codex_upload_subscribe(
int codex_download_local(
void* ctx,
const char* sessionId,
const char* cid,
size_t chunkSize,
CodexCallback callback,
void* userData);
int codex_download_init(
void* ctx,
const char* cid,
size_t chunkSize,
bool local,
CodexCallback callback,
void* userData);
int codex_download_chunk(
void* ctx,
const char* cid,
CodexCallback callback,
void* userData);
int codex_download_cancel(
void* ctx,
const char* cid,
CodexCallback callback,
void* userData);