mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-04 06:23:06 +00:00
Use faststream, provide better cleanup and comments
This commit is contained in:
parent
54f71890c7
commit
be76f4abbf
@ -1,15 +1,36 @@
|
|||||||
## This file contains the lifecycle request type that will be handled.
|
|
||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
|
## This file contains the upload request.
|
||||||
|
## A session is created for each upload allowing to resume,
|
||||||
|
## pause (using chunks) and cancels uploads.
|
||||||
|
##
|
||||||
|
## There are two ways to upload a file:
|
||||||
|
## 1. Via chunks: the filepath parameter is the data filename. Steps are:
|
||||||
|
## - INIT: creates a new upload session and returns its ID.
|
||||||
|
## - CHUNK: sends a chunk of data to the upload session.
|
||||||
|
## - FINALIZE: finalizes the upload and returns the CID of the uploaded file.
|
||||||
|
## - CANCEL: cancels the upload session.
|
||||||
|
##
|
||||||
|
## 2. Directly from a file path: the filepath has to be absolute.
|
||||||
|
## - INIT: creates a new upload session and returns its ID
|
||||||
|
## - FILE: starts the upload and returns the CID of the uploaded file
|
||||||
|
## when the upload is done.
|
||||||
|
## - CANCEL: cancels the upload session.
|
||||||
|
|
||||||
import std/[options, os, mimetypes, streams]
|
import std/[options, os, mimetypes, streams]
|
||||||
import chronos
|
import chronos
|
||||||
import chronicles
|
import chronicles
|
||||||
import libp2p
|
import questionable
|
||||||
|
import questionable/results
|
||||||
|
import faststreams/inputs
|
||||||
|
import libp2p/stream/[bufferstream, lpstream]
|
||||||
import ../../alloc
|
import ../../alloc
|
||||||
import ../../../codex/streams
|
import ../../../codex/units
|
||||||
import ../../../codex/node
|
import ../../../codex/codextypes
|
||||||
|
|
||||||
from ../../../codex/codex import CodexServer, node
|
from ../../../codex/codex import CodexServer, node
|
||||||
|
from ../../../codex/node import store
|
||||||
|
from libp2p import Cid
|
||||||
|
|
||||||
type NodeUploadMsgType* = enum
|
type NodeUploadMsgType* = enum
|
||||||
INIT
|
INIT
|
||||||
@ -36,7 +57,6 @@ type
|
|||||||
stream: BufferStream
|
stream: BufferStream
|
||||||
fut: Future[?!Cid]
|
fut: Future[?!Cid]
|
||||||
filepath: string
|
filepath: string
|
||||||
chunkSize: int
|
|
||||||
|
|
||||||
var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession]
|
var uploadSessions {.threadvar.}: Table[UploadSessionId, UploadSession]
|
||||||
var nexUploadSessionCount {.threadvar.}: UploadSessionCount
|
var nexUploadSessionCount {.threadvar.}: UploadSessionCount
|
||||||
@ -64,18 +84,29 @@ proc destroyShared(self: ptr NodeUploadRequest) =
|
|||||||
deallocShared(self[].sessionId)
|
deallocShared(self[].sessionId)
|
||||||
deallocShared(self)
|
deallocShared(self)
|
||||||
|
|
||||||
## Init upload create a new upload session and returns its ID.
|
|
||||||
## The session can be used to send chunks of data
|
|
||||||
## and to pause and resume the upload.
|
|
||||||
## filepath can be the absolute path to a file to upload directly,
|
|
||||||
## or it can be the filename when the file will be uploaded via chunks.
|
|
||||||
## The mimetype is deduced from the filename extension.
|
|
||||||
proc init(
|
proc init(
|
||||||
codex: ptr CodexServer,
|
codex: ptr CodexServer,
|
||||||
filepath: cstring = "",
|
filepath: cstring = "",
|
||||||
chunkSize: csize_t = 0,
|
chunkSize: csize_t = 0,
|
||||||
onProgress: OnProgressHandler,
|
onProgress: OnProgressHandler,
|
||||||
): Future[Result[string, string]] {.async: (raises: []).} =
|
): Future[Result[string, string]] {.async: (raises: []).} =
|
||||||
|
## Init a new session upload and return its ID.
|
||||||
|
## The session contains the future corresponding to the
|
||||||
|
## `node.store` call.
|
||||||
|
## The filepath can be:
|
||||||
|
## - the filename when uploading via chunks
|
||||||
|
## - the absolute path to a file when uploading directly.
|
||||||
|
## The mimetype is deduced from the filename extension.
|
||||||
|
##
|
||||||
|
## The chunkSize matches by default the block size used to store the file.
|
||||||
|
##
|
||||||
|
## An onProgress handler can be provided to get upload progress.
|
||||||
|
## The handler is called with the size of the block stored in the node
|
||||||
|
## when a new block is put in the node.
|
||||||
|
## After the `node.store` future is completed, whether successfully or not,
|
||||||
|
## the onProgress handler is called with -1 to signal the end of the upload.
|
||||||
|
## This allows to clean up the cGo states.
|
||||||
|
|
||||||
var filenameOpt, mimetypeOpt = string.none
|
var filenameOpt, mimetypeOpt = string.none
|
||||||
|
|
||||||
if isAbsolute($filepath):
|
if isAbsolute($filepath):
|
||||||
@ -120,15 +151,17 @@ proc init(
|
|||||||
|
|
||||||
fut.addCallback(cb)
|
fut.addCallback(cb)
|
||||||
|
|
||||||
uploadSessions[sessionId] = UploadSession(
|
uploadSessions[sessionId] =
|
||||||
stream: stream, fut: fut, filepath: $filepath, chunkSize: blockSize.int
|
UploadSession(stream: stream, fut: fut, filepath: $filepath)
|
||||||
)
|
|
||||||
|
|
||||||
return ok(sessionId)
|
return ok(sessionId)
|
||||||
|
|
||||||
proc chunk(
|
proc chunk(
|
||||||
codex: ptr CodexServer, sessionId: cstring, chunk: seq[byte]
|
codex: ptr CodexServer, sessionId: cstring, chunk: seq[byte]
|
||||||
): Future[Result[string, string]] {.async: (raises: []).} =
|
): Future[Result[string, string]] {.async: (raises: []).} =
|
||||||
|
## Upload a chunk of data to the session identified by sessionId.
|
||||||
|
## The chunk is pushed to the BufferStream of the session.
|
||||||
|
|
||||||
if not uploadSessions.contains($sessionId):
|
if not uploadSessions.contains($sessionId):
|
||||||
return err("Invalid session ID")
|
return err("Invalid session ID")
|
||||||
|
|
||||||
@ -147,6 +180,14 @@ proc chunk(
|
|||||||
proc finalize(
|
proc finalize(
|
||||||
codex: ptr CodexServer, sessionId: cstring
|
codex: ptr CodexServer, sessionId: cstring
|
||||||
): Future[Result[string, string]] {.async: (raises: []).} =
|
): Future[Result[string, string]] {.async: (raises: []).} =
|
||||||
|
## Finalize the upload session identified by sessionId.
|
||||||
|
## This closes the BufferStream and waits for the `node.store` future
|
||||||
|
## to complete. It returns the CID of the uploaded file.
|
||||||
|
##
|
||||||
|
## In the finally block, the cleanup section removes the session
|
||||||
|
## from the table and cancels the future if it is not complete (in
|
||||||
|
## case of errors).
|
||||||
|
|
||||||
if not uploadSessions.contains($sessionId):
|
if not uploadSessions.contains($sessionId):
|
||||||
return err("Invalid session ID")
|
return err("Invalid session ID")
|
||||||
|
|
||||||
@ -154,27 +195,34 @@ proc finalize(
|
|||||||
try:
|
try:
|
||||||
session = uploadSessions[$sessionId]
|
session = uploadSessions[$sessionId]
|
||||||
await session.stream.pushEof()
|
await session.stream.pushEof()
|
||||||
|
|
||||||
|
let res = await session.fut
|
||||||
|
if res.isErr:
|
||||||
|
return err("Upload failed: " & res.error().msg)
|
||||||
|
|
||||||
|
return ok($res.get())
|
||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
return err("Invalid session ID")
|
return err("Invalid session ID")
|
||||||
except LPStreamError as e:
|
except LPStreamError as e:
|
||||||
return err("Stream error: " & $e.msg)
|
return err("Stream error: " & $e.msg)
|
||||||
except CancelledError as e:
|
except CancelledError as e:
|
||||||
return err("Operation cancelled")
|
return err("Operation cancelled")
|
||||||
|
|
||||||
try:
|
|
||||||
let res = await session.fut
|
|
||||||
if res.isErr:
|
|
||||||
return err("Upload failed: " & res.error().msg)
|
|
||||||
|
|
||||||
return ok($res.get())
|
|
||||||
except CatchableError as e:
|
except CatchableError as e:
|
||||||
return err("Upload failed: " & $e.msg)
|
return err("Upload failed: " & $e.msg)
|
||||||
finally:
|
finally:
|
||||||
uploadSessions.del($sessionId)
|
if uploadSessions.contains($sessionId):
|
||||||
|
uploadSessions.del($sessionId)
|
||||||
|
|
||||||
|
if session.fut != nil and not session.fut.finished():
|
||||||
|
session.fut.cancelSoon()
|
||||||
|
|
||||||
proc cancel(
|
proc cancel(
|
||||||
codex: ptr CodexServer, sessionId: cstring
|
codex: ptr CodexServer, sessionId: cstring
|
||||||
): Future[Result[string, string]] {.async: (raises: []).} =
|
): Future[Result[string, string]] {.async: (raises: []).} =
|
||||||
|
## Cancel the upload session identified by sessionId.
|
||||||
|
## This cancels the `node.store` future and removes the session
|
||||||
|
## from the table.
|
||||||
|
|
||||||
if not uploadSessions.contains($sessionId):
|
if not uploadSessions.contains($sessionId):
|
||||||
return err("Invalid session ID")
|
return err("Invalid session ID")
|
||||||
|
|
||||||
@ -188,29 +236,48 @@ proc cancel(
|
|||||||
|
|
||||||
return ok("")
|
return ok("")
|
||||||
|
|
||||||
|
proc streamFile(
|
||||||
|
filepath: string, stream: BufferStream
|
||||||
|
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
|
||||||
|
## Streams a file from the given filepath using faststream.
|
||||||
|
## fsMultiSync cannot be used with chronos because of this warning:
|
||||||
|
## Warning: chronos backend uses nested calls to `waitFor` which
|
||||||
|
## is not supported by chronos - it is not recommended to use it until
|
||||||
|
## this has been resolved.
|
||||||
|
##
|
||||||
|
## Ideally when it is solved, we should use fsMultiSync or find a way to use async
|
||||||
|
## file I/O with chronos, see https://github.com/status-im/nim-chronos/issues/501.
|
||||||
|
|
||||||
|
try:
|
||||||
|
let inputStreamHandle = filePath.fileInput()
|
||||||
|
let inputStream = inputStreamHandle.implicitDeref
|
||||||
|
|
||||||
|
while inputStream.readable:
|
||||||
|
let byt = inputStream.read
|
||||||
|
await stream.pushData(@[byt])
|
||||||
|
return ok()
|
||||||
|
except IOError, OSError, LPStreamError:
|
||||||
|
let e = getCurrentException()
|
||||||
|
return err("Stream error: " & $e.msg)
|
||||||
|
|
||||||
proc file(
|
proc file(
|
||||||
codex: ptr CodexServer, sessionId: cstring
|
codex: ptr CodexServer, sessionId: cstring
|
||||||
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
|
): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
|
||||||
|
## Starts the file upload for the session identified by sessionId.
|
||||||
|
## Will call finalize when done and return the CID of the uploaded file.
|
||||||
|
## In the finally block, the cleanup section removes the session
|
||||||
|
## from the table and cancels the future if it is not complete (in
|
||||||
|
## case of errors).
|
||||||
if not uploadSessions.contains($sessionId):
|
if not uploadSessions.contains($sessionId):
|
||||||
return err("Invalid session ID")
|
return err("Invalid session ID")
|
||||||
|
|
||||||
var session: UploadSession
|
var session: UploadSession
|
||||||
|
|
||||||
## Here we certainly need to spawn a new thread to avoid blocking
|
|
||||||
## the worker thread while reading the file.
|
|
||||||
try:
|
try:
|
||||||
session = uploadSessions[$sessionId]
|
session = uploadSessions[$sessionId]
|
||||||
var buffer = newSeq[byte](session.chunkSize)
|
let res = await streamFile(session.filepath, session.stream)
|
||||||
let fs = openFileStream(session.filepath)
|
if res.isErr:
|
||||||
defer:
|
return err("Failed to stream file: " & res.error)
|
||||||
fs.close()
|
|
||||||
|
|
||||||
while true:
|
|
||||||
let bytesRead = fs.readData(addr buffer[0], buffer.len)
|
|
||||||
|
|
||||||
if bytesRead == 0:
|
|
||||||
break
|
|
||||||
await session.stream.pushData(buffer[0 ..< bytesRead])
|
|
||||||
|
|
||||||
return await codex.finalize(sessionId)
|
return await codex.finalize(sessionId)
|
||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
@ -223,8 +290,11 @@ proc file(
|
|||||||
except CatchableError as e:
|
except CatchableError as e:
|
||||||
return err("Upload failed: " & $e.msg)
|
return err("Upload failed: " & $e.msg)
|
||||||
finally:
|
finally:
|
||||||
session.fut.cancelSoon()
|
if uploadSessions.contains($sessionId):
|
||||||
uploadSessions.del($sessionId)
|
uploadSessions.del($sessionId)
|
||||||
|
|
||||||
|
if session.fut != nil and not session.fut.finished():
|
||||||
|
session.fut.cancelSoon()
|
||||||
|
|
||||||
proc process*(
|
proc process*(
|
||||||
self: ptr NodeUploadRequest, codex: ptr CodexServer
|
self: ptr NodeUploadRequest, codex: ptr CodexServer
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user